Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-27 03:34:10 +00:00)

fix(router.py): log when a call is retried or fallback happens

parent 1365a536f3
commit 4ecd05df3e

2 changed files with 126 additions and 39 deletions
litellm/router.py:

@@ -97,7 +97,7 @@ class Router:
         self.total_calls: defaultdict = defaultdict(int) # dict to store total calls made to each model
         self.fail_calls: defaultdict = defaultdict(int) # dict to store fail_calls made to each model
         self.success_calls: defaultdict = defaultdict(int) # dict to store success_calls made to each model
-
+        self.previous_models: List = [] # list to store failed calls (passed in as metadata to next call)
 
         # make Router.chat.completions.create compatible for openai.chat.completions.create
         self.chat = litellm.Chat(params=default_litellm_params)
@@ -393,6 +393,8 @@ class Router:
                     Iterate through the model groups and try calling that deployment
                     """
                     try:
+                        ## LOGGING
+                        kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
                         kwargs["model"] = mg
                         kwargs["metadata"]["model_group"] = mg
                         response = await self.async_function_with_retries(*args, **kwargs)
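This hunk shows the new pattern used in every fallback loop: log the just-failed call via self.log_retry, then overwrite kwargs["model"] with the fallback group and re-dispatch. That is what threads the failure history into the next attempt's metadata. A minimal standalone sketch of the idea (function and variable names here are illustrative, not litellm's API):

def log_retry(previous_models: list, kwargs: dict, e: Exception) -> dict:
    # Record the just-failed call; skip metadata to avoid nesting old breadcrumbs
    breadcrumb = {"exception_type": type(e).__name__, "exception_string": str(e)}
    for k, v in kwargs.items():
        if k != "metadata":
            breadcrumb[k] = v
    previous_models.append(breadcrumb)
    kwargs.setdefault("metadata", {})["previous_models"] = previous_models
    return kwargs

previous_models: list = []
kwargs = {"model": "azure/gpt-3.5-turbo", "metadata": {}}
try:
    raise RuntimeError("simulated deployment failure")
except Exception as e:
    kwargs = log_retry(previous_models, kwargs, e)
    kwargs["model"] = "gpt-3.5-turbo"  # fall back to the next model group
print(kwargs["metadata"]["previous_models"])  # one breadcrumb describing the failed call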
@@ -436,6 +438,10 @@ class Router:
                 else:
                     raise original_exception
 
+            ## LOGGING
+            if num_retries > 0:
+                kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
+
             for current_attempt in range(num_retries):
                 self.print_verbose(f"retrying request. Current attempt - {current_attempt}; num retries: {num_retries}")
                 try:
@@ -446,6 +452,8 @@ class Router:
                     return response
 
                 except Exception as e:
+                    ## LOGGING
+                    kwargs = self.log_retry(kwargs=kwargs, e=e)
                     remaining_retries = num_retries - current_attempt
                     if "No models available" in str(e):
                         timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1)
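litellm._calculate_retry_after is the library's internal backoff helper, as the diff itself shows. The min_timeout=1 argument matters for the "No models available" case: every deployment is cooling down, so the router should wait at least a second rather than spin through the remaining retries. A rough standalone sketch of this kind of backoff calculation, assuming conventional exponential-backoff-with-jitter semantics rather than the helper's exact formula:

import random

def calculate_retry_after(remaining_retries: int, max_retries: int,
                          min_timeout: float = 0.0, max_timeout: float = 8.0) -> float:
    """Assumed exponential-backoff semantics: later attempts wait longer."""
    nb_retries = max_retries - remaining_retries
    sleep_seconds = min(2 ** nb_retries * 0.5, max_timeout)  # 0.5s, 1s, 2s, ...
    jitter = 1 - 0.25 * random.random()  # spread out concurrent retries
    return max(sleep_seconds * jitter, min_timeout)

for attempt in range(3):
    print(f"attempt {attempt}: sleep {calculate_retry_after(3 - attempt, 3, min_timeout=1):.2f}s")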
@@ -471,13 +479,12 @@ class Router:
         try:
             response = self.function_with_retries(*args, **kwargs)
             return response
         except Exception as e:
             original_exception = e
             self.print_verbose(f"An exception occurs {original_exception}")
             try:
                 self.print_verbose(f"Trying to fallback b/w models. Initial model group: {model_group}")
                 if isinstance(e, litellm.ContextWindowExceededError) and context_window_fallbacks is not None:
-                    self.print_verbose(f"inside context window fallbacks: {context_window_fallbacks}")
                     fallback_model_group = None
 
                     for item in context_window_fallbacks: # [{"gpt-3.5-turbo": ["gpt-4"]}]
@@ -493,6 +500,8 @@ class Router:
                         Iterate through the model groups and try calling that deployment
                         """
                         try:
+                            ## LOGGING
+                            kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
                             kwargs["model"] = mg
                             response = self.function_with_fallbacks(*args, **kwargs)
                             return response
@@ -514,11 +523,13 @@ class Router:
                             Iterate through the model groups and try calling that deployment
                             """
                             try:
+                                ## LOGGING
+                                kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
                                 kwargs["model"] = mg
                                 response = self.function_with_fallbacks(*args, **kwargs)
                                 return response
                             except Exception as e:
-                                pass
+                                raise e
             except Exception as e:
                 raise e
             raise original_exception
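Both the context-window path and the generic path above walk a list of single-key dicts such as [{"gpt-3.5-turbo": ["gpt-4"]}], find the entry whose key matches the failed model group, then loop over its fallback groups, re-invoking the wrapped function (now carrying breadcrumbs) until one succeeds. A simplified sketch of that lookup; the real methods also copy metadata and guard nested exceptions:

def pick_fallback_group(model_group: str, fallbacks: list) -> list:
    # fallbacks is a list of {model_group: [fallback_group, ...]} mappings
    for item in fallbacks:
        if list(item.keys())[0] == model_group:
            return item[model_group]
    return []

fallbacks = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]},
             {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}]

for mg in pick_fallback_group("azure/gpt-3.5-turbo", fallbacks):
    print(f"would retry with model group: {mg}")  # gpt-3.5-turbo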
@@ -528,7 +539,6 @@ class Router:
         Try calling the model 3 times. Shuffle between available deployments.
         """
         self.print_verbose(f"Inside function with retries: args - {args}; kwargs - {kwargs}")
-        backoff_factor = 1
         original_function = kwargs.pop("original_function")
         num_retries = kwargs.pop("num_retries")
         fallbacks = kwargs.pop("fallbacks", self.fallbacks)
@@ -544,6 +554,9 @@ class Router:
             if ((isinstance(original_exception, litellm.ContextWindowExceededError) and context_window_fallbacks is None)
                 or (isinstance(original_exception, openai.RateLimitError) and fallbacks is not None)):
                 raise original_exception
+            ## LOGGING
+            if num_retries > 0:
+                kwargs = self.log_retry(kwargs=kwargs, e=original_exception)
             ### RETRY
             for current_attempt in range(num_retries):
                 self.print_verbose(f"retrying request. Current attempt - {current_attempt}; retries left: {num_retries}")
@@ -552,19 +565,19 @@ class Router:
                 response = original_function(*args, **kwargs)
                 return response
 
-            except openai.RateLimitError as e:
-                if num_retries > 0:
-                    remaining_retries = num_retries - current_attempt
-                    timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries)
-                    # on RateLimitError we'll wait for an exponential time before trying again
+            except Exception as e:
+                ## LOGGING
+                kwargs = self.log_retry(kwargs=kwargs, e=e)
+                remaining_retries = num_retries - current_attempt
+                if "No models available" in str(e):
+                    timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, min_timeout=1)
+                    time.sleep(timeout)
+                elif hasattr(e, "status_code") and hasattr(e, "response") and litellm._should_retry(status_code=e.status_code):
+                    if hasattr(e.response, "headers"):
+                        timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries, response_headers=e.response.headers)
+                    else:
+                        timeout = litellm._calculate_retry_after(remaining_retries=remaining_retries, max_retries=num_retries)
                     time.sleep(timeout)
                 else:
                     raise e
-
-            except Exception as e:
-                # for any other exception types, immediately retry
-                if num_retries > 0:
-                    pass
-                else:
-                    raise e
         raise original_exception
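The rewritten except block replaces the old RateLimitError-only branch with a single handler that makes three decisions in order: sleep with a 1-second floor when no deployment is available, retry any HTTP error whose status code litellm._should_retry accepts (preferring the server's own Retry-After response headers), and re-raise everything else. A standalone sketch of that decision order; the retryable status set below is the conventional one and an assumption, not litellm's exact list:

from typing import Optional

RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504}  # assumed conventional set

def should_retry(status_code: int) -> bool:
    return status_code in RETRYABLE_STATUS_CODES

def choose_timeout(e: Exception, remaining_retries: int, max_retries: int) -> Optional[float]:
    """Return seconds to sleep before retrying, or None if the error is not retryable."""
    if "No models available" in str(e):
        # every deployment is cooling down: back off with a 1s floor
        return max(1.0, 0.5 * (max_retries - remaining_retries))
    status = getattr(e, "status_code", None)
    if status is not None and should_retry(status):
        headers = getattr(getattr(e, "response", None), "headers", None)
        if headers and "retry-after" in headers:
            return float(headers["retry-after"])  # prefer the server's own hint
        return 0.5 * (max_retries - remaining_retries + 1)
    return None  # not retryable: caller re-raises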
@@ -627,6 +640,26 @@ class Router:
         except Exception as e:
             raise e
 
+    def log_retry(self, kwargs: dict, e: Exception) -> dict:
+        """
+        When a retry or fallback happens, log the details of the just failed model call - similar to Sentry breadcrumbing
+        """
+        try:
+            # Log failed model as the previous model
+            previous_model = {"exception_type": type(e).__name__, "exception_string": str(e)}
+            for k, v in kwargs.items(): # log everything in kwargs except the old previous_models value - prevent nesting
+                if k != "metadata":
+                    previous_model[k] = v
+                elif k == "metadata" and isinstance(v, dict):
+                    previous_model[k] = {}
+                    for metadata_k, metadata_v in kwargs["metadata"].items():
+                        if metadata_k != "previous_models":
+                            previous_model[k][metadata_k] = metadata_v
+            self.previous_models.append(previous_model)
+            kwargs["metadata"]["previous_models"] = self.previous_models
+            return kwargs
+        except Exception as e:
+            raise e
     def _set_cooldown_deployments(self,
                                   deployment: str):
         """
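Given this implementation, each entry in self.previous_models is a flat dict: the exception type and string plus every kwarg of the failed call, with metadata copied minus its own previous_models key; that guard is what stops each breadcrumb from nesting the full history of earlier breadcrumbs. One breadcrumb looks roughly like this (values are illustrative, not captured output):

breadcrumb = {
    "exception_type": "AuthenticationError",
    "exception_string": "...",  # str(e); may carry the full error message
    "model": "azure/gpt-3.5-turbo",
    "messages": [{"role": "user", "content": "Hey, how's it going?"}],
    "metadata": {"model_group": "azure/gpt-3.5-turbo"},  # copied minus its previous_models key
}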
@@ -994,7 +1027,7 @@ class Router:
             self.deployment_names.append(model["litellm_params"]["model"])
             model_id = ""
             for key in model["litellm_params"]:
-                if key != "api_key":
+                if key != "api_key" and key != "metadata":
                     model_id+= str(model["litellm_params"][key])
             model["litellm_params"]["model"] += "-ModelID-" + model_id
 
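Excluding metadata here follows from the rest of this commit: log_retry now mutates kwargs["metadata"] on every retry, so folding metadata into the deployment id would make the id unstable across calls (api_key is excluded so secrets never end up embedded in the model name). A toy illustration of the id construction, using a made-up litellm_params dict:

litellm_params = {
    "model": "azure/chatgpt-v-2",
    "api_key": "sk-...",                      # excluded: keep secrets out of the id
    "api_base": "https://example.azure.com",  # illustrative value
    "metadata": {"previous_models": []},      # excluded: mutated per request, would destabilize the id
}

model_id = "".join(str(v) for k, v in litellm_params.items()
                   if k not in ("api_key", "metadata"))
print("azure/chatgpt-v-2" + "-ModelID-" + model_id)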
litellm/tests/test_router_fallbacks.py (path inferred from the tests below):

@@ -10,6 +10,30 @@ sys.path.insert(
 
 import litellm
 from litellm import Router
+from litellm.integrations.custom_logger import CustomLogger
+
+
+class MyCustomHandler(CustomLogger):
+    success: bool = False
+    failure: bool = False
+    previous_models: int = 0
+
+    def log_pre_api_call(self, model, messages, kwargs):
+        print(f"Pre-API Call")
+
+    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
+        print(f"Post-API Call")
+
+    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Stream")
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"previous_models: {kwargs['litellm_params']['metadata']['previous_models']}")
+        self.previous_models += len(kwargs["litellm_params"]["metadata"]["previous_models"]) # {"previous_models": [{"model": litellm_model_name, "exception_type": AuthenticationError, "exception_string": <complete_traceback>}]}
+        print(f"self.previous_models: {self.previous_models}")
+        print(f"On Success")
+
+    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Failure")
 
 model_list = [
     { # list of model deployments
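MyCustomHandler.previous_models accumulates, on each success callback, the number of breadcrumbs the router attached to the winning call; the assertions below rely on that count. A standalone illustration of the counting, driving the callback by hand with fake kwargs (no router or API call involved):

class CountingHandler:
    previous_models: int = 0

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.previous_models += len(kwargs["litellm_params"]["metadata"]["previous_models"])

handler = CountingHandler()
fake_kwargs = {"litellm_params": {"metadata": {"previous_models": [
    {"model": "azure/gpt-3.5-turbo", "exception_type": "AuthenticationError"}
]}}}
handler.log_success_event(fake_kwargs, response_obj=None, start_time=None, end_time=None)
assert handler.previous_models == 1  # one failed call before the success => one breadcrumb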
@@ -27,7 +51,7 @@ model_list = [
         "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name
         "litellm_params": { # params for litellm completion/embedding call
             "model": "azure/chatgpt-v-2",
-            "api_key": "bad-key",
+            "api_key": os.getenv("AZURE_API_KEY"),
             "api_version": os.getenv("AZURE_API_VERSION"),
             "api_base": os.getenv("AZURE_API_BASE")
         },
@@ -67,52 +91,74 @@ model_list = [
 
 
 
-router = Router(model_list=model_list,
-                fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
-                context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
-                set_verbose=True)
 
 kwargs = {"model": "azure/gpt-3.5-turbo", "messages": [{"role": "user", "content":"Hey, how's it going?"}]}
 
 def test_sync_fallbacks():
     try:
         litellm.set_verbose = True
+        customHandler = MyCustomHandler()
+        litellm.callbacks = [customHandler]
+        router = Router(model_list=model_list,
+                        fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
+                        context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
+                        set_verbose=False)
         response = router.completion(**kwargs)
         print(f"response: {response}")
-        router.flush_cache()
+        time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
+        assert customHandler.previous_models == 1 # 0 retries, 1 fallback
+        router.reset()
     except Exception as e:
         print(e)
 # test_sync_fallbacks()
 
 def test_async_fallbacks():
     litellm.set_verbose = False
+    router = Router(model_list=model_list,
+                    fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
+                    context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
+                    set_verbose=False)
     async def test_get_response():
+        customHandler = MyCustomHandler()
+        litellm.callbacks = [customHandler]
         user_message = "Hello, how are you?"
         messages = [{"content": user_message, "role": "user"}]
         try:
             response = await router.acompletion(**kwargs)
-            # response = await response
-            print(f"response: {response}")
-            router.flush_cache()
+            print(f"customHandler.previous_models: {customHandler.previous_models}")
+            time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
+            assert customHandler.previous_models == 1 # 0 retries, 1 fallback
+            router.reset()
         except litellm.Timeout as e:
             pass
         except Exception as e:
             pytest.fail(f"An exception occurred: {e}")
+        finally:
+            router.reset()
     asyncio.run(test_get_response())
 
 # test_async_fallbacks()
 
-def test_sync_context_window_fallbacks():
-    try:
-        sample_text = "Say error 50 times" * 10000
-        kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback"
-        kwargs["messages"] = [{"role": "user", "content": sample_text}]
-        response = router.completion(**kwargs)
-        print(f"response: {response}")
-        router.reset()
-    except Exception as e:
-        print(e)
+## COMMENTING OUT as the context size exceeds both gpt-3.5-turbo and gpt-3.5-turbo-16k, need a better message here
+# def test_sync_context_window_fallbacks():
+#     try:
+#         customHandler = MyCustomHandler()
+#         litellm.callbacks = [customHandler]
+#         sample_text = "Say error 50 times" * 10000
+#         kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback"
+#         kwargs["messages"] = [{"role": "user", "content": sample_text}]
+#         router = Router(model_list=model_list,
+#                         fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],
+#                         context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}],
+#                         set_verbose=False)
+#         response = router.completion(**kwargs)
+#         print(f"response: {response}")
+#         time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
+#         assert customHandler.previous_models == 1 # 0 retries, 1 fallback
+#         router.reset()
+#     except Exception as e:
+#         print(f"An exception occurred - {e}")
+#     finally:
+#         router.reset()
 
 # test_sync_context_window_fallbacks()
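Because success callbacks run on a separate thread, these tests sleep 50ms before asserting on the handler. If that fixed delay ever proves flaky, a small polling helper is a more robust pattern; a sketch, not part of this commit:

import time

def wait_until(predicate, timeout: float = 1.0, interval: float = 0.01) -> bool:
    """Poll until predicate() is truthy or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# e.g. instead of time.sleep(0.05):
# assert wait_until(lambda: customHandler.previous_models == 1)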
@@ -121,6 +167,8 @@ def test_dynamic_fallbacks_sync():
     Allow setting the fallback in the router.completion() call.
     """
     try:
+        customHandler = MyCustomHandler()
+        litellm.callbacks = [customHandler]
         router = Router(model_list=model_list, set_verbose=True)
         kwargs = {}
         kwargs["model"] = "azure/gpt-3.5-turbo"
@@ -128,6 +176,8 @@ def test_dynamic_fallbacks_sync():
         kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}]
         response = router.completion(**kwargs)
         print(f"response: {response}")
+        time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
+        assert customHandler.previous_models == 1 # 0 retries, 1 fallback
         router.reset()
     except Exception as e:
         pytest.fail(f"An exception occurred - {e}")
@@ -140,6 +190,8 @@ def test_dynamic_fallbacks_async():
     """
     async def test_get_response():
         try:
+            customHandler = MyCustomHandler()
+            litellm.callbacks = [customHandler]
             router = Router(model_list=model_list, set_verbose=True)
             kwargs = {}
             kwargs["model"] = "azure/gpt-3.5-turbo"
@@ -147,6 +199,8 @@ def test_dynamic_fallbacks_async():
             kwargs["fallbacks"] = [{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}]
             response = await router.acompletion(**kwargs)
             print(f"response: {response}")
+            time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread
+            assert customHandler.previous_models == 1 # 0 retries, 1 fallback
             router.reset()
         except Exception as e:
             pytest.fail(f"An exception occurred - {e}")
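The dynamic tests exercise passing fallbacks per request rather than at Router construction, so any completion() call can carry its own fallback map. A minimal usage sketch reusing this test file's model_list and model groups (valid Azure/OpenAI credentials are assumed):

from litellm import Router

router = Router(model_list=model_list)  # model_list as defined above
response = router.completion(
    model="azure/gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
    fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}],  # overrides router-level fallbacks
)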