From 1562cba823f1fa3a122e8255e7e04f1139d6f9c7 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Fri, 26 Jul 2024 19:03:42 -0700 Subject: [PATCH] fix(utils.py): fix cache hits for streaming Fixes https://github.com/BerriAI/litellm/issues/4109 --- litellm/integrations/opentelemetry.py | 2 +- litellm/litellm_core_utils/litellm_logging.py | 4 +- litellm/proxy/_new_secret_config.yaml | 4 ++ litellm/tests/test_proxy_server.py | 1 + litellm/utils.py | 47 +++++++++++++------ 5 files changed, 42 insertions(+), 16 deletions(-) diff --git a/litellm/integrations/opentelemetry.py b/litellm/integrations/opentelemetry.py index c47911b4fd..ef14ad6794 100644 --- a/litellm/integrations/opentelemetry.py +++ b/litellm/integrations/opentelemetry.py @@ -463,7 +463,7 @@ class OpenTelemetry(CustomLogger): ############################################# # OTEL Attributes for the RAW Request to https://docs.anthropic.com/en/api/messages - if complete_input_dict: + if complete_input_dict and isinstance(complete_input_dict, dict): for param, val in complete_input_dict.items(): if not isinstance(val, str): val = str(val) diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index 0785933aab..85b6adc1e7 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -1220,7 +1220,9 @@ class Logging: """ Implementing async callbacks, to handle asyncio event loop issues when custom integrations need to use async functions. """ - print_verbose("Logging Details LiteLLM-Async Success Call") + print_verbose( + "Logging Details LiteLLM-Async Success Call, cache_hit={}".format(cache_hit) + ) start_time, end_time, result = self._success_handler_helper_fn( start_time=start_time, end_time=end_time, result=result, cache_hit=cache_hit ) diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml index deec60b43d..c128477362 100644 --- a/litellm/proxy/_new_secret_config.yaml +++ b/litellm/proxy/_new_secret_config.yaml @@ -2,3 +2,7 @@ model_list: - model_name: "*" litellm_params: model: "*" + +litellm_settings: + success_callback: ["logfire"] + cache: true \ No newline at end of file diff --git a/litellm/tests/test_proxy_server.py b/litellm/tests/test_proxy_server.py index e088f2055d..0e5431c3fe 100644 --- a/litellm/tests/test_proxy_server.py +++ b/litellm/tests/test_proxy_server.py @@ -625,6 +625,7 @@ def test_chat_completion_optional_params(mock_acompletion, client_no_auth): # Run the test # test_chat_completion_optional_params() + # Test Reading config.yaml file from litellm.proxy.proxy_server import ProxyConfig diff --git a/litellm/utils.py b/litellm/utils.py index 7c22953bc2..a8ef6119ba 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -10009,6 +10009,12 @@ class CustomStreamWrapper: return model_response def __next__(self): + cache_hit = False + if ( + self.custom_llm_provider is not None + and self.custom_llm_provider == "cached_response" + ): + cache_hit = True try: if self.completion_stream is None: self.fetch_sync_stream() @@ -10073,7 +10079,8 @@ class CustomStreamWrapper: response.usage = complete_streaming_response.usage # type: ignore ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(response,) + target=self.logging_obj.success_handler, + args=(response, None, None, cache_hit), ).start() # log response self.sent_stream_usage = True return response @@ -10083,7 +10090,8 @@ class CustomStreamWrapper: processed_chunk = self.finish_reason_handler() ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(processed_chunk,) + target=self.logging_obj.success_handler, + args=(processed_chunk, None, None, cache_hit), ).start() # log response return processed_chunk except Exception as e: @@ -10120,6 +10128,12 @@ class CustomStreamWrapper: return self.completion_stream async def __anext__(self): + cache_hit = False + if ( + self.custom_llm_provider is not None + and self.custom_llm_provider == "cached_response" + ): + cache_hit = True try: if self.completion_stream is None: await self.fetch_stream() @@ -10174,11 +10188,12 @@ class CustomStreamWrapper: continue ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(processed_chunk,) + target=self.logging_obj.success_handler, + args=(processed_chunk, None, None, cache_hit), ).start() # log response asyncio.create_task( self.logging_obj.async_success_handler( - processed_chunk, + processed_chunk, cache_hit=cache_hit ) ) self.response_uptil_now += ( @@ -10225,11 +10240,11 @@ class CustomStreamWrapper: ## LOGGING threading.Thread( target=self.logging_obj.success_handler, - args=(processed_chunk,), + args=(processed_chunk, None, None, cache_hit), ).start() # log processed_chunk asyncio.create_task( self.logging_obj.async_success_handler( - processed_chunk, + processed_chunk, cache_hit=cache_hit ) ) @@ -10257,11 +10272,12 @@ class CustomStreamWrapper: response.usage = complete_streaming_response.usage ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(response,) + target=self.logging_obj.success_handler, + args=(response, None, None, cache_hit), ).start() # log response asyncio.create_task( self.logging_obj.async_success_handler( - response, + response, cache_hit=cache_hit ) ) self.sent_stream_usage = True @@ -10272,11 +10288,12 @@ class CustomStreamWrapper: processed_chunk = self.finish_reason_handler() ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(processed_chunk,) + target=self.logging_obj.success_handler, + args=(processed_chunk, None, None, cache_hit), ).start() # log response asyncio.create_task( self.logging_obj.async_success_handler( - processed_chunk, + processed_chunk, cache_hit=cache_hit ) ) return processed_chunk @@ -10295,11 +10312,12 @@ class CustomStreamWrapper: response.usage = complete_streaming_response.usage ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(response,) + target=self.logging_obj.success_handler, + args=(response, None, None, cache_hit), ).start() # log response asyncio.create_task( self.logging_obj.async_success_handler( - response, + response, cache_hit=cache_hit ) ) self.sent_stream_usage = True @@ -10310,11 +10328,12 @@ class CustomStreamWrapper: processed_chunk = self.finish_reason_handler() ## LOGGING threading.Thread( - target=self.logging_obj.success_handler, args=(processed_chunk,) + target=self.logging_obj.success_handler, + args=(processed_chunk, None, None, cache_hit), ).start() # log response asyncio.create_task( self.logging_obj.async_success_handler( - processed_chunk, + processed_chunk, cache_hit=cache_hit ) ) return processed_chunk