From 1738341dcb16884bfff42a0b2004ba5afd856c5d Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Tue, 14 Nov 2023 18:55:01 -0800
Subject: [PATCH] fix(main.py): misrouting ollama models to nlp cloud

---
 litellm/llms/openai.py                | 38 +++++++++++++++-----
 litellm/main.py                       | 31 +++-------------
 litellm/proxy/proxy_cli.py            |  1 -
 litellm/router.py                     | 51 ++++++++++++++++++++++-----
 litellm/tests/test_loadtest_router.py | 20 +++++++++--
 5 files changed, 94 insertions(+), 47 deletions(-)

diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py
index 3f2fc38b9..54da975a1 100644
--- a/litellm/llms/openai.py
+++ b/litellm/llms/openai.py
@@ -1,5 +1,5 @@
 from typing import Optional, Union
-import types
+import types, time
 import httpx
 from .base import BaseLLM
 from litellm.utils import ModelResponse, Choices, Message, CustomStreamWrapper, convert_to_model_response_object, Usage
@@ -160,6 +160,7 @@ class OpenAIChatCompletion(BaseLLM):
         super().__init__()
         self._client_session = self.create_client_session()
         self._aclient_session = self.create_aclient_session()
+        self._num_retry_httpx_errors = 3  # httpx throws random errors - e.g. ReadError
 
     def validate_environment(self, api_key):
         headers = {
@@ -168,6 +169,15 @@ class OpenAIChatCompletion(BaseLLM):
         if api_key:
             headers["Authorization"] = f"Bearer {api_key}"
         return headers
+
+    def _retry_request(self, *args, **kwargs):
+        self._num_retry_httpx_errors -= 1
+
+        time.sleep(1)
+
+        original_function = kwargs.pop("original_function")
+
+        return original_function(*args, **kwargs)
 
     def completion(self,
                 model_response: ModelResponse,
@@ -253,15 +263,27 @@ class OpenAIChatCompletion(BaseLLM):
                           api_base: str,
                           data: dict, headers: dict,
                           model_response: ModelResponse):
+        kwargs = locals()
         client = self._aclient_session
-
-        response = await client.post(api_base, json=data, headers=headers)
-        response_json = response.json()
-        if response.status_code != 200:
-            raise OpenAIError(status_code=response.status_code, message=response.text, request=response.request, response=response)
+        try:
+            response = await client.post(api_base, json=data, headers=headers)
+            response_json = response.json()
+            if response.status_code != 200:
+                raise OpenAIError(status_code=response.status_code, message=response.text, request=response.request, response=response)
 
-        ## RESPONSE OBJECT
-        return convert_to_model_response_object(response_object=response_json, model_response_object=model_response)
+            ## RESPONSE OBJECT
+            return convert_to_model_response_object(response_object=response_json, model_response_object=model_response)
+        except (httpx.ReadError, httpx.ReadTimeout) as e:
+            if self._num_retry_httpx_errors > 0:
+                kwargs["original_function"] = self.acompletion
+                return self._retry_request(**kwargs)
+            else:
+                raise e
+        except Exception as e:
+            if response and hasattr(response, "text"):
+                raise OpenAIError(status_code=500, message=f"{str(e)}\n\nOriginal Response: {response.text}")
+            else:
+                raise OpenAIError(status_code=500, message=f"{str(e)}")
 
     def streaming(self,
                   logging_obj,
diff --git a/litellm/main.py b/litellm/main.py
index d168422c8..f79fcc382 100644
--- a/litellm/main.py
+++ b/litellm/main.py
@@ -713,7 +713,7 @@ def completion(
                 response = CustomStreamWrapper(model_response, model, custom_llm_provider="anthropic", logging_obj=logging)
                 return response
             response = model_response
-        elif model in litellm.nlp_cloud_models or custom_llm_provider == "nlp_cloud":
+        elif custom_llm_provider == "nlp_cloud":
             nlp_cloud_key = (
                 api_key or litellm.nlp_cloud_key or get_secret("NLP_CLOUD_API_KEY") or litellm.api_key
             )
@@ -744,7 +744,7 @@ def completion(
                 response = CustomStreamWrapper(model_response, model, custom_llm_provider="nlp_cloud", logging_obj=logging)
                 return response
             response = model_response
-        elif model in litellm.aleph_alpha_models:
+        elif custom_llm_provider == "aleph_alpha":
             aleph_alpha_key = (
                 api_key or litellm.aleph_alpha_key or get_secret("ALEPH_ALPHA_API_KEY") or get_secret("ALEPHALPHA_API_KEY") or litellm.api_key
             )
@@ -909,7 +909,7 @@ def completion(
                 )
                 return response
             response = model_response
-        elif model in litellm.openrouter_models or custom_llm_provider == "openrouter":
+        elif custom_llm_provider == "openrouter":
             api_base = (
                 api_base
                 or litellm.api_base
@@ -969,28 +969,6 @@ def completion(
                 logging_obj=logging,
                 acompletion=acompletion
             )
-
-            # if headers:
-            #     response = openai.chat.completions.create(
-            #         headers=headers, # type: ignore
-            #         **data, # type: ignore
-            #     )
-            # else:
-            #     openrouter_site_url = get_secret("OR_SITE_URL")
-            #     openrouter_app_name = get_secret("OR_APP_NAME")
-            #     # if openrouter_site_url is None, set it to https://litellm.ai
-            #     if openrouter_site_url is None:
-            #         openrouter_site_url = "https://litellm.ai"
-            #     # if openrouter_app_name is None, set it to liteLLM
-            #     if openrouter_app_name is None:
-            #         openrouter_app_name = "liteLLM"
-            #     response = openai.chat.completions.create( # type: ignore
-            #         extra_headers=httpx.Headers({ # type: ignore
-            #             "HTTP-Referer": openrouter_site_url, # type: ignore
-            #             "X-Title": openrouter_app_name, # type: ignore
-            #         }), # type: ignore
-            #         **data,
-            #     )
             ## LOGGING
             logging.post_call(
                 input=messages, api_key=openai.api_key, original_response=response
@@ -1093,7 +1071,7 @@ def completion(
                 )
                 return response
             response = model_response
-        elif model in litellm.ai21_models:
+        elif custom_llm_provider == "ai21":
             custom_llm_provider = "ai21"
             ai21_key = (
                 api_key
@@ -1233,7 +1211,6 @@ def completion(
                 )
             else:
                 prompt = prompt_factory(model=model, messages=messages, custom_llm_provider=custom_llm_provider)
 
-            ## LOGGING
             if kwargs.get('acompletion', False) == True:
                 if optional_params.get("stream", False) == True:
diff --git a/litellm/proxy/proxy_cli.py b/litellm/proxy/proxy_cli.py
index cd1b4e7f4..52fd2827b 100644
--- a/litellm/proxy/proxy_cli.py
+++ b/litellm/proxy/proxy_cli.py
@@ -113,7 +113,6 @@ def run_server(host, port, api_base, api_version, model, alias, add_key, headers
         print("\033[1;32mDone successfully\033[0m")
         return
     if model and "ollama" in model:
-        print(f"ollama called")
         run_ollama_serve()
     if test != False:
         click.echo('\nLiteLLM: Making a test ChatCompletions request to your proxy')
diff --git a/litellm/router.py b/litellm/router.py
index bd3ac8767..2a200f8fc 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -1,7 +1,7 @@
 from datetime import datetime
 from typing import Dict, List, Optional, Union, Literal
 import random, threading, time
-import litellm
+import litellm, openai
 import logging
 
 class Router:
@@ -37,7 +37,7 @@ class Router:
         self.healthy_deployments: List = self.model_list
 
         if num_retries:
-            litellm.num_retries = num_retries
+            self.num_retries = num_retries
 
         self.routing_strategy = routing_strategy
         ### HEALTH CHECK THREAD ###
@@ -131,6 +131,35 @@ class Router:
                 return item or item[0]
 
         raise ValueError("No models available.")
+
+    def function_with_retries(self, *args, **kwargs):
+        try:
+            import tenacity
+        except Exception as e:
+            raise Exception(f"tenacity import failed please run `pip install tenacity`. Error{e}")
+
+        retry_info = {"attempts": 0, "final_result": None}
+
+        def after_callback(retry_state):
+            retry_info["attempts"] = retry_state.attempt_number
+            retry_info["final_result"] = retry_state.outcome.result()
+
+        try:
+            original_exception = kwargs.pop("original_exception")
+            original_function = kwargs.pop("original_function")
+            if isinstance(original_exception, openai.RateLimitError):
+                retryer = tenacity.Retrying(wait=tenacity.wait_exponential(multiplier=1, max=10),
+                                            stop=tenacity.stop_after_attempt(self.num_retries),
+                                            reraise=True,
+                                            after=after_callback)
+            elif isinstance(original_exception, openai.APIError):
+                retryer = tenacity.Retrying(stop=tenacity.stop_after_attempt(self.num_retries),
+                                            reraise=True,
+                                            after=after_callback)
+
+            return retryer(original_function, *args, **kwargs)
+        except Exception as e:
+            raise Exception(f"Error in function_with_retries: {e}\n\nRetry Info: {retry_info}")
 
     ### COMPLETION + EMBEDDING FUNCTIONS
 
@@ -148,9 +177,6 @@ class Router:
         # pick the one that is available (lowest TPM/RPM)
         deployment = self.get_available_deployment(model=model, messages=messages)
         data = deployment["litellm_params"]
-        # call via litellm.completion()
-        # return litellm.completion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
-        # litellm.set_verbose = True
         return litellm.completion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
 
 
@@ -161,10 +187,17 @@ class Router:
                     is_retry: Optional[bool] = False,
                     is_fallback: Optional[bool] = False,
                     **kwargs):
-        # pick the one that is available (lowest TPM/RPM)
-        deployment = self.get_available_deployment(model=model, messages=messages)
-        data = deployment["litellm_params"]
-        return await litellm.acompletion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
+        try:
+            deployment = self.get_available_deployment(model=model, messages=messages)
+            data = deployment["litellm_params"]
+            response = await litellm.acompletion(**{**data, "messages": messages, "caching": self.cache_responses, **kwargs})
+            return response
+        except Exception as e:
+            kwargs["model"] = model
+            kwargs["messages"] = messages
+            kwargs["original_exception"] = e
+            kwargs["original_function"] = self.acompletion
+            return self.function_with_retries(**kwargs)
 
     def text_completion(self,
               model: str,
diff --git a/litellm/tests/test_loadtest_router.py b/litellm/tests/test_loadtest_router.py
index 0d983172b..50a8b8693 100644
--- a/litellm/tests/test_loadtest_router.py
+++ b/litellm/tests/test_loadtest_router.py
@@ -25,6 +25,22 @@ async def main():
             "model": "gpt-3.5-turbo",
             "api_key": os.getenv("OPENAI_API_KEY"),
         },
+    }, {
+        "model_name": "gpt-3.5-turbo",
+        "litellm_params": {
+            "model": "azure/chatgpt-v-2",
+            "api_key": os.getenv("AZURE_API_KEY"),
+            "api_base": os.getenv("AZURE_API_BASE"),
+            "api_version": os.getenv("AZURE_API_VERSION")
+        },
+    }, {
+        "model_name": "gpt-3.5-turbo",
+        "litellm_params": {
+            "model": "azure/chatgpt-functioncalling",
+            "api_key": os.getenv("AZURE_API_KEY"),
+            "api_base": os.getenv("AZURE_API_BASE"),
+            "api_version": os.getenv("AZURE_API_VERSION")
+        },
     }]
 
     router = Router(model_list=model_list, num_retries=3)
@@ -35,13 +51,13 @@ async def main():
     tasks = []
     # Launch 1000 tasks
-    for _ in range(1000):
+    for _ in range(100):
         task = asyncio.create_task(call_acompletion(semaphore, router, {"model": "gpt-3.5-turbo", "messages": [{"role":"user", "content": "Hey, how's it going?"}]}))
         tasks.append(task)
 
     # Wait for all tasks to complete
     responses = await asyncio.gather(*tasks)
     # Process responses as needed
-
+    print(f"NUMBER OF COMPLETED TASKS: {len(responses)}")
 
 # Run the main function
 asyncio.run(main())
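
Below the patch, for reference only: a minimal usage sketch (not part of the commit) of the retry path this change adds to Router. The model name, deployment params, and the OPENAI_API_KEY environment variable are illustrative assumptions; substitute your own deployments.

# Usage sketch, not part of this commit; names and env vars are assumptions.
import asyncio
import os
from litellm import Router

# A single OpenAI-backed deployment, mirroring the shape used in test_loadtest_router.py.
model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
}]

# num_retries is now stored on the router itself (self.num_retries) and feeds
# tenacity.stop_after_attempt() inside Router.function_with_retries.
router = Router(model_list=model_list, num_retries=3)

async def main():
    # If the underlying litellm.acompletion call raises (e.g. openai.RateLimitError),
    # Router.acompletion hands the call to function_with_retries, which retries it via
    # tenacity: exponential backoff for rate-limit errors, plain retries for APIError.
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )
    print(response)

asyncio.run(main())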