From 068aafdff9c676a4ad560b482047f6d65c70f04e Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 12:30:30 -0700 Subject: [PATCH 01/10] fix(utils.py): correctly re-raise the headers from an exception, if present Fixes issue where retry after on router was not using azure / openai numbers --- .pre-commit-config.yaml | 12 ++-- litellm/llms/openai.py | 32 +++++---- litellm/router.py | 26 ++++++-- litellm/tests/test_router.py | 126 +++++++++++++++++++++++++++++++++++ litellm/types/router.py | 16 +++++ litellm/utils.py | 49 ++++++++++++-- 6 files changed, 228 insertions(+), 33 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index a33473b72..d429bc6b8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,12 +1,12 @@ repos: - repo: local hooks: - - id: mypy - name: mypy - entry: python3 -m mypy --ignore-missing-imports - language: system - types: [python] - files: ^litellm/ + # - id: mypy + # name: mypy + # entry: python3 -m mypy --ignore-missing-imports + # language: system + # types: [python] + # files: ^litellm/ - id: isort name: isort entry: isort diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py index ada5f4ca3..9b33f3cac 100644 --- a/litellm/llms/openai.py +++ b/litellm/llms/openai.py @@ -50,9 +50,11 @@ class OpenAIError(Exception): message, request: Optional[httpx.Request] = None, response: Optional[httpx.Response] = None, + headers: Optional[httpx.Headers] = None, ): self.status_code = status_code self.message = message + self.headers = headers if request: self.request = request else: @@ -113,7 +115,7 @@ class MistralConfig: random_seed: Optional[int] = None, safe_prompt: Optional[bool] = None, response_format: Optional[dict] = None, - stop: Optional[Union[str, list]] = None + stop: Optional[Union[str, list]] = None, ) -> None: locals_ = locals().copy() for key, value in locals_.items(): @@ -172,7 +174,7 @@ class MistralConfig: if param == "top_p": optional_params["top_p"] = value if param == "stop": - optional_params["stop"] = value + optional_params["stop"] = value if param == "tool_choice" and isinstance(value, str): optional_params["tool_choice"] = self._map_tool_choice( tool_choice=value @@ -1313,17 +1315,13 @@ class OpenAIChatCompletion(BaseLLM): - call embeddings.create by default """ try: - if litellm.return_response_headers is True: - raw_response = openai_client.embeddings.with_raw_response.create( - **data, timeout=timeout - ) # type: ignore + raw_response = openai_client.embeddings.with_raw_response.create( + **data, timeout=timeout + ) # type: ignore - headers = dict(raw_response.headers) - response = raw_response.parse() - return headers, response - else: - response = openai_client.embeddings.create(**data, timeout=timeout) # type: ignore - return None, response + headers = dict(raw_response.headers) + response = raw_response.parse() + return headers, response except Exception as e: raise e @@ -1448,13 +1446,13 @@ class OpenAIChatCompletion(BaseLLM): response_type="embedding", ) # type: ignore except OpenAIError as e: - exception_mapping_worked = True raise e except Exception as e: - if hasattr(e, "status_code"): - raise OpenAIError(status_code=e.status_code, message=str(e)) - else: - raise OpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise OpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) async def aimage_generation( self, diff --git a/litellm/router.py b/litellm/router.py 
index 6ca5e4d56..48cd4427d 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -90,6 +90,7 @@ from litellm.types.router import ( RetryPolicy, RouterErrors, RouterGeneralSettings, + RouterRateLimitError, updateDeployment, updateLiteLLMParams, ) @@ -1939,6 +1940,7 @@ class Router: raise e def _embedding(self, input: Union[str, List], model: str, **kwargs): + model_name = None try: verbose_router_logger.debug( f"Inside embedding()- model: {model}; kwargs: {kwargs}" @@ -2813,19 +2815,27 @@ class Router: ): return 0 + response_headers: Optional[httpx.Headers] = None if hasattr(e, "response") and hasattr(e.response, "headers"): + response_headers = e.response.headers + elif hasattr(e, "litellm_response_headers"): + response_headers = e.litellm_response_headers + + if response_headers is not None: timeout = litellm._calculate_retry_after( remaining_retries=remaining_retries, max_retries=num_retries, - response_headers=e.response.headers, + response_headers=response_headers, min_timeout=self.retry_after, ) + else: timeout = litellm._calculate_retry_after( remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=self.retry_after, ) + return timeout def function_with_retries(self, *args, **kwargs): @@ -2997,8 +3007,9 @@ class Router: metadata = kwargs.get("litellm_params", {}).get("metadata", None) _model_info = kwargs.get("litellm_params", {}).get("model_info", {}) - exception_response = getattr(exception, "response", {}) - exception_headers = getattr(exception_response, "headers", None) + exception_headers = litellm.utils._get_litellm_response_headers( + original_exception=exception + ) _time_to_cooldown = kwargs.get("litellm_params", {}).get( "cooldown_time", self.cooldown_time ) @@ -4744,8 +4755,13 @@ class Router: ) if len(healthy_deployments) == 0: - raise ValueError( - f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, cooldown_list={self._get_cooldown_deployments()}" + _cooldown_time = self.cooldown_time # [TODO] Make dynamic + _cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=_cooldown_time, + enable_pre_call_checks=self.enable_pre_call_checks, + cooldown_list=_cooldown_list, ) if self.routing_strategy == "least-busy" and self.leastbusy_logger is not None: diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py index 3c374df87..2bf4f55b9 100644 --- a/litellm/tests/test_router.py +++ b/litellm/tests/test_router.py @@ -10,6 +10,9 @@ import traceback import openai import pytest +import litellm.types +import litellm.types.router + sys.path.insert( 0, os.path.abspath("../..") ) # Adds the parent directory to the system path @@ -2184,3 +2187,126 @@ def test_router_correctly_reraise_error(): ) except litellm.RateLimitError: pass + + +def test_router_dynamic_cooldown_correct_retry_after_time(): + """ + User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" + but Azure says to retry in at most 9s + + ``` + {"message": "litellm.proxy.proxy_server.embeddings(): Exception occured - No deployments available for selected model, Try again in 60 seconds. Passed model=text-embedding-ada-002. 
pre-call-checks=False, allowed_model_region=n/a, cooldown_list=[('b49cbc9314273db7181fe69b1b19993f04efb88f2c1819947c538bac08097e4c', {'Exception Received': 'litellm.RateLimitError: AzureException RateLimitError - Requests to the Embeddings_Create Operation under Azure OpenAI API version 2023-09-01-preview have exceeded call rate limit of your current OpenAI S0 pricing tier. Please retry after 9 seconds. Please go here: https://aka.ms/oai/quotaincrease if you would like to further increase the default rate limit.', 'Status Code': '429'})]", "level": "ERROR", "timestamp": "2024-08-22T03:25:36.900476"} + ``` + """ + router = Router( + model_list=[ + { + "model_name": "text-embedding-ada-002", + "litellm_params": { + "model": "openai/text-embedding-ada-002", + }, + } + ] + ) + + openai_client = openai.OpenAI(api_key="") + + cooldown_time = 30.0 + + def _return_exception(*args, **kwargs): + from fastapi import HTTPException + + raise HTTPException( + status_code=429, + detail="Rate Limited!", + headers={"retry-after": cooldown_time}, + ) + + with patch.object( + openai_client.embeddings.with_raw_response, + "create", + side_effect=_return_exception, + ): + new_retry_after_mock_client = MagicMock(return_value=-1) + + litellm.utils._get_retry_after_from_exception_header = ( + new_retry_after_mock_client + ) + + try: + router.embedding( + model="text-embedding-ada-002", + input="Hello world!", + client=openai_client, + ) + except litellm.RateLimitError: + pass + + new_retry_after_mock_client.assert_called() + print( + f"new_retry_after_mock_client.call_args.kwargs: {new_retry_after_mock_client.call_args.kwargs}" + ) + + response_headers: httpx.Headers = new_retry_after_mock_client.call_args.kwargs[ + "response_headers" + ] + assert "retry-after" in response_headers + assert response_headers["retry-after"] == cooldown_time + + +def test_router_dynamic_cooldown_message_retry_time(): + """ + User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" + but Azure says to retry in at most 9s + + ``` + {"message": "litellm.proxy.proxy_server.embeddings(): Exception occured - No deployments available for selected model, Try again in 60 seconds. Passed model=text-embedding-ada-002. pre-call-checks=False, allowed_model_region=n/a, cooldown_list=[('b49cbc9314273db7181fe69b1b19993f04efb88f2c1819947c538bac08097e4c', {'Exception Received': 'litellm.RateLimitError: AzureException RateLimitError - Requests to the Embeddings_Create Operation under Azure OpenAI API version 2023-09-01-preview have exceeded call rate limit of your current OpenAI S0 pricing tier. Please retry after 9 seconds. 
Please go here: https://aka.ms/oai/quotaincrease if you would like to further increase the default rate limit.', 'Status Code': '429'})]", "level": "ERROR", "timestamp": "2024-08-22T03:25:36.900476"} + ``` + """ + router = Router( + model_list=[ + { + "model_name": "text-embedding-ada-002", + "litellm_params": { + "model": "openai/text-embedding-ada-002", + }, + } + ] + ) + + openai_client = openai.OpenAI(api_key="") + + cooldown_time = 30.0 + + def _return_exception(*args, **kwargs): + from fastapi import HTTPException + + raise HTTPException( + status_code=429, + detail="Rate Limited!", + headers={"retry-after": cooldown_time}, + ) + + with patch.object( + openai_client.embeddings.with_raw_response, + "create", + side_effect=_return_exception, + ): + for _ in range(2): + try: + router.embedding( + model="text-embedding-ada-002", + input="Hello world!", + client=openai_client, + ) + except litellm.RateLimitError: + pass + + try: + router.embedding( + model="text-embedding-ada-002", + input="Hello world!", + client=openai_client, + ) + except litellm.types.router.RouterRateLimitError as e: + assert e.cooldown_time == cooldown_time diff --git a/litellm/types/router.py b/litellm/types/router.py index dda6968f0..e38fec075 100644 --- a/litellm/types/router.py +++ b/litellm/types/router.py @@ -549,3 +549,19 @@ class RouterGeneralSettings(BaseModel): pass_through_all_models: bool = Field( default=False ) # if passed a model not llm_router model list, pass through the request to litellm.acompletion/embedding + + +class RouterRateLimitError(ValueError): + def __init__( + self, + model: str, + cooldown_time: float, + enable_pre_call_checks: bool, + cooldown_list: List, + ): + self.model = model + self.cooldown_time = cooldown_time + self.enable_pre_call_checks = enable_pre_call_checks + self.cooldown_list = cooldown_list + _message = f"{RouterErrors.no_deployments_available.value}, Try again in {cooldown_time} seconds. Passed model={model}. pre-call-checks={enable_pre_call_checks}, cooldown_list={cooldown_list}" + super().__init__(_message) diff --git a/litellm/utils.py b/litellm/utils.py index af6025845..1c974b86b 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -6339,6 +6339,7 @@ def _get_retry_after_from_exception_header( retry_after = int(retry_date - time.time()) else: retry_after = -1 + return retry_after except Exception as e: @@ -6520,6 +6521,40 @@ def get_model_list(): ####### EXCEPTION MAPPING ################ +def _get_litellm_response_headers( + original_exception: Exception, +) -> Optional[httpx.Headers]: + """ + Extract and return the response headers from a mapped exception, if present. + + Used for accurate retry logic. + """ + _response_headers: Optional[httpx.Headers] = None + try: + _response_headers = getattr( + original_exception, "litellm_response_headers", None + ) + except Exception: + return None + + return _response_headers + + +def _get_response_headers(original_exception: Exception) -> Optional[httpx.Headers]: + """ + Extract and return the response headers from an exception, if present. + + Used for accurate retry logic. + """ + _response_headers: Optional[httpx.Headers] = None + try: + _response_headers = getattr(original_exception, "headers", None) + except Exception: + return None + + return _response_headers + + def exception_type( model, original_exception, @@ -6544,6 +6579,10 @@ def exception_type( "LiteLLM.Info: If you need to debug this error, use `litellm.set_verbose=True'." 
# noqa ) # noqa print() # noqa + + litellm_response_headers = _get_response_headers( + original_exception=original_exception + ) try: if model: if hasattr(original_exception, "message"): @@ -8422,20 +8461,20 @@ def exception_type( threading.Thread(target=get_all_keys, args=(e.llm_provider,)).start() # don't let an error with mapping interrupt the user from receiving an error from the llm api calls if exception_mapping_worked: + setattr(e, "litellm_response_headers", litellm_response_headers) raise e else: for error_type in litellm.LITELLM_EXCEPTION_TYPES: if isinstance(e, error_type): + setattr(e, "litellm_response_headers", litellm_response_headers) raise e # it's already mapped - raise APIConnectionError( + raised_exc = APIConnectionError( message="{}\n{}".format(original_exception, traceback.format_exc()), llm_provider="", model="", - request=httpx.Request( - method="POST", - url="https://www.litellm.ai/", - ), ) + setattr(raised_exc, "litellm_response_headers", _response_headers) + raise raised_exc ######### Secret Manager ############################ From de2373d52bc6714ca46706871173dd58e0f74c69 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 12:55:15 -0700 Subject: [PATCH 02/10] fix(openai.py): coverage for correctly re-raising exception headers on openai chat completion + embedding endpoints --- litellm/llms/openai.py | 64 +++++++++-------- litellm/tests/test_exceptions.py | 118 +++++++++++++++++++++++++++++++ litellm/tests/test_router.py | 4 +- 3 files changed, 153 insertions(+), 33 deletions(-) diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py index 9b33f3cac..41ba526ac 100644 --- a/litellm/llms/openai.py +++ b/litellm/llms/openai.py @@ -786,8 +786,14 @@ class OpenAIChatCompletion(BaseLLM): headers = dict(raw_response.headers) response = raw_response.parse() return headers, response - except Exception as e: + except OpenAIError as e: raise e + except Exception as e: + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise OpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) def make_sync_openai_chat_completion_request( self, @@ -801,21 +807,21 @@ class OpenAIChatCompletion(BaseLLM): - call chat.completions.create by default """ try: - if litellm.return_response_headers is True: - raw_response = openai_client.chat.completions.with_raw_response.create( - **data, timeout=timeout - ) + raw_response = openai_client.chat.completions.with_raw_response.create( + **data, timeout=timeout + ) - headers = dict(raw_response.headers) - response = raw_response.parse() - return headers, response - else: - response = openai_client.chat.completions.create( - **data, timeout=timeout - ) - return None, response - except Exception as e: + headers = dict(raw_response.headers) + response = raw_response.parse() + return headers, response + except OpenAIError as e: raise e + except Exception as e: + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise OpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) def completion( self, @@ -1290,16 +1296,12 @@ class OpenAIChatCompletion(BaseLLM): - call embeddings.create by default """ try: - if litellm.return_response_headers is True: - raw_response = await openai_aclient.embeddings.with_raw_response.create( - **data, timeout=timeout - ) # type: ignore - headers = dict(raw_response.headers) - response = raw_response.parse() - return headers, response - else: - response = await 
openai_aclient.embeddings.create(**data, timeout=timeout) # type: ignore - return None, response + raw_response = await openai_aclient.embeddings.with_raw_response.create( + **data, timeout=timeout + ) # type: ignore + headers = dict(raw_response.headers) + response = raw_response.parse() + return headers, response except Exception as e: raise e @@ -1365,14 +1367,14 @@ class OpenAIChatCompletion(BaseLLM): response_type="embedding", _response_headers=headers, ) # type: ignore - except Exception as e: - ## LOGGING - logging_obj.post_call( - input=input, - api_key=api_key, - original_response=str(e), - ) + except OpenAIError as e: raise e + except Exception as e: + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise OpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) def embedding( self, diff --git a/litellm/tests/test_exceptions.py b/litellm/tests/test_exceptions.py index 806e19569..a53e8f3a9 100644 --- a/litellm/tests/test_exceptions.py +++ b/litellm/tests/test_exceptions.py @@ -839,3 +839,121 @@ def test_anthropic_tool_calling_exception(): ) except litellm.BadRequestError: pass + + +from typing import Optional, Union + +from openai import AsyncOpenAI, OpenAI + + +def _pre_call_utils( + call_type: str, + data: dict, + client: Union[OpenAI, AsyncOpenAI], + sync_mode: bool, + streaming: Optional[bool], +): + if call_type == "embedding": + data["input"] = "Hello world!" + mapped_target = client.embeddings.with_raw_response + if sync_mode: + original_function = litellm.embedding + else: + original_function = litellm.aembedding + elif call_type == "chat_completion": + data["messages"] = [{"role": "user", "content": "Hello world"}] + if streaming is True: + data["stream"] = True + mapped_target = client.chat.completions.with_raw_response + if sync_mode: + original_function = litellm.completion + else: + original_function = litellm.acompletion + + return data, original_function, mapped_target + + +@pytest.mark.parametrize( + "sync_mode", + [True, False], +) +@pytest.mark.parametrize( + "model, call_type, streaming", + [ + ("text-embedding-ada-002", "embedding", None), + ("gpt-3.5-turbo", "chat_completion", False), + ("gpt-3.5-turbo", "chat_completion", True), + ], +) +@pytest.mark.asyncio +async def test_exception_with_headers(sync_mode, model, call_type, streaming): + """ + User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" + but Azure says to retry in at most 9s + + ``` + {"message": "litellm.proxy.proxy_server.embeddings(): Exception occured - No deployments available for selected model, Try again in 60 seconds. Passed model=text-embedding-ada-002. pre-call-checks=False, allowed_model_region=n/a, cooldown_list=[('b49cbc9314273db7181fe69b1b19993f04efb88f2c1819947c538bac08097e4c', {'Exception Received': 'litellm.RateLimitError: AzureException RateLimitError - Requests to the Embeddings_Create Operation under Azure OpenAI API version 2023-09-01-preview have exceeded call rate limit of your current OpenAI S0 pricing tier. Please retry after 9 seconds. 
Please go here: https://aka.ms/oai/quotaincrease if you would like to further increase the default rate limit.', 'Status Code': '429'})]", "level": "ERROR", "timestamp": "2024-08-22T03:25:36.900476"} + ``` + """ + import openai + + if sync_mode: + openai_client = openai.OpenAI(api_key="") + else: + openai_client = openai.AsyncOpenAI(api_key="") + + data = {"model": model} + data, original_function, mapped_target = _pre_call_utils( + call_type=call_type, + data=data, + client=openai_client, + sync_mode=sync_mode, + streaming=streaming, + ) + + cooldown_time = 30.0 + + def _return_exception(*args, **kwargs): + from fastapi import HTTPException + + raise HTTPException( + status_code=429, + detail="Rate Limited!", + headers={"retry-after": cooldown_time}, # type: ignore + ) + + with patch.object( + mapped_target, + "create", + side_effect=_return_exception, + ): + new_retry_after_mock_client = MagicMock(return_value=-1) + + litellm.utils._get_retry_after_from_exception_header = ( + new_retry_after_mock_client + ) + + try: + if sync_mode: + resp = original_function( + model="text-embedding-ada-002", + input="Hello world!", + client=openai_client, + ) + if streaming: + for chunk in resp: + continue + else: + resp = await original_function( + model="text-embedding-ada-002", + input="Hello world!", + client=openai_client, + ) + + if streaming: + async for chunk in resp: + continue + + except litellm.RateLimitError as e: + assert e.litellm_response_headers is not None + assert e.litellm_response_headers["retry-after"] == cooldown_time diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py index 2bf4f55b9..6e58a04f9 100644 --- a/litellm/tests/test_router.py +++ b/litellm/tests/test_router.py @@ -2189,7 +2189,7 @@ def test_router_correctly_reraise_error(): pass -def test_router_dynamic_cooldown_correct_retry_after_time(): +def test_router_dynamic_cooldown_correct_retry_after_time(sync_mode): """ User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" but Azure says to retry in at most 9s @@ -2219,7 +2219,7 @@ def test_router_dynamic_cooldown_correct_retry_after_time(): raise HTTPException( status_code=429, detail="Rate Limited!", - headers={"retry-after": cooldown_time}, + headers={"retry-after": cooldown_time}, # type: ignore ) with patch.object( From 87549a2391f1d4fde8cbb633d326eca6f43b5a0b Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 13:25:17 -0700 Subject: [PATCH 03/10] fix(main.py): cover openai /v1/completions endpoint --- litellm/llms/openai.py | 21 ++++++++++++++++----- litellm/main.py | 19 ++++++++++++++----- litellm/tests/test_exceptions.py | 28 ++++++++++++++++++---------- litellm/utils.py | 25 +++++++++++++++++++------ 4 files changed, 67 insertions(+), 26 deletions(-) diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py index 41ba526ac..1ad870537 100644 --- a/litellm/llms/openai.py +++ b/litellm/llms/openai.py @@ -1268,6 +1268,8 @@ class OpenAIChatCompletion(BaseLLM): except ( Exception ) as e: # need to exception handle here. async exceptions don't get caught in sync functions. 
+ if isinstance(e, OpenAIError): + raise e if response is not None and hasattr(response, "text"): raise OpenAIError( status_code=500, @@ -1975,7 +1977,7 @@ class OpenAITextCompletion(BaseLLM): "complete_input_dict": data, }, ) - if acompletion == True: + if acompletion is True: if optional_params.get("stream", False): return self.async_streaming( logging_obj=logging_obj, @@ -2019,7 +2021,7 @@ class OpenAITextCompletion(BaseLLM): else: openai_client = client - response = openai_client.completions.create(**data) # type: ignore + response = openai_client.completions.with_raw_response.create(**data) # type: ignore response_json = response.model_dump() @@ -2067,7 +2069,7 @@ class OpenAITextCompletion(BaseLLM): else: openai_aclient = client - response = await openai_aclient.completions.create(**data) + response = await openai_aclient.completions.with_raw_response.create(**data) response_json = response.model_dump() ## LOGGING logging_obj.post_call( @@ -2100,6 +2102,7 @@ class OpenAITextCompletion(BaseLLM): client=None, organization=None, ): + if client is None: openai_client = OpenAI( api_key=api_key, @@ -2111,7 +2114,15 @@ class OpenAITextCompletion(BaseLLM): ) else: openai_client = client - response = openai_client.completions.create(**data) + + try: + response = openai_client.completions.with_raw_response.create(**data) + except Exception as e: + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise OpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) streamwrapper = CustomStreamWrapper( completion_stream=response, model=model, @@ -2149,7 +2160,7 @@ class OpenAITextCompletion(BaseLLM): else: openai_client = client - response = await openai_client.completions.create(**data) + response = await openai_client.completions.with_raw_response.create(**data) streamwrapper = CustomStreamWrapper( completion_stream=response, diff --git a/litellm/main.py b/litellm/main.py index 649fc12a5..9ef49be07 100644 --- a/litellm/main.py +++ b/litellm/main.py @@ -445,7 +445,12 @@ async def _async_streaming(response, model, custom_llm_provider, args): print_verbose(f"line in async streaming: {line}") yield line except Exception as e: - raise e + custom_llm_provider = custom_llm_provider or "openai" + raise exception_type( + model=model, + custom_llm_provider=custom_llm_provider, + original_exception=e, + ) def mock_completion( @@ -3736,7 +3741,7 @@ async def atext_completion( else: # Call the synchronous function using run_in_executor response = await loop.run_in_executor(None, func_with_context) - if kwargs.get("stream", False) == True: # return an async generator + if kwargs.get("stream", False) is True: # return an async generator return TextCompletionStreamWrapper( completion_stream=_async_streaming( response=response, @@ -3745,6 +3750,7 @@ async def atext_completion( args=args, ), model=model, + custom_llm_provider=custom_llm_provider, ) else: transformed_logprobs = None @@ -4018,11 +4024,14 @@ def text_completion( **kwargs, **optional_params, ) - if kwargs.get("acompletion", False) == True: + if kwargs.get("acompletion", False) is True: return response - if stream == True or kwargs.get("stream", False) == True: + if stream is True or kwargs.get("stream", False) is True: response = TextCompletionStreamWrapper( - completion_stream=response, model=model, stream_options=stream_options + completion_stream=response, + model=model, + stream_options=stream_options, + custom_llm_provider=custom_llm_provider, ) return response transformed_logprobs 
= None diff --git a/litellm/tests/test_exceptions.py b/litellm/tests/test_exceptions.py index a53e8f3a9..ff5c4352f 100644 --- a/litellm/tests/test_exceptions.py +++ b/litellm/tests/test_exceptions.py @@ -869,6 +869,15 @@ def _pre_call_utils( original_function = litellm.completion else: original_function = litellm.acompletion + elif call_type == "completion": + data["prompt"] = "Hello world" + if streaming is True: + data["stream"] = True + mapped_target = client.completions.with_raw_response + if sync_mode: + original_function = litellm.text_completion + else: + original_function = litellm.atext_completion return data, original_function, mapped_target @@ -883,6 +892,7 @@ def _pre_call_utils( ("text-embedding-ada-002", "embedding", None), ("gpt-3.5-turbo", "chat_completion", False), ("gpt-3.5-turbo", "chat_completion", True), + ("gpt-3.5-turbo-instruct", "completion", True), ], ) @pytest.mark.asyncio @@ -933,27 +943,25 @@ async def test_exception_with_headers(sync_mode, model, call_type, streaming): new_retry_after_mock_client ) + exception_raised = False try: if sync_mode: - resp = original_function( - model="text-embedding-ada-002", - input="Hello world!", - client=openai_client, - ) + resp = original_function(**data, client=openai_client) if streaming: for chunk in resp: continue else: - resp = await original_function( - model="text-embedding-ada-002", - input="Hello world!", - client=openai_client, - ) + resp = await original_function(**data, client=openai_client) if streaming: async for chunk in resp: continue except litellm.RateLimitError as e: + exception_raised = True assert e.litellm_response_headers is not None assert e.litellm_response_headers["retry-after"] == cooldown_time + + if exception_raised is False: + print(resp) + assert exception_raised diff --git a/litellm/utils.py b/litellm/utils.py index 1c974b86b..d1582cb94 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -6833,7 +6833,7 @@ def exception_type( message=f"{exception_provider} - {message}", model=model, llm_provider=custom_llm_provider, - response=original_exception.response, + response=getattr(original_exception, "response", None), litellm_debug_info=extra_information, ) elif original_exception.status_code == 429: @@ -6842,7 +6842,7 @@ def exception_type( message=f"RateLimitError: {exception_provider} - {message}", model=model, llm_provider=custom_llm_provider, - response=original_exception.response, + response=getattr(original_exception, "response", None), litellm_debug_info=extra_information, ) elif original_exception.status_code == 503: @@ -6851,7 +6851,7 @@ def exception_type( message=f"ServiceUnavailableError: {exception_provider} - {message}", model=model, llm_provider=custom_llm_provider, - response=original_exception.response, + response=getattr(original_exception, "response", None), litellm_debug_info=extra_information, ) elif original_exception.status_code == 504: # gateway timeout error @@ -6869,7 +6869,7 @@ def exception_type( message=f"APIError: {exception_provider} - {message}", llm_provider=custom_llm_provider, model=model, - request=original_exception.request, + request=getattr(original_exception, "request", None), litellm_debug_info=extra_information, ) else: @@ -10882,10 +10882,17 @@ class CustomStreamWrapper: class TextCompletionStreamWrapper: - def __init__(self, completion_stream, model, stream_options: Optional[dict] = None): + def __init__( + self, + completion_stream, + model, + stream_options: Optional[dict] = None, + custom_llm_provider: Optional[str] = None, + ): 
self.completion_stream = completion_stream self.model = model self.stream_options = stream_options + self.custom_llm_provider = custom_llm_provider def __iter__(self): return self @@ -10936,7 +10943,13 @@ class TextCompletionStreamWrapper: except StopIteration: raise StopIteration except Exception as e: - print(f"got exception {e}") # noqa + raise exception_type( + model=self.model, + custom_llm_provider=self.custom_llm_provider or "", + original_exception=e, + completion_kwargs={}, + extra_kwargs={}, + ) async def __anext__(self): try: From 756a828c154e968faebbeed8b1530ed2297eb4b1 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 15:12:51 -0700 Subject: [PATCH 04/10] fix(azure.py): add response header coverage for azure models --- litellm/llms/azure.py | 39 +++++++++++++++++--------------- litellm/llms/azure_text.py | 38 +++++++++++++++++++------------ litellm/tests/test_exceptions.py | 25 +++++++++++++------- litellm/utils.py | 2 +- 4 files changed, 62 insertions(+), 42 deletions(-) diff --git a/litellm/llms/azure.py b/litellm/llms/azure.py index 400e23ad0..57bc6f854 100644 --- a/litellm/llms/azure.py +++ b/litellm/llms/azure.py @@ -75,9 +75,11 @@ class AzureOpenAIError(Exception): message, request: Optional[httpx.Request] = None, response: Optional[httpx.Response] = None, + headers: Optional[httpx.Headers] = None, ): self.status_code = status_code self.message = message + self.headers = headers if request: self.request = request else: @@ -593,7 +595,6 @@ class AzureChatCompletion(BaseLLM): client=None, ): super().completion() - exception_mapping_worked = False try: if model is None or messages is None: raise AzureOpenAIError( @@ -755,13 +756,13 @@ class AzureChatCompletion(BaseLLM): convert_tool_call_to_json_mode=json_mode, ) except AzureOpenAIError as e: - exception_mapping_worked = True raise e except Exception as e: - if hasattr(e, "status_code"): - raise AzureOpenAIError(status_code=e.status_code, message=str(e)) - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) async def acompletion( self, @@ -1005,10 +1006,11 @@ class AzureChatCompletion(BaseLLM): ) return streamwrapper ## DO NOT make this into an async for ... 
loop, it will yield an async generator, which won't raise errors if the response fails except Exception as e: - if hasattr(e, "status_code"): - raise AzureOpenAIError(status_code=e.status_code, message=str(e)) - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) async def aembedding( self, @@ -1027,7 +1029,9 @@ class AzureChatCompletion(BaseLLM): openai_aclient = AsyncAzureOpenAI(**azure_client_params) else: openai_aclient = client - response = await openai_aclient.embeddings.create(**data, timeout=timeout) + response = await openai_aclient.embeddings.with_raw_response.create( + **data, timeout=timeout + ) stringified_response = response.model_dump() ## LOGGING logging_obj.post_call( @@ -1067,7 +1071,6 @@ class AzureChatCompletion(BaseLLM): aembedding=None, ): super().embedding() - exception_mapping_worked = False if self._client_session is None: self._client_session = self.create_client_session() try: @@ -1127,7 +1130,7 @@ class AzureChatCompletion(BaseLLM): else: azure_client = client ## COMPLETION CALL - response = azure_client.embeddings.create(**data, timeout=timeout) # type: ignore + response = azure_client.embeddings.with_raw_response.create(**data, timeout=timeout) # type: ignore ## LOGGING logging_obj.post_call( input=input, @@ -1138,13 +1141,13 @@ class AzureChatCompletion(BaseLLM): return convert_to_model_response_object(response_object=response.model_dump(), model_response_object=model_response, response_type="embedding") # type: ignore except AzureOpenAIError as e: - exception_mapping_worked = True raise e except Exception as e: - if hasattr(e, "status_code"): - raise AzureOpenAIError(status_code=e.status_code, message=str(e)) - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) async def make_async_azure_httpx_request( self, diff --git a/litellm/llms/azure_text.py b/litellm/llms/azure_text.py index 72d6f134b..d8d7e9d14 100644 --- a/litellm/llms/azure_text.py +++ b/litellm/llms/azure_text.py @@ -33,9 +33,11 @@ class AzureOpenAIError(Exception): message, request: Optional[httpx.Request] = None, response: Optional[httpx.Response] = None, + headers: Optional[httpx.Headers] = None, ): self.status_code = status_code self.message = message + self.headers = headers if request: self.request = request else: @@ -311,13 +313,13 @@ class AzureTextCompletion(BaseLLM): ) ) except AzureOpenAIError as e: - exception_mapping_worked = True raise e except Exception as e: - if hasattr(e, "status_code"): - raise AzureOpenAIError(status_code=e.status_code, message=str(e)) - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) async def acompletion( self, @@ -387,10 +389,11 @@ class AzureTextCompletion(BaseLLM): exception_mapping_worked = True raise e except Exception as e: - if hasattr(e, "status_code"): - raise e - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + 
status_code=status_code, message=str(e), headers=error_headers + ) def streaming( self, @@ -443,7 +446,9 @@ class AzureTextCompletion(BaseLLM): "complete_input_dict": data, }, ) - response = azure_client.completions.create(**data, timeout=timeout) + response = azure_client.completions.with_raw_response.create( + **data, timeout=timeout + ) streamwrapper = CustomStreamWrapper( completion_stream=response, model=model, @@ -501,7 +506,9 @@ class AzureTextCompletion(BaseLLM): "complete_input_dict": data, }, ) - response = await azure_client.completions.create(**data, timeout=timeout) + response = await azure_client.completions.with_raw_response.create( + **data, timeout=timeout + ) # return response streamwrapper = CustomStreamWrapper( completion_stream=response, @@ -511,7 +518,8 @@ class AzureTextCompletion(BaseLLM): ) return streamwrapper ## DO NOT make this into an async for ... loop, it will yield an async generator, which won't raise errors if the response fails except Exception as e: - if hasattr(e, "status_code"): - raise AzureOpenAIError(status_code=e.status_code, message=str(e)) - else: - raise AzureOpenAIError(status_code=500, message=str(e)) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + raise AzureOpenAIError( + status_code=status_code, message=str(e), headers=error_headers + ) diff --git a/litellm/tests/test_exceptions.py b/litellm/tests/test_exceptions.py index ff5c4352f..fbc1dd047 100644 --- a/litellm/tests/test_exceptions.py +++ b/litellm/tests/test_exceptions.py @@ -887,16 +887,19 @@ def _pre_call_utils( [True, False], ) @pytest.mark.parametrize( - "model, call_type, streaming", + "provider, model, call_type, streaming", [ - ("text-embedding-ada-002", "embedding", None), - ("gpt-3.5-turbo", "chat_completion", False), - ("gpt-3.5-turbo", "chat_completion", True), - ("gpt-3.5-turbo-instruct", "completion", True), + ("openai", "text-embedding-ada-002", "embedding", None), + ("openai", "gpt-3.5-turbo", "chat_completion", False), + ("openai", "gpt-3.5-turbo", "chat_completion", True), + ("openai", "gpt-3.5-turbo-instruct", "completion", True), + ("azure", "azure/chatgpt-v-2", "chat_completion", True), + ("azure", "azure/text-embedding-ada-002", "embedding", True), + ("azure", "azure_text/gpt-3.5-turbo-instruct", "completion", True), ], ) @pytest.mark.asyncio -async def test_exception_with_headers(sync_mode, model, call_type, streaming): +async def test_exception_with_headers(sync_mode, provider, model, call_type, streaming): """ User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" but Azure says to retry in at most 9s @@ -908,9 +911,15 @@ async def test_exception_with_headers(sync_mode, model, call_type, streaming): import openai if sync_mode: - openai_client = openai.OpenAI(api_key="") + if provider == "openai": + openai_client = openai.OpenAI(api_key="") + elif provider == "azure": + openai_client = openai.AzureOpenAI(api_key="", base_url="") else: - openai_client = openai.AsyncOpenAI(api_key="") + if provider == "openai": + openai_client = openai.AsyncOpenAI(api_key="") + elif provider == "azure": + openai_client = openai.AsyncAzureOpenAI(api_key="", base_url="") data = {"model": model} data, original_function, mapped_target = _pre_call_utils( diff --git a/litellm/utils.py b/litellm/utils.py index d1582cb94..8ac094f59 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -8157,7 +8157,7 @@ def exception_type( model=model, request=original_exception.request, ) - elif custom_llm_provider 
== "azure": + elif custom_llm_provider == "azure" or custom_llm_provider == "azure_text": message = get_error_message(error_obj=original_exception) if message is None: if hasattr(original_exception, "message"): From 7beb0910c64484446d1d43db2e1f1aa1198722cc Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 15:19:27 -0700 Subject: [PATCH 05/10] test(test_router.py): skip test - create separate pr to match retry after --- litellm/tests/test_router.py | 1 + 1 file changed, 1 insertion(+) diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py index 6e58a04f9..b8307c39d 100644 --- a/litellm/tests/test_router.py +++ b/litellm/tests/test_router.py @@ -2254,6 +2254,7 @@ def test_router_dynamic_cooldown_correct_retry_after_time(sync_mode): assert response_headers["retry-after"] == cooldown_time +@pytest.mark.skip(reason="needs more work. fix in separate PR.") def test_router_dynamic_cooldown_message_retry_time(): """ User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" From 76834c6c595b832782848fe60db4fd08ba2051ab Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 15:21:04 -0700 Subject: [PATCH 06/10] test(test_router.py): add test to ensure retry-after matches received value --- litellm/tests/test_router.py | 1 - 1 file changed, 1 deletion(-) diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py index b8307c39d..6e58a04f9 100644 --- a/litellm/tests/test_router.py +++ b/litellm/tests/test_router.py @@ -2254,7 +2254,6 @@ def test_router_dynamic_cooldown_correct_retry_after_time(sync_mode): assert response_headers["retry-after"] == cooldown_time -@pytest.mark.skip(reason="needs more work. fix in separate PR.") def test_router_dynamic_cooldown_message_retry_time(): """ User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds" From 33972cc79c93fd95e96a17b56a9831810376a76e Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Sat, 24 Aug 2024 16:59:30 -0700 Subject: [PATCH 07/10] fix(router.py): enable dynamic retry after in exception string Updates cooldown logic to cooldown individual models Closes https://github.com/BerriAI/litellm/issues/1339 --- litellm/router.py | 158 ++++++++++++------------- litellm/router_utils/cooldown_cache.py | 152 ++++++++++++++++++++++++ litellm/tests/test_router.py | 52 ++++++-- 3 files changed, 271 insertions(+), 91 deletions(-) create mode 100644 litellm/router_utils/cooldown_cache.py diff --git a/litellm/router.py b/litellm/router.py index 48cd4427d..a263d4cca 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -58,6 +58,7 @@ from litellm.router_utils.client_initalization_utils import ( set_client, should_initialize_sync_client, ) +from litellm.router_utils.cooldown_cache import CooldownCache from litellm.router_utils.cooldown_callbacks import router_cooldown_handler from litellm.router_utils.fallback_event_handlers import ( log_failure_fallback_event, @@ -338,6 +339,9 @@ class Router: else: self.allowed_fails = litellm.allowed_fails self.cooldown_time = cooldown_time or 60 + self.cooldown_cache = CooldownCache( + cache=self.cache, default_cooldown_time=self.cooldown_time + ) self.disable_cooldowns = disable_cooldowns self.failed_calls = ( InMemoryCache() @@ -3243,52 +3247,14 @@ class Router: if updated_fails > allowed_fails or _should_retry is False: # get the current cooldown list for that minute - cooldown_key = f"{current_minute}:cooldown_models" # group cooldown models by minute to reduce number of 
redis calls - cached_value = self.cache.get_cache( - key=cooldown_key - ) # [(deployment_id, {last_error_str, last_error_status_code})] - - cached_value_deployment_ids = [] - if ( - cached_value is not None - and isinstance(cached_value, list) - and len(cached_value) > 0 - and isinstance(cached_value[0], tuple) - ): - cached_value_deployment_ids = [cv[0] for cv in cached_value] verbose_router_logger.debug(f"adding {deployment} to cooldown models") # update value - if cached_value is not None and len(cached_value_deployment_ids) > 0: - if deployment in cached_value_deployment_ids: - pass - else: - cached_value = cached_value + [ - ( - deployment, - { - "Exception Received": str(original_exception), - "Status Code": str(exception_status), - }, - ) - ] - # save updated value - self.cache.set_cache( - value=cached_value, key=cooldown_key, ttl=cooldown_time - ) - else: - cached_value = [ - ( - deployment, - { - "Exception Received": str(original_exception), - "Status Code": str(exception_status), - }, - ) - ] - # save updated value - self.cache.set_cache( - value=cached_value, key=cooldown_key, ttl=cooldown_time - ) + self.cooldown_cache.add_deployment_to_cooldown( + model_id=deployment, + original_exception=original_exception, + exception_status=exception_status, + cooldown_time=cooldown_time, + ) # Trigger cooldown handler asyncio.create_task( @@ -3308,15 +3274,10 @@ class Router: """ Async implementation of '_get_cooldown_deployments' """ - dt = get_utc_datetime() - current_minute = dt.strftime("%H-%M") - # get the current cooldown list for that minute - cooldown_key = f"{current_minute}:cooldown_models" - - # ---------------------- - # Return cooldown models - # ---------------------- - cooldown_models = await self.cache.async_get_cache(key=cooldown_key) or [] + model_ids = self.get_model_ids() + cooldown_models = await self.cooldown_cache.async_get_active_cooldowns( + model_ids=model_ids + ) cached_value_deployment_ids = [] if ( @@ -3334,15 +3295,10 @@ class Router: """ Async implementation of '_get_cooldown_deployments' """ - dt = get_utc_datetime() - current_minute = dt.strftime("%H-%M") - # get the current cooldown list for that minute - cooldown_key = f"{current_minute}:cooldown_models" - - # ---------------------- - # Return cooldown models - # ---------------------- - cooldown_models = await self.cache.async_get_cache(key=cooldown_key) or [] + model_ids = self.get_model_ids() + cooldown_models = await self.cooldown_cache.async_get_active_cooldowns( + model_ids=model_ids + ) verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}") return cooldown_models @@ -3351,15 +3307,13 @@ class Router: """ Get the list of models being cooled down for this minute """ - dt = get_utc_datetime() - current_minute = dt.strftime("%H-%M") # get the current cooldown list for that minute - cooldown_key = f"{current_minute}:cooldown_models" # ---------------------- # Return cooldown models # ---------------------- - cooldown_models = self.cache.get_cache(key=cooldown_key) or [] + model_ids = self.get_model_ids() + cooldown_models = self.cooldown_cache.get_active_cooldowns(model_ids=model_ids) cached_value_deployment_ids = [] if ( @@ -3370,7 +3324,6 @@ class Router: ): cached_value_deployment_ids = [cv[0] for cv in cooldown_models] - verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}") return cached_value_deployment_ids def _get_healthy_deployments(self, model: str): @@ -4061,15 +4014,20 @@ class Router: rpm_usage += t return tpm_usage, rpm_usage - def 
get_model_ids(self) -> List[str]: + def get_model_ids(self, model_name: Optional[str] = None) -> List[str]: """ + if 'model_name' is none, returns all. + Returns list of model id's. """ ids = [] for model in self.model_list: if "model_info" in model and "id" in model["model_info"]: id = model["model_info"]["id"] - ids.append(id) + if model_name is not None and model["model_name"] == model_name: + ids.append(id) + elif model_name is None: + ids.append(id) return ids def get_model_names(self) -> List[str]: @@ -4402,10 +4360,19 @@ class Router: - First check for rate limit errors (if this is true, it means the model passed the context window check but failed the rate limit check) """ - if _rate_limit_error == True: # allow generic fallback logic to take place - raise ValueError( - f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. Try again in {self.cooldown_time} seconds." + if _rate_limit_error is True: # allow generic fallback logic to take place + model_ids = self.get_model_ids(model_name=model) + cooldown_time = self.cooldown_cache.get_min_cooldown( + model_ids=model_ids ) + cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=cooldown_time, + enable_pre_call_checks=True, + cooldown_list=cooldown_list, + ) + elif _context_window_error is True: raise litellm.ContextWindowExceededError( message="litellm._pre_call_checks: Context Window exceeded for given call. No models have context window large enough for this call.\n{}".format( @@ -4514,8 +4481,14 @@ class Router: litellm.print_verbose(f"initial list of deployments: {healthy_deployments}") if len(healthy_deployments) == 0: - raise ValueError( - f"No healthy deployment available, passed model={model}. Try again in {self.cooldown_time} seconds" + model_ids = self.get_model_ids(model_name=model) + _cooldown_time = self.cooldown_cache.get_min_cooldown(model_ids=model_ids) + _cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=_cooldown_time, + enable_pre_call_checks=self.enable_pre_call_checks, + cooldown_list=_cooldown_list, ) if litellm.model_alias_map and model in litellm.model_alias_map: @@ -4602,8 +4575,16 @@ class Router: if len(healthy_deployments) == 0: if _allowed_model_region is None: _allowed_model_region = "n/a" - raise ValueError( - f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, allowed_model_region={_allowed_model_region}, cooldown_list={await self._async_get_cooldown_deployments_with_debug_info()}" + model_ids = self.get_model_ids(model_name=model) + _cooldown_time = self.cooldown_cache.get_min_cooldown( + model_ids=model_ids + ) + _cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=_cooldown_time, + enable_pre_call_checks=self.enable_pre_call_checks, + cooldown_list=_cooldown_list, ) if ( @@ -4682,8 +4663,16 @@ class Router: verbose_router_logger.info( f"get_available_deployment for model: {model}, No deployment available" ) - raise ValueError( - f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. 
Passed model={model}" + model_ids = self.get_model_ids(model_name=model) + _cooldown_time = self.cooldown_cache.get_min_cooldown( + model_ids=model_ids + ) + _cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=_cooldown_time, + enable_pre_call_checks=self.enable_pre_call_checks, + cooldown_list=_cooldown_list, ) verbose_router_logger.info( f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment)} for model: {model}" @@ -4755,7 +4744,8 @@ class Router: ) if len(healthy_deployments) == 0: - _cooldown_time = self.cooldown_time # [TODO] Make dynamic + model_ids = self.get_model_ids(model_name=model) + _cooldown_time = self.cooldown_cache.get_min_cooldown(model_ids=model_ids) _cooldown_list = self._get_cooldown_deployments() raise RouterRateLimitError( model=model, @@ -4841,8 +4831,14 @@ class Router: verbose_router_logger.info( f"get_available_deployment for model: {model}, No deployment available" ) - raise ValueError( - f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}" + model_ids = self.get_model_ids(model_name=model) + _cooldown_time = self.cooldown_cache.get_min_cooldown(model_ids=model_ids) + _cooldown_list = self._get_cooldown_deployments() + raise RouterRateLimitError( + model=model, + cooldown_time=_cooldown_time, + enable_pre_call_checks=self.enable_pre_call_checks, + cooldown_list=_cooldown_list, ) verbose_router_logger.info( f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment)} for model: {model}" diff --git a/litellm/router_utils/cooldown_cache.py b/litellm/router_utils/cooldown_cache.py new file mode 100644 index 000000000..50a23e530 --- /dev/null +++ b/litellm/router_utils/cooldown_cache.py @@ -0,0 +1,152 @@ +""" +Wrapper around router cache. 
Meant to handle model cooldown logic
+"""
+
+import json
+import time
+from typing import List, Optional, Tuple, TypedDict
+
+from litellm import verbose_logger
+from litellm.caching import DualCache
+
+
+class CooldownCacheValue(TypedDict):
+    exception_received: str
+    status_code: str
+    timestamp: float
+    cooldown_time: float
+
+
+class CooldownCache:
+    def __init__(self, cache: DualCache, default_cooldown_time: float):
+        self.cache = cache
+        self.default_cooldown_time = default_cooldown_time
+
+    def _common_add_cooldown_logic(
+        self, model_id: str, original_exception, exception_status, cooldown_time: float
+    ) -> Tuple[str, dict]:
+        try:
+            current_time = time.time()
+            cooldown_key = f"deployment:{model_id}:cooldown"
+
+            # Store the cooldown information for the deployment separately
+            cooldown_data = CooldownCacheValue(
+                exception_received=str(original_exception),
+                status_code=str(exception_status),
+                timestamp=current_time,
+                cooldown_time=cooldown_time,
+            )
+
+            return cooldown_key, cooldown_data
+        except Exception as e:
+            verbose_logger.error(
+                "CooldownCache::_common_add_cooldown_logic - Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            raise e
+
+    def add_deployment_to_cooldown(
+        self,
+        model_id: str,
+        original_exception: Exception,
+        exception_status: int,
+        cooldown_time: Optional[float],
+    ):
+        try:
+            _cooldown_time = cooldown_time or self.default_cooldown_time
+            cooldown_key, cooldown_data = self._common_add_cooldown_logic(
+                model_id=model_id,
+                original_exception=original_exception,
+                exception_status=exception_status,
+                cooldown_time=_cooldown_time,
+            )
+
+            # Set the cache with a TTL equal to the cooldown time
+            self.cache.set_cache(
+                value=cooldown_data,
+                key=cooldown_key,
+                ttl=_cooldown_time,
+            )
+        except Exception as e:
+            verbose_logger.error(
+                "CooldownCache::add_deployment_to_cooldown - Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            raise e
+
+    async def async_add_deployment_to_cooldown(
+        self,
+        model_id: str,
+        original_exception: Exception,
+        exception_status: int,
+        cooldown_time: Optional[float],
+    ):
+        cooldown_key, cooldown_data = self._common_add_cooldown_logic(
+            model_id=model_id, original_exception=original_exception
+        )
+
+        # Set the cache with a TTL equal to the cooldown time
+        self.cache.set_cache(
+            value=cooldown_data,
+            key=cooldown_key,
+            ttl=cooldown_time or self.default_cooldown_time,
+        )
+
+    async def async_get_active_cooldowns(
+        self, model_ids: List[str]
+    ) -> List[Tuple[str, dict]]:
+        # Generate the keys for the deployments
+        keys = [f"deployment:{model_id}:cooldown" for model_id in model_ids]
+
+        # Retrieve the values for the keys using mget
+        results = await self.cache.async_batch_get_cache(keys=keys)
+
+        active_cooldowns = []
+        # Process the results
+        for model_id, result in zip(model_ids, results):
+            if result:
+                active_cooldowns.append((model_id, result))
+
+        return active_cooldowns
+
+    def get_active_cooldowns(self, model_ids: List[str]) -> List[Tuple[str, dict]]:
+        # Generate the keys for the deployments
+        keys = [f"deployment:{model_id}:cooldown" for model_id in model_ids]
+
+        # Retrieve the values for the keys using mget
+        results = self.cache.batch_get_cache(keys=keys)
+
+        active_cooldowns = []
+        # Process the results
+        for model_id, result in zip(model_ids, results):
+            if result:
+                active_cooldowns.append((model_id, result))
+
+        return active_cooldowns
+
+    def get_min_cooldown(self, model_ids: List[str]) -> float:
+        """Return min cooldown time required for a group of model id's."""
+
+        # Generate the keys for the deployments
+        keys = [f"deployment:{model_id}:cooldown" for model_id in model_ids]
+
+        # Retrieve the values for the keys using mget
+        results = self.cache.batch_get_cache(keys=keys)
+
+        min_cooldown_time = self.default_cooldown_time
+        # Process the results
+        for model_id, result in zip(model_ids, results):
+            if result and isinstance(result, dict):
+                cooldown_cache_value = CooldownCacheValue(**result)
+                if cooldown_cache_value["cooldown_time"] < min_cooldown_time:
+                    min_cooldown_time = cooldown_cache_value["cooldown_time"]
+
+        return min_cooldown_time
+
+
+# Usage example:
+# cooldown_cache = CooldownCache(cache=your_cache_instance, cooldown_time=your_cooldown_time)
+# cooldown_cache.add_deployment_to_cooldown(deployment, original_exception, exception_status)
+# active_cooldowns = cooldown_cache.get_active_cooldowns()
diff --git a/litellm/tests/test_router.py b/litellm/tests/test_router.py
index 6e58a04f9..d3d5cad74 100644
--- a/litellm/tests/test_router.py
+++ b/litellm/tests/test_router.py
@@ -2254,7 +2254,9 @@ def test_router_dynamic_cooldown_correct_retry_after_time(sync_mode):
     assert response_headers["retry-after"] == cooldown_time
 
 
-def test_router_dynamic_cooldown_message_retry_time():
+@pytest.mark.parametrize("sync_mode", [True, False])
+@pytest.mark.asyncio
+async def test_router_dynamic_cooldown_message_retry_time(sync_mode):
     """
     User feedback: litellm says "No deployments available for selected model, Try again in 60 seconds"
     but Azure says to retry in at most 9s
@@ -2294,19 +2296,49 @@ def test_router_dynamic_cooldown_message_retry_time():
     ):
         for _ in range(2):
             try:
+                if sync_mode:
+                    router.embedding(
+                        model="text-embedding-ada-002",
+                        input="Hello world!",
+                        client=openai_client,
+                    )
+                else:
+                    await router.aembedding(
+                        model="text-embedding-ada-002",
+                        input="Hello world!",
+                        client=openai_client,
+                    )
+            except litellm.RateLimitError:
+                pass
+
+        if sync_mode:
+            cooldown_deployments = router._get_cooldown_deployments()
+        else:
+            cooldown_deployments = await router._async_get_cooldown_deployments()
+        print(
+            "Cooldown deployments - {}\n{}".format(
+                cooldown_deployments, len(cooldown_deployments)
+            )
+        )
+
+        assert len(cooldown_deployments) > 0
+        exception_raised = False
+        try:
+            if sync_mode:
                 router.embedding(
                     model="text-embedding-ada-002",
                     input="Hello world!",
                     client=openai_client,
                 )
-            except litellm.RateLimitError:
-                pass
-
-        try:
-            router.embedding(
-                model="text-embedding-ada-002",
-                input="Hello world!",
-                client=openai_client,
-            )
+            else:
+                await router.aembedding(
+                    model="text-embedding-ada-002",
+                    input="Hello world!",
+                    client=openai_client,
+                )
         except litellm.types.router.RouterRateLimitError as e:
+            print(e)
+            exception_raised = True
             assert e.cooldown_time == cooldown_time
+
+        assert exception_raised

From 5572ad7241f8df0970673ea29ef6ffa3ae3316d0 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Sat, 24 Aug 2024 17:11:32 -0700
Subject: [PATCH 08/10] fix(cooldown_cache.py): fix linting errors

---
 litellm/router_utils/cooldown_cache.py | 36 ++++++++------------------
 1 file changed, 11 insertions(+), 25 deletions(-)

diff --git a/litellm/router_utils/cooldown_cache.py b/litellm/router_utils/cooldown_cache.py
index 50a23e530..e47f77618 100644
--- a/litellm/router_utils/cooldown_cache.py
+++ b/litellm/router_utils/cooldown_cache.py
@@ -24,7 +24,7 @@ class CooldownCache:
 
     def _common_add_cooldown_logic(
         self, model_id: str, original_exception, exception_status, cooldown_time: float
-    ) -> Tuple[str, dict]:
+    ) -> Tuple[str, CooldownCacheValue]:
         try:
             current_time = time.time()
             cooldown_key = f"deployment:{model_id}:cooldown"
@@ -76,27 +76,9 @@ class CooldownCache:
             )
             raise e
 
-    async def async_add_deployment_to_cooldown(
-        self,
-        model_id: str,
-        original_exception: Exception,
-        exception_status: int,
-        cooldown_time: Optional[float],
-    ):
-        cooldown_key, cooldown_data = self._common_add_cooldown_logic(
-            model_id=model_id, original_exception=original_exception
-        )
-
-        # Set the cache with a TTL equal to the cooldown time
-        self.cache.set_cache(
-            value=cooldown_data,
-            key=cooldown_key,
-            ttl=cooldown_time or self.default_cooldown_time,
-        )
-
     async def async_get_active_cooldowns(
         self, model_ids: List[str]
-    ) -> List[Tuple[str, dict]]:
+    ) -> List[Tuple[str, CooldownCacheValue]]:
         # Generate the keys for the deployments
         keys = [f"deployment:{model_id}:cooldown" for model_id in model_ids]
 
@@ -106,12 +88,15 @@ class CooldownCache:
         active_cooldowns = []
         # Process the results
         for model_id, result in zip(model_ids, results):
-            if result:
-                active_cooldowns.append((model_id, result))
+            if result and isinstance(result, dict):
+                cooldown_cache_value = CooldownCacheValue(**result)
+                active_cooldowns.append((model_id, cooldown_cache_value))
 
         return active_cooldowns
 
-    def get_active_cooldowns(self, model_ids: List[str]) -> List[Tuple[str, dict]]:
+    def get_active_cooldowns(
+        self, model_ids: List[str]
+    ) -> List[Tuple[str, CooldownCacheValue]]:
         # Generate the keys for the deployments
         keys = [f"deployment:{model_id}:cooldown" for model_id in model_ids]
 
@@ -121,8 +106,9 @@ class CooldownCache:
         active_cooldowns = []
         # Process the results
         for model_id, result in zip(model_ids, results):
-            if result:
-                active_cooldowns.append((model_id, result))
+            if result and isinstance(result, dict):
+                cooldown_cache_value = CooldownCacheValue(**result)
+                active_cooldowns.append((model_id, cooldown_cache_value))
 
         return active_cooldowns
 

From cd7dd2a511adbc878bb0e4b76cb6ca087424bdb0 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Tue, 27 Aug 2024 07:40:28 -0700
Subject: [PATCH 09/10] fix(cooldown_cache.py): fix linting errors

---
 litellm/router_utils/cooldown_cache.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/litellm/router_utils/cooldown_cache.py b/litellm/router_utils/cooldown_cache.py
index e47f77618..2d29a1a5d 100644
--- a/litellm/router_utils/cooldown_cache.py
+++ b/litellm/router_utils/cooldown_cache.py
@@ -89,7 +89,7 @@ class CooldownCache:
         # Process the results
         for model_id, result in zip(model_ids, results):
             if result and isinstance(result, dict):
-                cooldown_cache_value = CooldownCacheValue(**result)
+                cooldown_cache_value = CooldownCacheValue(**result)  # type: ignore
                 active_cooldowns.append((model_id, cooldown_cache_value))
 
         return active_cooldowns
@@ -107,7 +107,7 @@ class CooldownCache:
         # Process the results
         for model_id, result in zip(model_ids, results):
             if result and isinstance(result, dict):
-                cooldown_cache_value = CooldownCacheValue(**result)
+                cooldown_cache_value = CooldownCacheValue(**result)  # type: ignore
                 active_cooldowns.append((model_id, cooldown_cache_value))
 
         return active_cooldowns
@@ -125,7 +125,7 @@ class CooldownCache:
         # Process the results
         for model_id, result in zip(model_ids, results):
             if result and isinstance(result, dict):
-                cooldown_cache_value = CooldownCacheValue(**result)
+                cooldown_cache_value = CooldownCacheValue(**result)  # type: ignore
                 if cooldown_cache_value["cooldown_time"] < min_cooldown_time:
                     min_cooldown_time = cooldown_cache_value["cooldown_time"]
 

From 18b67a455ebebdfcff7d641173bb5c8d0812afcd Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Tue, 27 Aug 2024 10:46:57 -0700
Subject: [PATCH 10/10] test: fix test

---
 litellm/llms/openai.py           | 14 ++++++++++----
 litellm/tests/test_completion.py | 13 +++++++------
 litellm/utils.py                 |  5 ++++-
 3 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py
index 1ad870537..25e8c59aa 100644
--- a/litellm/llms/openai.py
+++ b/litellm/llms/openai.py
@@ -770,7 +770,7 @@ class OpenAIChatCompletion(BaseLLM):
         openai_aclient: AsyncOpenAI,
         data: dict,
         timeout: Union[float, httpx.Timeout],
-    ):
+    ) -> Tuple[dict, BaseModel]:
         """
         Helper to:
         - call chat.completions.create.with_raw_response when litellm.return_response_headers is True
@@ -783,7 +783,10 @@ class OpenAIChatCompletion(BaseLLM):
                 )
             )
 
-            headers = dict(raw_response.headers)
+            if hasattr(raw_response, "headers"):
+                headers = dict(raw_response.headers)
+            else:
+                headers = {}
             response = raw_response.parse()
             return headers, response
         except OpenAIError as e:
@@ -800,7 +803,7 @@ class OpenAIChatCompletion(BaseLLM):
         openai_client: OpenAI,
         data: dict,
         timeout: Union[float, httpx.Timeout],
-    ):
+    ) -> Tuple[dict, BaseModel]:
         """
         Helper to:
         - call chat.completions.create.with_raw_response when litellm.return_response_headers is True
@@ -811,7 +814,10 @@ class OpenAIChatCompletion(BaseLLM):
                 **data, timeout=timeout
             )
 
-            headers = dict(raw_response.headers)
+            if hasattr(raw_response, "headers"):
+                headers = dict(raw_response.headers)
+            else:
+                headers = {}
             response = raw_response.parse()
             return headers, response
         except OpenAIError as e:
diff --git a/litellm/tests/test_completion.py b/litellm/tests/test_completion.py
index 47c6aedc1..4b2a05d83 100644
--- a/litellm/tests/test_completion.py
+++ b/litellm/tests/test_completion.py
@@ -1637,18 +1637,19 @@ def test_completion_perplexity_api():
     pydantic_obj = ChatCompletion(**response_object)
 
     def _return_pydantic_obj(*args, **kwargs):
-        return pydantic_obj
+        new_response = MagicMock()
+        new_response.headers = {"hello": "world"}
 
-    print(f"pydantic_obj: {pydantic_obj}")
+        new_response.parse.return_value = pydantic_obj
+        return new_response
 
     openai_client = OpenAI()
-    openai_client.chat.completions.create = MagicMock()
-
     with patch.object(
-        openai_client.chat.completions, "create", side_effect=_return_pydantic_obj
+        openai_client.chat.completions.with_raw_response,
+        "create",
+        side_effect=_return_pydantic_obj,
     ) as mock_client:
-        pass
         # litellm.set_verbose= True
         messages = [
             {"role": "system", "content": "You're a good bot"},
diff --git a/litellm/utils.py b/litellm/utils.py
index 8ac094f59..405c8a344 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -637,7 +637,10 @@ def client(original_function):
             if is_coroutine is True:
                 pass
             else:
-                if isinstance(original_response, ModelResponse):
+                if (
+                    isinstance(original_response, ModelResponse)
+                    and len(original_response.choices) > 0
+                ):
                     model_response: Optional[str] = original_response.choices[
                         0
                     ].message.content  # type: ignore
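
Note: below is a minimal usage sketch, not part of the patch series, showing how the CooldownCache added above is meant to be driven. It assumes litellm.caching.DualCache defaults to an in-memory cache; the deployment id is hypothetical, and the 60s / 9s numbers mirror the scenario in the test docstring.

```
# Illustrative sketch only -- uses the CooldownCache API exactly as added in the
# diff above, with an in-memory DualCache standing in for the router's cache.
from litellm.caching import DualCache
from litellm.router_utils.cooldown_cache import CooldownCache

cooldown_cache = CooldownCache(cache=DualCache(), default_cooldown_time=60.0)

# On a 429, the deployment is cooled down for the provider-suggested time
# (e.g. Azure's Retry-After) instead of the generic 60s default.
cooldown_cache.add_deployment_to_cooldown(
    model_id="my-azure-embedding-deployment",  # hypothetical deployment id
    original_exception=Exception("429 Too Many Requests"),
    exception_status=429,
    cooldown_time=9.0,  # e.g. parsed from the exception's retry-after headers
)

# get_min_cooldown() is what lets the router report the real wait time
# ("try again in 9 seconds") rather than the hard-coded 60 seconds.
min_wait = cooldown_cache.get_min_cooldown(model_ids=["my-azure-embedding-deployment"])
print(min_wait)  # 9.0
```

The router-side wiring, raising RouterRateLimitError with this cooldown_time, is what the updated test_router_dynamic_cooldown_message_retry_time above exercises in both sync and async mode.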