fix(router.py): reset caching correctly

This commit is contained in:
Krrish Dholakia 2023-12-11 19:57:34 -08:00
parent d34a8184c5
commit bbf094dcf5
6 changed files with 119 additions and 91 deletions

View file

@ -1222,11 +1222,14 @@ class Router:
raise ValueError("No models available.") raise ValueError("No models available.")
def flush_cache(self): def flush_cache(self):
litellm.cache = None
self.cache.flush_cache() self.cache.flush_cache()
def reset(self): def reset(self):
## clean up on close ## clean up on close
litellm.success_callback = [] litellm.success_callback = []
litellm.__async_success_callback = []
litellm.failure_callback = [] litellm.failure_callback = []
litellm._async_failure_callback = []
self.flush_cache() self.flush_cache()

Binary file not shown.

View file

@ -19,3 +19,27 @@ model_list:
model_info: model_info:
description: this is a test openai model description: this is a test openai model
model_name: test_openai_models model_name: test_openai_models
- litellm_params:
model: gpt-3.5-turbo
model_info:
description: this is a test openai model
id: 56f1bd94-3b54-4b67-9ea2-7c70e9a3a709
model_name: test_openai_models
- litellm_params:
model: gpt-3.5-turbo
model_info:
description: this is a test openai model
id: 4d1ee26c-abca-450c-8744-8e87fd6755e9
model_name: test_openai_models
- litellm_params:
model: gpt-3.5-turbo
model_info:
description: this is a test openai model
id: 00e19c0f-b63d-42bb-88e9-016fb0c60764
model_name: test_openai_models
- litellm_params:
model: gpt-3.5-turbo
model_info:
description: this is a test openai model
id: 79fc75bf-8e1b-47d5-8d24-9365a854af03
model_name: test_openai_models

View file

