From 7b83238cb5e6e1a40c94f8f55a7ea4c6b7fc014e Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Tue, 5 Dec 2023 21:29:51 -0800 Subject: [PATCH] fix(router.py): log when a call is retried or fallback happens --- litellm/router.py | 69 +++++++++++++----- litellm/tests/test_router_fallbacks.py | 96 ++++++++++++++++++++------ 2 files changed, 126 insertions(+), 39 deletions(-) diff --git a/litellm/router.py b/litellm/router.py index 9d6ab5a10..38846f078 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -97,7 +97,7 @@ class Router: self.total_calls: defaultdict = defaultdict(int) # dict to store total calls made to each model self.fail_calls: defaultdict = defaultdict(int) # dict to store fail_calls made to each model self.success_calls: defaultdict = defaultdict(int) # dict to store success_calls made to each model - + self.previous_models: List = [] # list to store failed calls (passed in as metadata to next call) # make Router.chat.completions.create compatible for openai.chat.completions.create self.chat = litellm.Chat(params=default_litellm_params) @@ -393,6 +393,8 @@ class Router: Iterate through the model groups and try calling that deployment """ try: + ## LOGGING + kwargs = self.log_retry(kwargs=kwargs, e=original_exception) kwargs["model"] = mg kwargs["metadata"]["model_group"] = mg response = await self.async_function_with_retries(*args, **kwargs) @@ -436,6 +438,10 @@ class Router: else: raise original_exception + ## LOGGING + if len(num_retries) > 0: + kwargs = self.log_retry(kwargs=kwargs, e=original_exception) + for current_attempt in range(num_retries): self.print_verbose(f"retrying request. Current attempt - {current_attempt}; num retries: {num_retries}") try: @@ -446,6 +452,8 @@ class Router: return response except Exception as e: + ## LOGGING + kwargs = self.log_retry(kwargs=kwargs, e=e) remaining_retries = num_retries - current_attempt if "No models available" in str(e): timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1) @@ -471,13 +479,12 @@ class Router: try: response = self.function_with_retries(*args, **kwargs) return response - except Exception as e: + except Exception as e: original_exception = e self.print_verbose(f"An exception occurs {original_exception}") try: self.print_verbose(f"Trying to fallback b/w models. Initial model group: {model_group}") if isinstance(e, litellm.ContextWindowExceededError) and context_window_fallbacks is not None: - self.print_verbose(f"inside context window fallbacks: {context_window_fallbacks}") fallback_model_group = None for item in context_window_fallbacks: # [{"gpt-3.5-turbo": ["gpt-4"]}] @@ -493,6 +500,8 @@ class Router: Iterate through the model groups and try calling that deployment """ try: + ## LOGGING + kwargs = self.log_retry(kwargs=kwargs, e=original_exception) kwargs["model"] = mg response = self.function_with_fallbacks(*args, **kwargs) return response @@ -514,11 +523,13 @@ class Router: Iterate through the model groups and try calling that deployment """ try: + ## LOGGING + kwargs = self.log_retry(kwargs=kwargs, e=original_exception) kwargs["model"] = mg response = self.function_with_fallbacks(*args, **kwargs) return response except Exception as e: - pass + raise e except Exception as e: raise e raise original_exception @@ -528,7 +539,6 @@ class Router: Try calling the model 3 times. Shuffle between available deployments. 
""" self.print_verbose(f"Inside function with retries: args - {args}; kwargs - {kwargs}") - backoff_factor = 1 original_function = kwargs.pop("original_function") num_retries = kwargs.pop("num_retries") fallbacks = kwargs.pop("fallbacks", self.fallbacks) @@ -544,6 +554,9 @@ class Router: if ((isinstance(original_exception, litellm.ContextWindowExceededError) and context_window_fallbacks is None) or (isinstance(original_exception, openai.RateLimitError) and fallbacks is not None)): raise original_exception + ## LOGGING + if len(num_retries) > 0: + kwargs = self.log_retry(kwargs=kwargs, e=original_exception) ### RETRY for current_attempt in range(num_retries): self.print_verbose(f"retrying request. Current attempt - {current_attempt}; retries left: {num_retries}") @@ -552,19 +565,19 @@ class Router: response = original_function(*args, **kwargs) return response - except openai.RateLimitError as e: - if num_retries > 0: - remaining_retries = num_retries - current_attempt - timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries) - # on RateLimitError we'll wait for an exponential time before trying again + except Exception as e: + ## LOGGING + kwargs = self.log_retry(kwargs=kwargs, e=e) + remaining_retries = num_retries - current_attempt + if "No models available" in str(e): + timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1) + time.sleep(timeout) + elif hasattr(e, "status_code") and hasattr(e, "response") and litellm._should_retry(status_code=e.status_code): + if hasattr(e.response, "headers"): + timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, response_headers=e.response.headers) + else: + timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries) time.sleep(timeout) - else: - raise e - - except Exception as e: - # for any other exception types, immediately retry - if num_retries > 0: - pass else: raise e raise original_exception @@ -627,6 +640,26 @@ class Router: except Exception as e: raise e + def log_retry(self, kwargs: dict, e: Exception) -> dict: + """ + When a retry or fallback happens, log the details of the just failed model call - similar to Sentry breadcrumbing + """ + try: + # Log failed model as the previous model + previous_model = {"exception_type": type(e).__name__, "exception_string": str(e)} + for k, v in kwargs.items(): # log everything in kwargs except the old previous_models value - prevent nesting + if k != "metadata": + previous_model[k] = v + elif k == "metadata" and isinstance(v, dict): + previous_model[k] = {} + for metadata_k, metadata_v in kwargs['metadata'].items(): + if metadata_k != "previous_models": + previous_model[k][metadata_k] = metadata_v + self.previous_models.append(previous_model) + kwargs["metadata"]["previous_models"] = self.previous_models + return kwargs + except Exception as e: + raise e def _set_cooldown_deployments(self, deployment: str): """ @@ -994,7 +1027,7 @@ class Router: self.deployment_names.append(model["litellm_params"]["model"]) model_id = "" for key in model["litellm_params"]: - if key != "api_key": + if key != "api_key" and key != "metadata": model_id+= str(model["litellm_params"][key]) model["litellm_params"]["model"] += "-ModelID-" + model_id diff --git a/litellm/tests/test_router_fallbacks.py b/litellm/tests/test_router_fallbacks.py index 0501ea8a2..3779dc09a 100644 --- a/litellm/tests/test_router_fallbacks.py +++ 
b/litellm/tests/test_router_fallbacks.py @@ -10,6 +10,30 @@ sys.path.insert( import litellm from litellm import Router +from litellm.integrations.custom_logger import CustomLogger + +class MyCustomHandler(CustomLogger): + success: bool = False + failure: bool = False + previous_models: int = 0 + + def log_pre_api_call(self, model, messages, kwargs): + print(f"Pre-API Call") + + def log_post_api_call(self, kwargs, response_obj, start_time, end_time): + print(f"Post-API Call") + + def log_stream_event(self, kwargs, response_obj, start_time, end_time): + print(f"On Stream") + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + print(f"previous_models: {kwargs['litellm_params']['metadata']['previous_models']}") + self.previous_models += len(kwargs["litellm_params"]["metadata"]["previous_models"]) # {"previous_models": [{"model": litellm_model_name, "exception_type": AuthenticationError, "exception_string": }]} + print(f"self.previous_models: {self.previous_models}") + print(f"On Success") + + def log_failure_event(self, kwargs, response_obj, start_time, end_time): + print(f"On Failure") model_list = [ { # list of model deployments @@ -27,7 +51,7 @@ model_list = [ "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name "litellm_params": { # params for litellm completion/embedding call "model": "azure/chatgpt-v-2", - "api_key": "bad-key", + "api_key": os.getenv("AZURE_API_KEY"), "api_version": os.getenv("AZURE_API_VERSION"), "api_base": os.getenv("AZURE_API_BASE") }, @@ -67,52 +91,74 @@ model_list = [ -router = Router(model_list=model_list, - fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], - context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], - set_verbose=True) - kwargs = {"model": "azure/gpt-3.5-turbo", "messages": [{"role": "user", "content":"Hey, how's it going?"}]} def test_sync_fallbacks(): try: litellm.set_verbose = True + customHandler = MyCustomHandler() + litellm.callbacks = [customHandler] + router = Router(model_list=model_list, + fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], + context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], + set_verbose=False) response = router.completion(**kwargs) print(f"response: {response}") - router.flush_cache() + time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread + assert customHandler.previous_models == 1 # 0 retries, 1 fallback + router.reset() except Exception as e: print(e) # test_sync_fallbacks() def test_async_fallbacks(): litellm.set_verbose = False + router = Router(model_list=model_list, + fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], + context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], + set_verbose=False) async def test_get_response(): + customHandler = MyCustomHandler() + litellm.callbacks = [customHandler] user_message = "Hello, how are you?" 
messages = [{"content": user_message, "role": "user"}] try: response = await router.acompletion(**kwargs) - # response = await response - print(f"response: {response}") - router.flush_cache() + print(f"customHandler.previous_models: {customHandler.previous_models}") + time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread + assert customHandler.previous_models == 1 # 0 retries, 1 fallback + router.reset() except litellm.Timeout as e: pass except Exception as e: pytest.fail(f"An exception occurred: {e}") - + finally: + router.reset() asyncio.run(test_get_response()) # test_async_fallbacks() -def test_sync_context_window_fallbacks(): - try: - sample_text = "Say error 50 times" * 10000 - kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback" - kwargs["messages"] = [{"role": "user", "content": sample_text}] - response = router.completion(**kwargs) - print(f"response: {response}") - router.reset() - except Exception as e: - print(e) +## COMMENTING OUT as the context size exceeds both gpt-3.5-turbo and gpt-3.5-turbo-16k, need a better message here +# def test_sync_context_window_fallbacks(): +# try: +# customHandler = MyCustomHandler() +# litellm.callbacks = [customHandler] +# sample_text = "Say error 50 times" * 10000 +# kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback" +# kwargs["messages"] = [{"role": "user", "content": sample_text}] +# router = Router(model_list=model_list, +# fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], +# context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], +# set_verbose=False) +# response = router.completion(**kwargs) +# print(f"response: {response}") +# time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread +# assert customHandler.previous_models == 1 # 0 retries, 1 fallback +# router.reset() +# except Exception as e: +# print(f"An exception occurred - {e}") +# finally: +# router.reset() # test_sync_context_window_fallbacks() @@ -121,6 +167,8 @@ def test_dynamic_fallbacks_sync(): Allow setting the fallback in the router.completion() call. """ try: + customHandler = MyCustomHandler() + litellm.callbacks = [customHandler] router = Router(model_list=model_list, set_verbose=True) kwargs = {} kwargs["model"] = "azure/gpt-3.5-turbo" @@ -128,6 +176,8 @@ def test_dynamic_fallbacks_sync(): kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}] response = router.completion(**kwargs) print(f"response: {response}") + time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread + assert customHandler.previous_models == 1 # 0 retries, 1 fallback router.reset() except Exception as e: pytest.fail(f"An exception occurred - {e}") @@ -140,6 +190,8 @@ def test_dynamic_fallbacks_async(): """ async def test_get_response(): try: + customHandler = MyCustomHandler() + litellm.callbacks = [customHandler] router = Router(model_list=model_list, set_verbose=True) kwargs = {} kwargs["model"] = "azure/gpt-3.5-turbo" @@ -147,6 +199,8 @@ def test_dynamic_fallbacks_async(): kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}] response = await router.acompletion(**kwargs) print(f"response: {response}") + time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread + assert customHandler.previous_models == 1 # 0 retries, 1 fallback router.reset() except Exception as e: pytest.fail(f"An exception occurred - {e}")
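The new tests above double as a usage guide. Below is a condensed sketch of the same flow (assumptions: a litellm version carrying this patch, the AZURE_* / OPENAI_API_KEY environment variables used in the test file, and the deliberately bad api_key on the Azure deployment to force a fallback). The class name BreadcrumbLogger is illustrative; the callback reads the retry/fallback breadcrumbs that log_retry now writes into metadata["previous_models"], exactly as MyCustomHandler does in the tests.

import os
import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger

class BreadcrumbLogger(CustomLogger):  # hypothetical name, same pattern as MyCustomHandler in the tests
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        # previous_models holds one entry per failed call that preceded this success
        previous = kwargs["litellm_params"]["metadata"].get("previous_models", [])
        print(f"failed attempts before success: {len(previous)}")

litellm.callbacks = [BreadcrumbLogger()]

model_list = [
    {   # deliberately broken deployment, so the router must fall back
        "model_name": "azure/gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": "bad-key",
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
        },
    },
    {   # healthy fallback deployment
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
]

router = Router(model_list=model_list,
                fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}])
response = router.completion(model="azure/gpt-3.5-turbo",
                             messages=[{"role": "user", "content": "Hey, how's it going?"}])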
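For reference, each breadcrumb appended by log_retry is just the failing call's kwargs plus the exception info, with any nested previous_models stripped out of metadata to prevent unbounded nesting. The sketch below shows the rough shape of one entry; the field values are made up for illustration, not taken from a real run.

# Illustrative shape of one entry in metadata["previous_models"] after a failed call.
# exception_type / exception_string come from type(e).__name__ and str(e); every other
# key mirrors whatever was in kwargs for that attempt (values here are invented).
previous_model_entry = {
    "exception_type": "AuthenticationError",
    "exception_string": "Error code: 401 - invalid api key",
    "model": "azure/gpt-3.5-turbo",
    "messages": [{"role": "user", "content": "Hey, how's it going?"}],
    "metadata": {"model_group": "azure/gpt-3.5-turbo"},  # previous_models removed here
}
# Successive failures accumulate, so the callback fired on the eventual success sees:
# kwargs["litellm_params"]["metadata"]["previous_models"] == [previous_model_entry, ...]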