diff --git a/litellm/integrations/dynamodb.py b/litellm/integrations/dynamodb.py
index c4d409d57..a77270170 100644
--- a/litellm/integrations/dynamodb.py
+++ b/litellm/integrations/dynamodb.py
@@ -49,8 +49,10 @@ class DyanmoDBLogger:
 
             self.dynamodb.create_table(**table_params)
             print_verbose(f'Table {self.table_name} created successfully')
-            
-    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose=print):
+
+    async def _async_log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
+        self.log_event(kwargs, response_obj, start_time, end_time, print_verbose)
+    def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose):
         try:
             print_verbose(
                 f"DynamoDB Logging - Enters logging function for model {kwargs}"
diff --git a/litellm/utils.py b/litellm/utils.py
index e86d8df86..9a847ddc4 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -996,28 +996,6 @@ class Logging:
                         end_time=end_time,
                         print_verbose=print_verbose,
                     )
-                if callback == "dynamodb":
-                    global dynamoLogger
-                    if dynamoLogger is None:
-                        dynamoLogger = DyanmoDBLogger()
-                    if self.stream:
-                        if "complete_streaming_response" in self.model_call_details:
-                            print_verbose("DynamoDB Logger: Got Stream Event - Completed Stream Response")
-                            dynamoLogger.log_event(
-                                kwargs=self.model_call_details,
-                                response_obj=self.model_call_details["complete_streaming_response"],
-                                start_time=start_time,
-                                end_time=end_time,
-                            )
-                        else:
-                            print_verbose("DynamoDB Logger: Got Stream Event - No complete stream response as yet")
-                    else:
-                        dynamoLogger.log_event(
-                            kwargs=self.model_call_details,
-                            response_obj=result,
-                            start_time=start_time,
-                            end_time=end_time,
-                        )
                 if callback == "cache" and litellm.cache is not None:
                     # this only logs streaming once, complete_streaming_response exists i.e when stream ends
                     print_verbose("success_callback: reaches cache for logging!")
@@ -1106,6 +1084,7 @@ class Logging:
             else:
                 self.streaming_chunks.append(result)
         if complete_streaming_response:
+            print_verbose("Async success callbacks: Got a complete streaming response")
             self.model_call_details["complete_streaming_response"] = complete_streaming_response
         start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result, cache_hit=cache_hit)
         for callback in litellm._async_success_callback:
@@ -1157,6 +1136,30 @@ class Logging:
                         print_verbose=print_verbose,
                         callback_func=callback
                     )
+                if callback == "dynamodb":
+                    global dynamoLogger
+                    if dynamoLogger is None:
+                        dynamoLogger = DyanmoDBLogger()
+                    if self.stream:
+                        if "complete_streaming_response" in self.model_call_details:
+                            print_verbose("DynamoDB Logger: Got Stream Event - Completed Stream Response")
+                            await dynamoLogger._async_log_event(
+                                kwargs=self.model_call_details,
+                                response_obj=self.model_call_details["complete_streaming_response"],
+                                start_time=start_time,
+                                end_time=end_time,
+                                print_verbose=print_verbose
+                            )
+                        else:
+                            print_verbose("DynamoDB Logger: Got Stream Event - No complete stream response as yet")
+                    else:
+                        await dynamoLogger._async_log_event(
+                            kwargs=self.model_call_details,
+                            response_obj=result,
+                            start_time=start_time,
+                            end_time=end_time,
+                            print_verbose=print_verbose,
+                        )
             except:
                 print_verbose(
                     f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
@@ -1437,6 +1440,10 @@ def client(original_function):
                     if inspect.iscoroutinefunction(callback):
                         litellm._async_success_callback.append(callback)
                         removed_async_items.append(index)
+                    elif callback == "dynamodb":
+                        # dynamo is an async callback, it's used for the proxy and needs to be async
+                        litellm._async_success_callback.append(callback)
+                        removed_async_items.append(index)
 
                 # Pop the async items from success_callback in reverse order to avoid index issues
                 for index in reversed(removed_async_items):