@ -1,13 +1,12 @@
### What this tests #### ### What this tests ####
import sys, os, time, inspect, asyncio import sys, os, time, inspect, asyncio, traceback
import pytest import pytest
sys.path.insert(0, os.path.abspath('../..')) sys.path.insert(0, os.path.abspath('../..'))
from litellm import completion, embedding from litellm import completion, embedding
import litellm import litellm
from litellm.integrations.custom_logger import CustomLogger from litellm.integrations.custom_logger import CustomLogger
async_success = False
class MyCustomHandler(CustomLogger): class MyCustomHandler(CustomLogger):
complete_streaming_response_in_callback = "" complete_streaming_response_in_callback = ""
def __init__(self): def __init__(self):
@ -51,8 +50,6 @@ class MyCustomHandler(CustomLogger):
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async success") print(f"On Async success")
self.async_success = True self.async_success = True
print("Value of async success: ", self.async_success)
print("\n kwargs: ", kwargs)
if kwargs.get("model") == "text-embedding-ada-002": if kwargs.get("model") == "text-embedding-ada-002":
self.async_success_embedding = True self.async_success_embedding = True
self.async_embedding_kwargs = kwargs self.async_embedding_kwargs = kwargs
@ -64,8 +61,6 @@ class MyCustomHandler(CustomLogger):
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Async Failure") print(f"On Async Failure")
self.async_failure = True self.async_failure = True
print("Value of async failure: ", self.async_failure)
print("\n kwargs: ", kwargs)
if kwargs.get("model") == "text-embedding-ada-002": if kwargs.get("model") == "text-embedding-ada-002":
self.async_failure_embedding = True self.async_failure_embedding = True
self.async_embedding_kwargs_fail = kwargs self.async_embedding_kwargs_fail = kwargs
@ -218,11 +213,26 @@ def test_azure_completion_stream():
except Exception as e: except Exception as e:
pytest.fail(f"Error occurred: {e}") pytest.fail(f"Error occurred: {e}")
def test_async_custom_handler(): @pytest.mark.asyncio
try: async def test_async_custom_handler_completion():
customHandler2 = MyCustomHandler() try:
litellm.callbacks = [customHandler2] customHandler_success = MyCustomHandler()
litellm.set_verbose = True customHandler_failure = MyCustomHandler()
# success
assert customHandler_success.async_success == False
litellm.callbacks = [customHandler_success]
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": "hello from litellm test",
}]
)
await asyncio.sleep(1)
assert customHandler_success.async_success == True, "async success is not set to True even after success"
assert customHandler_success.async_completion_kwargs.get("model") == "gpt-3.5-turbo"
# failure
litellm.callbacks = [customHandler_failure]
messages = [ messages = [
{"role": "system", "content": "You are a helpful assistant."}, {"role": "system", "content": "You are a helpful assistant."},
{ {
@ -230,75 +240,57 @@ def test_async_custom_handler():
"content": "how do i kill someone", "content": "how do i kill someone",
}, },
] ]
async def test_1():
try:
response = await litellm.acompletion(
model="gpt-3.5-turbo",
messages=messages,
api_key="test",
)
except:
pass
assert customHandler2.async_failure == False assert customHandler_failure.async_failure == False
asyncio.run(test_1()) try:
assert customHandler2.async_failure == True, "async failure is not set to True even after failure"
assert customHandler2.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo"
assert len(str(customHandler2.async_completion_kwargs_fail.get("exception"))) > 10 # exppect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119
print("Passed setting async failure")
async def test_2():
response = await litellm.acompletion( response = await litellm.acompletion(
model="gpt-3.5-turbo", model="gpt-3.5-turbo",
messages=[{ messages=messages,
"role": "user", api_key="my-bad-key",
"content": "hello from litellm test", )
}] except:
) pass
print("\n response", response) assert customHandler_failure.async_failure == True, "async failure is not set to True even after failure"
assert customHandler2.async_success == False assert customHandler_failure.async_completion_kwargs_fail.get("model") == "gpt-3.5-turbo"
asyncio.run(test_2()) assert len(str(customHandler_failure.async_completion_kwargs_fail.get("exception"))) > 10 # expect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119
assert customHandler2.async_success == True, "async success is not set to True even after success" litellm.callbacks = []
assert customHandler2.async_completion_kwargs.get("model") == "gpt-3.5-turbo" print("Passed setting async failure")
except Exception as e:
pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_custom_handler_completion())
@pytest.mark.asyncio
async def test_3(): async def test_async_custom_handler_embedding():
response = await litellm.aembedding( try:
customHandler_embedding = MyCustomHandler()
litellm.callbacks = [customHandler_embedding]
# success
assert customHandler_embedding.async_success_embedding == False
response = await litellm.aembedding(
model="text-embedding-ada-002", model="text-embedding-ada-002",
input = ["hello world"], input = ["hello world"],
) )
print("\n response", response) await asyncio.sleep(1)
assert customHandler2.async_success_embedding == False assert customHandler_embedding.async_success_embedding == True, "async_success_embedding is not set to True even after success"
asyncio.run(test_3()) assert customHandler_embedding.async_embedding_kwargs.get("model") == "text-embedding-ada-002"
assert customHandler2.async_success_embedding == True, "async_success_embedding is not set to True even after success" assert customHandler_embedding.async_embedding_response["usage"]["prompt_tokens"] ==2
assert customHandler2.async_embedding_kwargs.get("model") == "text-embedding-ada-002"
assert customHandler2.async_embedding_response["usage"]["prompt_tokens"] ==2
print("Passed setting async success: Embedding") print("Passed setting async success: Embedding")
# failure
assert customHandler_embedding.async_failure_embedding == False
print("Testing custom failure callback for embedding") try:
response = await litellm.aembedding(
async def test_4(): model="text-embedding-ada-002",
try: input = ["hello world"],
response = await litellm.aembedding( api_key="my-bad-key",
model="text-embedding-ada-002", )
input = ["hello world"], except:
api_key="test", pass
) assert customHandler_embedding.async_failure_embedding == True, "async failure embedding is not set to True even after failure"
except: assert customHandler_embedding.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002"
pass assert len(str(customHandler_embedding.async_embedding_kwargs_fail.get("exception"))) > 10 # exppect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119
assert customHandler2.async_failure_embedding == False
asyncio.run(test_4())
assert customHandler2.async_failure_embedding == True, "async failure embedding is not set to True even after failure"
assert customHandler2.async_embedding_kwargs_fail.get("model") == "text-embedding-ada-002"
assert len(str(customHandler2.async_embedding_kwargs_fail.get("exception"))) > 10 # exppect APIError("OpenAIException - Error code: 401 - {'error': {'message': 'Incorrect API key provided: test. You can find your API key at https://platform.openai.com/account/api-keys.', 'type': 'invalid_request_error', 'param': None, 'code': 'invalid_api_key'}}"), 'traceback_exception': 'Traceback (most recent call last):\n File "/Users/ishaanjaffer/Github/litellm/litellm/llms/openai.py", line 269, in acompletion\n response = await openai_aclient.chat.completions.create(**data)\n File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/site-packages/openai/resources/chat/completions.py", line 119
print("Passed setting async failure")
except Exception as e: except Exception as e:
pytest.fail(f"Error occurred: {e}") pytest.fail(f"An exception occurred - {str(e)}")
# test_async_custom_handler() asyncio.run(test_async_custom_handler_embedding())
from litellm import Cache from litellm import Cache
def test_redis_cache_completion_stream(): def test_redis_cache_completion_stream():
# Important Test - This tests if we can add to streaming cache, when custom callbacks are set # Important Test - This tests if we can add to streaming cache, when custom callbacks are set

View file

@ -507,7 +507,6 @@ def test_aembedding_on_router():
model="text-embedding-ada-002", model="text-embedding-ada-002",
input=["good morning from litellm 2"], input=["good morning from litellm 2"],
) )
print("sync embedding response: ", response)
router.reset() router.reset()
except Exception as e: except Exception as e:
traceback.print_exc() traceback.print_exc()

