fix(router.py): log when a call is retried or fallback happens

This commit is contained in:
Krrish Dholakia 2023-12-05 21:29:51 -08:00
parent 642c62f7b7
commit 7b83238cb5
2 changed files with 126 additions and 39 deletions

View file

@ -97,7 +97,7 @@ class Router:
self.total_calls: defaultdict = defaultdict(int) # dict to store total calls made to each model
self.fail_calls: defaultdict = defaultdict(int) # dict to store fail_calls made to each model
self.success_calls: defaultdict = defaultdict(int) # dict to store success_calls made to each model
self.previous_models: List = [] # list to store failed calls (passed in as metadata to next call)
# make Router.chat.completions.create compatible for openai.chat.completions.create
self.chat = litellm.Chat(params=default_litellm_params)
@ -393,6 +393,8 @@ class Router:
Iterate through the model groups and try calling that deployment
"""
try:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
kwargs["model"] = mg
kwargs["metadata"]["model_group"] = mg
response = await self.async_function_with_retries(*args, **kwargs)
@ -436,6 +438,10 @@ class Router:
else:
raise original_exception
## LOGGING
if len(num_retries) > 0:
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
for current_attempt in range(num_retries):
self.print_verbose(f"retrying request. Current attempt - {current_attempt}; num retries: {num_retries}")
try:
@ -446,6 +452,8 @@ class Router:
return response
except Exception as e:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=e)
remaining_retries = num_retries - current_attempt
if "No models available" in str(e):
timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1)
@ -471,13 +479,12 @@ class Router:
try:
response = self.function_with_retries(*args, **kwargs)
return response
except Exception as e:
except Exception as e:
original_exception = e
self.print_verbose(f"An exception occurs {original_exception}")
try:
self.print_verbose(f"Trying to fallback b/w models. Initial model group: {model_group}")
if isinstance(e, litellm.ContextWindowExceededError) and context_window_fallbacks is not None:
self.print_verbose(f"inside context window fallbacks: {context_window_fallbacks}")
fallback_model_group = None
for item in context_window_fallbacks: # [{"gpt-3.5-turbo": ["gpt-4"]}]
@ -493,6 +500,8 @@ class Router:
Iterate through the model groups and try calling that deployment
"""
try:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
kwargs["model"] = mg
response = self.function_with_fallbacks(*args, **kwargs)
return response
@ -514,11 +523,13 @@ class Router:
Iterate through the model groups and try calling that deployment
"""
try:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
kwargs["model"] = mg
response = self.function_with_fallbacks(*args, **kwargs)
return response
except Exception as e:
pass
raise e
except Exception as e:
raise e
raise original_exception
@ -528,7 +539,6 @@ class Router:
Try calling the model 3 times. Shuffle between available deployments.
"""
self.print_verbose(f"Inside function with retries: args - {args}; kwargs - {kwargs}")
backoff_factor = 1
original_function = kwargs.pop("original_function")
num_retries = kwargs.pop("num_retries")
fallbacks = kwargs.pop("fallbacks", self.fallbacks)
@ -544,6 +554,9 @@ class Router:
if ((isinstance(original_exception, litellm.ContextWindowExceededError) and context_window_fallbacks is None)
or (isinstance(original_exception, openai.RateLimitError) and fallbacks is not None)):
raise original_exception
## LOGGING
if len(num_retries) > 0:
kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
### RETRY
for current_attempt in range(num_retries):
self.print_verbose(f"retrying request. Current attempt - {current_attempt}; retries left: {num_retries}")
@ -552,19 +565,19 @@ class Router:
response = original_function(*args, **kwargs)
return response
except openai.RateLimitError as e:
if num_retries > 0:
remaining_retries = num_retries - current_attempt
timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries)
# on RateLimitError we'll wait for an exponential time before trying again
except Exception as e:
## LOGGING
kwargs = self.log_retry(kwargs=kwargs, e=e)
remaining_retries = num_retries - current_attempt
if "No models available" in str(e):
timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1)
time.sleep(timeout)
elif hasattr(e, "status_code") and hasattr(e, "response") and litellm._should_retry(status_code=e.status_code):
if hasattr(e.response, "headers"):
timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, response_headers=e.response.headers)
else:
timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries)
time.sleep(timeout)
else:
raise e
except Exception as e:
# for any other exception types, immediately retry
if num_retries > 0:
pass
else:
raise e
raise original_exception
@ -627,6 +640,26 @@ class Router:
except Exception as e:
raise e
def log_retry(self, kwargs: dict, e: Exception) -> dict:
"""
When a retry or fallback happens, log the details of the just failed model call - similar to Sentry breadcrumbing
"""
try:
# Log failed model as the previous model
previous_model = {"exception_type": type(e).__name__, "exception_string": str(e)}
for k, v in kwargs.items(): # log everything in kwargs except the old previous_models value - prevent nesting
if k != "metadata":
previous_model[k] = v
elif k == "metadata" and isinstance(v, dict):
previous_model[k] = {}
for metadata_k, metadata_v in kwargs['metadata'].items():
if metadata_k != "previous_models":
previous_model[k][metadata_k] = metadata_v
self.previous_models.append(previous_model)
kwargs["metadata"]["previous_models"] = self.previous_models
return kwargs
except Exception as e:
raise e
def _set_cooldown_deployments(self,
deployment: str):
"""
@ -994,7 +1027,7 @@ class Router:
self.deployment_names.append(model["litellm_params"]["model"])
model_id = ""
for key in model["litellm_params"]:
if key != "api_key":
if key != "api_key" and key != "metadata":
model_id+= str(model["litellm_params"][key])
model["litellm_params"]["model"] += "-ModelID-" + model_id

View file

@ -10,6 +10,30 @@ sys.path.insert(
import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
success: bool = False
failure: bool = False
previous_models: int = 0
def log_pre_api_call(self, model, messages, kwargs):
print(f"Pre-API Call")
def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
print(f"Post-API Call")
def log_stream_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Stream")
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"previous_models: {kwargs['litellm_params']['metadata']['previous_models']}")
self.previous_models += len(kwargs["litellm_params"]["metadata"]["previous_models"]) # {"previous_models": [{"model": litellm_model_name, "exception_type": AuthenticationError, "exception_string": <complete_traceback>}]}
print(f"self.previous_models: {self.previous_models}")
print(f"On Success")
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
model_list = [
{ # list of model deployments
@ -27,7 +51,7 @@ model_list = [
"model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
@ -67,52 +91,74 @@ model_list = [
router = Router(model_list=model_list,
fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
set_verbose=True)
kwargs = {"model": "azure/gpt-3.5-turbo", "messages": [{"role": "user", "content":"Hey, how's it going?"}]}
def test_sync_fallbacks():
try:
litellm.set_verbose = True
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
router = Router(model_list=model_list,
fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
set_verbose=False)
response = router.completion(**kwargs)
print(f"response: {response}")
router.flush_cache()
time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
assert customHandler.previous_models == 1 # 0 retries, 1 fallback
router.reset()
except Exception as e:
print(e)
# test_sync_fallbacks()
def test_async_fallbacks():
litellm.set_verbose = False
router = Router(model_list=model_list,
fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
set_verbose=False)
async def test_get_response():
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
user_message = "Hello, how are you?"
messages = [{"content": user_message, "role": "user"}]
try:
response = await router.acompletion(**kwargs)
# response = await response
print(f"response: {response}")
router.flush_cache()
print(f"customHandler.previous_models: {customHandler.previous_models}")
time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
assert customHandler.previous_models == 1 # 0 retries, 1 fallback
router.reset()
except litellm.Timeout as e:
pass
except Exception as e:
pytest.fail(f"An exception occurred: {e}")
finally:
router.reset()
asyncio.run(test_get_response())
# test_async_fallbacks()
def test_sync_context_window_fallbacks():
try:
sample_text = "Say error 50 times" * 10000
kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback"
kwargs["messages"] = [{"role": "user", "content": sample_text}]
response = router.completion(**kwargs)
print(f"response: {response}")
router.reset()
except Exception as e:
print(e)
## COMMENTING OUT as the context size exceeds both gpt-3.5-turbo and gpt-3.5-turbo-16k, need a better message here
# def test_sync_context_window_fallbacks():
# try:
# customHandler = MyCustomHandler()
# litellm.callbacks = [customHandler]
# sample_text = "Say error 50 times" * 10000
# kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback"
# kwargs["messages"] = [{"role": "user", "content": sample_text}]
# router = Router(model_list=model_list,
# fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
# context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
# set_verbose=False)
# response = router.completion(**kwargs)
# print(f"response: {response}")
# time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
# assert customHandler.previous_models == 1 # 0 retries, 1 fallback
# router.reset()
# except Exception as e:
# print(f"An exception occurred - {e}")
# finally:
# router.reset()
# test_sync_context_window_fallbacks()
@ -121,6 +167,8 @@ def test_dynamic_fallbacks_sync():
Allow setting the fallback in the router.completion() call.
"""
try:
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
router = Router(model_list=model_list, set_verbose=True)
kwargs = {}
kwargs["model"] = "azure/gpt-3.5-turbo"
@ -128,6 +176,8 @@ def test_dynamic_fallbacks_sync():
kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}]
response = router.completion(**kwargs)
print(f"response: {response}")
time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
assert customHandler.previous_models == 1 # 0 retries, 1 fallback
router.reset()
except Exception as e:
pytest.fail(f"An exception occurred - {e}")
@ -140,6 +190,8 @@ def test_dynamic_fallbacks_async():
"""
async def test_get_response():
try:
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
router = Router(model_list=model_list, set_verbose=True)
kwargs = {}
kwargs["model"] = "azure/gpt-3.5-turbo"
@ -147,6 +199,8 @@ def test_dynamic_fallbacks_async():
kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}]
response = await router.acompletion(**kwargs)
print(f"response: {response}")
time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
assert customHandler.previous_models == 1 # 0 retries, 1 fallback
router.reset()
except Exception as e:
pytest.fail(f"An exception occurred - {e}")