fix(utils.py): support cache logging for async router calls

Krrish Dholakia 2023-12-13 19:11:43 -08:00
parent cffd190887
commit 73e5b96d8e
2 changed files with 52 additions and 3 deletions


@@ -5,7 +5,7 @@ from datetime import datetime
 import pytest
 sys.path.insert(0, os.path.abspath('../..'))
 from typing import Optional, Literal, List
-from litellm import Router
+from litellm import Router, Cache
 import litellm
 from litellm.integrations.custom_logger import CustomLogger
@@ -437,3 +437,52 @@ async def test_async_chat_azure_with_fallbacks():
print(f"Assertion Error: {traceback.format_exc()}")
pytest.fail(f"An exception occurred - {str(e)}")
# asyncio.run(test_async_chat_azure_with_fallbacks())
# CACHING
## Test Azure - completion, embedding
@pytest.mark.asyncio
async def test_async_completion_azure_caching():
customHandler_caching = CompletionCustomHandler()
litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
litellm.callbacks = [customHandler_caching]
unique_time = time.time()
model_list = [
{
"model_name": "gpt-3.5-turbo", # openai model name
"litellm_params": { # params for litellm completion/embedding call
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 240000,
"rpm": 1800
},
{
"model_name": "gpt-3.5-turbo-16k",
"litellm_params": {
"model": "gpt-3.5-turbo-16k",
},
"tpm": 240000,
"rpm": 1800
}
]
router = Router(model_list=model_list) # type: ignore
response1 = await router.acompletion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1)
print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
response2 = await router.acompletion(model="gpt-3.5-turbo",
messages=[{
"role": "user",
"content": f"Hi 👋 - i'm async azure {unique_time}"
}],
caching=True)
await asyncio.sleep(1) # success callbacks are done in parallel
print(f"customHandler_caching.states post-cache hit: {customHandler_caching.states}")
assert len(customHandler_caching.errors) == 0
assert len(customHandler_caching.states) == 4 # pre, post, success, success
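
For reference, `CompletionCustomHandler` is defined earlier in this test file and is not shown in the diff. A minimal sketch of the shape the assertions above rely on, assuming the handler simply records one state string per callback event (the method names are the standard `CustomLogger` hooks; the state strings here are illustrative):

class CompletionCustomHandler(CustomLogger):  # sketch only - the real class lives earlier in this file
    def __init__(self):
        self.states = []   # one entry appended per callback event
        self.errors = []   # populated on failure events

    def log_pre_api_call(self, model, messages, kwargs):
        self.states.append("pre_api_call")

    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
        self.states.append("post_api_call")

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        self.states.append("async_success")

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        self.errors.append(traceback.format_exc())

With a handler of this shape, the first acompletion produces pre, post, and success events, while the cached second call skips the API-call hooks and fires only the success callback - the four states the final assertion expects.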


@@ -1660,7 +1660,7 @@ def client(original_function):
                 end_time = datetime.datetime.now()
                 model, custom_llm_provider, dynamic_api_key, api_base = litellm.get_llm_provider(model=model, custom_llm_provider=kwargs.get('custom_llm_provider', None), api_base=kwargs.get('api_base', None), api_key=kwargs.get('api_key', None))
                 print_verbose(f"Async Wrapper: Completed Call, calling async_success_handler: {logging_obj.async_success_handler}")
-                logging_obj.update_environment_variables(model=model, user=kwargs.get('user', None), optional_params={}, litellm_params={"logger_fn": kwargs.get('logger_fn', None), "acompletion": True}, input=kwargs.get('messages', ""), api_key=kwargs.get('api_key', None), original_response=str(cached_result), additional_args=None, stream=kwargs.get('stream', False))
+                logging_obj.update_environment_variables(model=model, user=kwargs.get('user', None), optional_params={}, litellm_params={"logger_fn": kwargs.get('logger_fn', None), "acompletion": True, "metadata": kwargs.get("metadata", {}), "model_info": kwargs.get("model_info", {}), "proxy_server_request": kwargs.get("proxy_server_request", None), "preset_cache_key": kwargs.get("preset_cache_key", None), "stream_response": kwargs.get("stream_response", {})}, input=kwargs.get('messages', ""), api_key=kwargs.get('api_key', None), original_response=str(cached_result), additional_args=None, stream=kwargs.get('stream', False))
                 asyncio.create_task(logging_obj.async_success_handler(cached_result, start_time, end_time, cache_hit))
                 threading.Thread(target=logging_obj.success_handler, args=(cached_result, start_time, end_time, cache_hit)).start()
                 return cached_result
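
This hunk is the `utils.py` fix named in the commit title. When an async router call is served from the cache, the logging object previously received only `logger_fn` and `acompletion` in its `litellm_params`, so router-level fields were lost before the success callbacks ran; it now forwards `metadata`, `model_info`, `proxy_server_request`, `preset_cache_key`, and `stream_response` as well. A hedged sketch of what a callback can now read on a cache hit, assuming callback kwargs carry the `litellm_params` dict as they do for non-cached calls (the handler itself is illustrative and not part of this commit; only the forwarded key names come from the diff):

class CacheAwareLogger(CustomLogger):  # illustrative handler, not part of this commit
    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        litellm_params = kwargs.get("litellm_params", {})
        # These keys were previously dropped on async cache hits:
        metadata = litellm_params.get("metadata", {})        # router/proxy metadata
        model_info = litellm_params.get("model_info", {})    # deployment info set by the Router
        preset_cache_key = litellm_params.get("preset_cache_key")
        print(f"cache hit logged - metadata={metadata}, model_info={model_info}, cache_key={preset_cache_key}")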