From bb33c9230d022f04ccb9f07320f7a3cffee4c96e Mon Sep 17 00:00:00 2001 From: ishaan-jaff Date: Wed, 28 Feb 2024 14:44:02 -0800 Subject: [PATCH] (fix) async logging race condition --- litellm/utils.py | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/litellm/utils.py b/litellm/utils.py index 18307eebc8..a5eee8c565 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -1640,7 +1640,7 @@ class Logging: verbose_logger.debug( "Async success callbacks: Got a complete streaming response" ) - self.model_call_details["complete_streaming_response"] = ( + self.model_call_details["async_complete_streaming_response"] = ( complete_streaming_response ) try: @@ -1688,28 +1688,31 @@ class Logging: print_verbose("async success_callback: reaches cache for logging!") kwargs = self.model_call_details if self.stream: - if "complete_streaming_response" not in kwargs: + if "async_complete_streaming_response" not in kwargs: print_verbose( - f"async success_callback: reaches cache for logging, there is no complete_streaming_response. Kwargs={kwargs}\n\n" + f"async success_callback: reaches cache for logging, there is no async_complete_streaming_response. Kwargs={kwargs}\n\n" ) pass else: print_verbose( - "async success_callback: reaches cache for logging, there is a complete_streaming_response. Adding to cache" + "async success_callback: reaches cache for logging, there is a async_complete_streaming_response. Adding to cache" ) - result = kwargs["complete_streaming_response"] + result = kwargs["async_complete_streaming_response"] # only add to cache once we have a complete streaming response litellm.cache.add_cache(result, **kwargs) if isinstance(callback, CustomLogger): # custom logger class print_verbose( - f"Async success callbacks: {callback}; self.stream: {self.stream}; complete_streaming_response: {self.model_call_details.get('complete_streaming_response', None)}" + f"Running Async success callback: {callback}; self.stream: {self.stream}; async_complete_streaming_response: {self.model_call_details.get('async_complete_streaming_response', None)} result={result}" ) if self.stream == True: - if "complete_streaming_response" in self.model_call_details: + if ( + "async_complete_streaming_response" + in self.model_call_details + ): await callback.async_log_success_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ - "complete_streaming_response" + "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, @@ -1730,15 +1733,18 @@ class Logging: ) if callable(callback): # custom logger functions print_verbose( - f"Making async function logging call - {self.model_call_details}" + f"Making async function logging call for {callback}, result={result} - {self.model_call_details}" ) if self.stream: - if "complete_streaming_response" in self.model_call_details: + if ( + "async_complete_streaming_response" + in self.model_call_details + ): await customLogger.async_log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ - "complete_streaming_response" + "async_complete_streaming_response" ], start_time=start_time, end_time=end_time, @@ -1759,14 +1765,17 @@ class Logging: if dynamoLogger is None: dynamoLogger = DyanmoDBLogger() if self.stream: - if "complete_streaming_response" in self.model_call_details: + if ( + "async_complete_streaming_response" + in self.model_call_details + ): print_verbose( "DynamoDB Logger: Got Stream Event - Completed Stream Response" ) await dynamoLogger._async_log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ - "complete_streaming_response" + "async_complete_streaming_response" ], start_time=start_time, end_time=end_time,