From 73e5b96d8e38f2aee5d23e4e9c8da055cf4368b3 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Wed, 13 Dec 2023 19:11:43 -0800
Subject: [PATCH] fix(utils.py): support cache logging for async router calls

---
 litellm/tests/test_custom_callback_router.py | 53 +++++++++++++++++++-
 litellm/utils.py                             |  2 +-
 2 files changed, 52 insertions(+), 3 deletions(-)

diff --git a/litellm/tests/test_custom_callback_router.py b/litellm/tests/test_custom_callback_router.py
index 83fed89de..8e1d20943 100644
--- a/litellm/tests/test_custom_callback_router.py
+++ b/litellm/tests/test_custom_callback_router.py
@@ -5,7 +5,7 @@ from datetime import datetime
 import pytest
 sys.path.insert(0, os.path.abspath('../..'))
 from typing import Optional, Literal, List
-from litellm import Router
+from litellm import Router, Cache
 import litellm
 from litellm.integrations.custom_logger import CustomLogger
 
@@ -436,4 +436,53 @@ async def test_async_chat_azure_with_fallbacks():
     except Exception as e:
         print(f"Assertion Error: {traceback.format_exc()}")
         pytest.fail(f"An exception occurred - {str(e)}")
-# asyncio.run(test_async_chat_azure_with_fallbacks())
\ No newline at end of file
+# asyncio.run(test_async_chat_azure_with_fallbacks())
+
+# CACHING
+## Test Azure - completion, embedding
+@pytest.mark.asyncio
+async def test_async_completion_azure_caching():
+    customHandler_caching = CompletionCustomHandler()
+    litellm.cache = Cache(type="redis", host=os.environ['REDIS_HOST'], port=os.environ['REDIS_PORT'], password=os.environ['REDIS_PASSWORD'])
+    litellm.callbacks = [customHandler_caching]
+    unique_time = time.time()
+    model_list = [
+        {
+            "model_name": "gpt-3.5-turbo", # openai model name
+            "litellm_params": { # params for litellm completion/embedding call
+                "model": "azure/chatgpt-v-2",
+                "api_key": os.getenv("AZURE_API_KEY"),
+                "api_version": os.getenv("AZURE_API_VERSION"),
+                "api_base": os.getenv("AZURE_API_BASE")
+            },
+            "tpm": 240000,
+            "rpm": 1800
+        },
+        {
+            "model_name": "gpt-3.5-turbo-16k",
+            "litellm_params": {
+                "model": "gpt-3.5-turbo-16k",
+            },
+            "tpm": 240000,
+            "rpm": 1800
+        }
+    ]
+    router = Router(model_list=model_list) # type: ignore
+    response1 = await router.acompletion(model="gpt-3.5-turbo",
+                                         messages=[{
+                                             "role": "user",
+                                             "content": f"Hi 👋 - i'm async azure {unique_time}"
+                                         }],
+                                         caching=True)
+    await asyncio.sleep(1)
+    print(f"customHandler_caching.states pre-cache hit: {customHandler_caching.states}")
+    response2 = await router.acompletion(model="gpt-3.5-turbo",
+                                         messages=[{
+                                             "role": "user",
+                                             "content": f"Hi 👋 - i'm async azure {unique_time}"
+                                         }],
+                                         caching=True)
+    await asyncio.sleep(1) # success callbacks are done in parallel
+    print(f"customHandler_caching.states post-cache hit: {customHandler_caching.states}")
+    assert len(customHandler_caching.errors) == 0
+    assert len(customHandler_caching.states) == 4 # pre, post, success, success
diff --git a/litellm/utils.py b/litellm/utils.py
index a686828a1..3ff1ce4b1 100644
--- a/litellm/utils.py
+++ b/litellm/utils.py
@@ -1660,7 +1660,7 @@ def client(original_function):
                         end_time = datetime.datetime.now()
                         model, custom_llm_provider, dynamic_api_key, api_base = litellm.get_llm_provider(model=model, custom_llm_provider=kwargs.get('custom_llm_provider', None), api_base=kwargs.get('api_base', None), api_key=kwargs.get('api_key', None))
                         print_verbose(f"Async Wrapper: Completed Call, calling async_success_handler: {logging_obj.async_success_handler}")
-                        logging_obj.update_environment_variables(model=model, user=kwargs.get('user', None), optional_params={}, litellm_params={"logger_fn": kwargs.get('logger_fn', None), "acompletion": True}, input=kwargs.get('messages', ""), api_key=kwargs.get('api_key', None), original_response=str(cached_result), additional_args=None, stream=kwargs.get('stream', False))
+                        logging_obj.update_environment_variables(model=model, user=kwargs.get('user', None), optional_params={}, litellm_params={"logger_fn": kwargs.get('logger_fn', None), "acompletion": True, "metadata": kwargs.get("metadata", {}), "model_info": kwargs.get("model_info", {}), "proxy_server_request": kwargs.get("proxy_server_request", None), "preset_cache_key": kwargs.get("preset_cache_key", None), "stream_response": kwargs.get("stream_response", {})}, input=kwargs.get('messages', ""), api_key=kwargs.get('api_key', None), original_response=str(cached_result), additional_args=None, stream=kwargs.get('stream', False))
                         asyncio.create_task(logging_obj.async_success_handler(cached_result, start_time, end_time, cache_hit))
                         threading.Thread(target=logging_obj.success_handler, args=(cached_result, start_time, end_time, cache_hit)).start()
                         return cached_result