View file

@ -964,7 +964,7 @@ class Logging:
end_time=end_time, end_time=end_time,
print_verbose=print_verbose, print_verbose=print_verbose,
) )
if callback == "cache": if callback == "cache" and litellm.cache is not None:
# this only logs streaming once, complete_streaming_response exists i.e when stream ends # this only logs streaming once, complete_streaming_response exists i.e when stream ends
print_verbose("success_callback: reaches cache for logging!") print_verbose("success_callback: reaches cache for logging!")
kwargs = self.model_call_details kwargs = self.model_call_details
@ -1052,7 +1052,7 @@ class Logging:
start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result) start_time, end_time, result = self._success_handler_helper_fn(start_time=start_time, end_time=end_time, result=result)
for callback in litellm._async_success_callback: for callback in litellm._async_success_callback:
try: try:
if callback == "cache": if callback == "cache" and litellm.cache is not None:
# set_cache once complete streaming response is built # set_cache once complete streaming response is built
print_verbose("async success_callback: reaches cache for logging!") print_verbose("async success_callback: reaches cache for logging!")
kwargs = self.model_call_details kwargs = self.model_call_details
@ -1238,7 +1238,7 @@ class Logging:
print_verbose=print_verbose, print_verbose=print_verbose,
callback_func=callback callback_func=callback
) )
except: except Exception as e:
print_verbose( print_verbose(
f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}" f"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success logging {traceback.format_exc()}"
) )
@ -1649,6 +1649,19 @@ def client(original_function):
result._response_ms = (end_time - start_time).total_seconds() * 1000 # return response latency in ms like openai result._response_ms = (end_time - start_time).total_seconds() * 1000 # return response latency in ms like openai
return result return result
except Exception as e: except Exception as e:
traceback_exception = traceback.format_exc()
crash_reporting(*args, **kwargs, exception=traceback_exception)
end_time = datetime.datetime.now()
if logging_obj:
try:
logging_obj.failure_handler(e, traceback_exception, start_time, end_time) # DO NOT MAKE THREADED - router retry fallback relies on this!
except Exception as e:
raise e
try:
await logging_obj.async_failure_handler(e, traceback_exception, start_time, end_time)
except Exception as e:
raise e
call_type = original_function.__name__ call_type = original_function.__name__
if call_type == CallTypes.acompletion.value: if call_type == CallTypes.acompletion.value:
num_retries = ( num_retries = (
@ -1658,27 +1671,24 @@ def client(original_function):
) )
litellm.num_retries = None # set retries to None to prevent infinite loops litellm.num_retries = None # set retries to None to prevent infinite loops
context_window_fallback_dict = kwargs.get("context_window_fallback_dict", {}) context_window_fallback_dict = kwargs.get("context_window_fallback_dict", {})
if num_retries: if num_retries:
kwargs["num_retries"] = num_retries try:
kwargs["original_function"] = original_function kwargs["num_retries"] = num_retries
if (isinstance(e, openai.RateLimitError)): # rate limiting specific error kwargs["original_function"] = original_function
kwargs["retry_strategy"] = "exponential_backoff_retry" if (isinstance(e, openai.RateLimitError)): # rate limiting specific error
elif (isinstance(e, openai.APIError)): # generic api error kwargs["retry_strategy"] = "exponential_backoff_retry"
kwargs["retry_strategy"] = "constant_retry" elif (isinstance(e, openai.APIError)): # generic api error
return await litellm.acompletion_with_retries(*args, **kwargs) kwargs["retry_strategy"] = "constant_retry"
return await litellm.acompletion_with_retries(*args, **kwargs)
except:
pass
elif isinstance(e, litellm.exceptions.ContextWindowExceededError) and context_window_fallback_dict and model in context_window_fallback_dict: elif isinstance(e, litellm.exceptions.ContextWindowExceededError) and context_window_fallback_dict and model in context_window_fallback_dict:
if len(args) > 0: if len(args) > 0:
args[0] = context_window_fallback_dict[model] args[0] = context_window_fallback_dict[model]
else: else:
kwargs["model"] = context_window_fallback_dict[model] kwargs["model"] = context_window_fallback_dict[model]
return await original_function(*args, **kwargs) return await original_function(*args, **kwargs)
traceback_exception = traceback.format_exc()
crash_reporting(*args, **kwargs, exception=traceback_exception)
end_time = datetime.datetime.now()
if logging_obj:
logging_obj.failure_handler(e, traceback_exception, start_time, end_time) # DO NOT MAKE THREADED - router retry fallback relies on this!
await logging_obj.async_failure_handler(e, traceback_exception, start_time, end_time)
raise e raise e
is_coroutine = inspect.iscoroutinefunction(original_function) is_coroutine = inspect.iscoroutinefunction(original_function)