From 0cf0c2d6ddd23b20c7ed47f6074900e6738e3a68 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Tue, 12 Dec 2023 09:53:35 -0800
Subject: [PATCH] fix(router.py): deepcopy initial model list, don't mutate it

---
 litellm/llms/openai.py                        |  54 ++-
 litellm/main.py                               |   2 +-
 litellm/router.py                             |   3 +-
 litellm/tests/langfuse.log                    |   2 +-
 .../test_configs/test_config_no_auth.yaml     |   6 +
 litellm/tests/test_router_fallbacks.py        | 310 +++++++++++++-----
 6 files changed, 275 insertions(+), 102 deletions(-)

diff --git a/litellm/llms/openai.py b/litellm/llms/openai.py
index 33d3504bb1..9489e44c3e 100644
--- a/litellm/llms/openai.py
+++ b/litellm/llms/openai.py
@@ -195,23 +195,23 @@ class OpenAIChatCompletion(BaseLLM):
                 **optional_params
             }
 
-            ## LOGGING
-            logging_obj.pre_call(
-                input=messages,
-                api_key=api_key,
-                additional_args={"headers": headers, "api_base": api_base, "acompletion": acompletion, "complete_input_dict": data},
-            )
-
             try:
                 max_retries = data.pop("max_retries", 2)
                 if acompletion is True:
                     if optional_params.get("stream", False):
-                        return self.async_streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
+                        return self.async_streaming(logging_obj=logging_obj, headers=headers, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
                     else:
-                        return self.acompletion(data=data, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
+                        return self.acompletion(data=data, headers=headers, logging_obj=logging_obj, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
                 elif optional_params.get("stream", False):
-                    return self.streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
+                    return self.streaming(logging_obj=logging_obj, headers=headers, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
                 else:
+                    ## LOGGING
+                    logging_obj.pre_call(
+                        input=messages,
+                        api_key=api_key,
+                        additional_args={"headers": headers, "api_base": api_base, "acompletion": acompletion, "complete_input_dict": data},
+                    )
+
                     if not isinstance(max_retries, int):
                         raise OpenAIError(status_code=422, message="max retries must be an int")
                     if client is None:
@@ -260,6 +260,8 @@ class OpenAIChatCompletion(BaseLLM):
                     api_base: Optional[str]=None,
                     client=None,
                     max_retries=None,
+                    logging_obj=None,
+                    headers=None
                     ):
         response = None
         try:
@@ -267,8 +269,21 @@ class OpenAIChatCompletion(BaseLLM):
                 openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base, http_client=litellm.aclient_session, timeout=timeout, max_retries=max_retries)
             else:
                 openai_aclient = client
+            ## LOGGING
+            logging_obj.pre_call(
+                input=data['messages'],
+                api_key=api_key,
+                additional_args={"headers": headers, "api_base": api_base, "acompletion": True, "complete_input_dict": data},
+            )
             response = await openai_aclient.chat.completions.create(**data)
-            return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
+            stringified_response = response.model_dump_json()
+            logging_obj.post_call(
+                input=data['messages'],
+                api_key=api_key,
+                original_response=stringified_response,
+                additional_args={"complete_input_dict": data},
+            )
+            return convert_to_model_response_object(response_object=json.loads(stringified_response), model_response_object=model_response)
         except Exception as e:
             if response and hasattr(response, "text"):
                 raise OpenAIError(status_code=500, message=f"{str(e)}\n\nOriginal Response: {response.text}")
@@ -286,12 +301,19 @@ class OpenAIChatCompletion(BaseLLM):
                   api_key: Optional[str]=None,
                   api_base: Optional[str]=None,
                   client = None,
-                  max_retries=None
+                  max_retries=None,
+                  headers=None
                   ):
         if client is None:
             openai_client = OpenAI(api_key=api_key, base_url=api_base, http_client=litellm.client_session, timeout=timeout, max_retries=max_retries)
         else:
             openai_client = client
+        ## LOGGING
+        logging_obj.pre_call(
+            input=data['messages'],
+            api_key=api_key,
+            additional_args={"headers": headers, "api_base": api_base, "acompletion": False, "complete_input_dict": data},
+        )
         response = openai_client.chat.completions.create(**data)
         streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
         return streamwrapper
@@ -305,6 +327,7 @@ class OpenAIChatCompletion(BaseLLM):
                           api_base: Optional[str]=None,
                           client=None,
                           max_retries=None,
+                          headers=None
                           ):
         response = None
         try:
@@ -312,6 +335,13 @@ class OpenAIChatCompletion(BaseLLM):
                 openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base, http_client=litellm.aclient_session, timeout=timeout, max_retries=max_retries)
             else:
                 openai_aclient = client
+            ## LOGGING
+            logging_obj.pre_call(
+                input=data['messages'],
+                api_key=api_key,
+                additional_args={"headers": headers, "api_base": api_base, "acompletion": True, "complete_input_dict": data},
+            )
+
             response = await openai_aclient.chat.completions.create(**data)
             streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
             async for transformed_chunk in streamwrapper:
diff --git a/litellm/main.py b/litellm/main.py
index 8284978200..f2b1a1fc02 100644
--- a/litellm/main.py
+++ b/litellm/main.py
@@ -607,7 +607,7 @@ def completion(
             )
             raise e
 
-        if optional_params.get("stream", False) or acompletion == True:
+        if optional_params.get("stream", False):
             ## LOGGING
             logging.post_call(
                 input=messages,
diff --git a/litellm/router.py b/litellm/router.py
index 47ede91105..6c2a9baf00 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -7,6 +7,7 @@
 #
 #  Thank you ! We ❤️ you! - Krrish & Ishaan
+import copy
 from datetime import datetime
 from typing import Dict, List, Optional, Union, Literal, Any
 import random, threading, time, traceback, uuid
@@ -879,7 +880,7 @@ class Router:
         return chosen_item
 
     def set_model_list(self, model_list: list):
-        self.model_list = model_list
+        self.model_list = copy.deepcopy(model_list)
         # we add api_base/api_key to each model so load balancing between azure/gpt on api_base1 and api_base2 works
         import os
         for model in self.model_list:
diff --git a/litellm/tests/langfuse.log b/litellm/tests/langfuse.log
index 9311e915ae..bbb7577382 100644
--- a/litellm/tests/langfuse.log
+++ b/litellm/tests/langfuse.log
@@ -1,5 +1,5 @@
 Task exception was never retrieved
-future: exception=RuntimeError('Event loop is closed')>
+future: exception=RuntimeError('Event loop is closed')>
 Traceback (most recent call last):
   File "/opt/homebrew/lib/python3.11/site-packages/prisma/engine/query.py", line 112, in aclose
     await self._close_session()
diff --git a/litellm/tests/test_configs/test_config_no_auth.yaml b/litellm/tests/test_configs/test_config_no_auth.yaml
index f934f21f45..47b83d71cb 100644
--- a/litellm/tests/test_configs/test_config_no_auth.yaml
+++ b/litellm/tests/test_configs/test_config_no_auth.yaml
@@ -61,3 +61,9 @@ model_list:
     description: this is a test openai model
     id: 34339b1e-e030-4bcc-a531-c48559f10ce4
   model_name: test_openai_models
+- litellm_params:
+    model: gpt-3.5-turbo
+  model_info:
+    description: this is a test openai model
+    id: f6f74e14-ac64-4403-9365-319e584dcdc5
+  model_name: test_openai_models
diff --git a/litellm/tests/test_router_fallbacks.py b/litellm/tests/test_router_fallbacks.py
index 513caa7b85..22b5f121e6 100644
--- a/litellm/tests/test_router_fallbacks.py
+++ b/litellm/tests/test_router_fallbacks.py
@@ -21,10 +21,14 @@ class MyCustomHandler(CustomLogger):
         print(f"Pre-API Call")
 
     def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
-        print(f"Post-API Call")
+        print(f"Post-API Call - response object: {response_obj}; model: {kwargs['model']}")
+
     def log_stream_event(self, kwargs, response_obj, start_time, end_time):
         print(f"On Stream")
+
+    def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        print(f"On Stream")
 
     def log_success_event(self, kwargs, response_obj, start_time, end_time):
         print(f"previous_models: {kwargs['litellm_params']['metadata']['previous_models']}")
@@ -41,67 +45,65 @@ class MyCustomHandler(CustomLogger):
     def log_failure_event(self, kwargs, response_obj, start_time, end_time):
         print(f"On Failure")
 
-model_list = [
-    { # list of model deployments
-        "model_name": "azure/gpt-3.5-turbo", # openai model name
-        "litellm_params": { # params for litellm completion/embedding call
-            "model": "azure/chatgpt-v-2",
-            "api_key": "bad-key",
-            "api_version": os.getenv("AZURE_API_VERSION"),
-            "api_base": os.getenv("AZURE_API_BASE")
-        },
-        "tpm": 240000,
-        "rpm": 1800
-    },
-    { # list of model deployments
-        "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name
-        "litellm_params": { # params for litellm completion/embedding call
-            "model": "azure/chatgpt-v-2",
-            "api_key": os.getenv("AZURE_API_KEY"),
-            "api_version": os.getenv("AZURE_API_VERSION"),
-            "api_base": os.getenv("AZURE_API_BASE")
-        },
-        "tpm": 240000,
-        "rpm": 1800
-    },
-    {
-        "model_name": "azure/gpt-3.5-turbo", # openai model name
-        "litellm_params": { # params for litellm completion/embedding call
-            "model": "azure/chatgpt-functioncalling",
-            "api_key": "bad-key",
-            "api_version": os.getenv("AZURE_API_VERSION"),
os.getenv("AZURE_API_VERSION"), - "api_base": os.getenv("AZURE_API_BASE") - }, - "tpm": 240000, - "rpm": 1800 - }, - { - "model_name": "gpt-3.5-turbo", # openai model name - "litellm_params": { # params for litellm completion/embedding call - "model": "gpt-3.5-turbo", - "api_key": os.getenv("OPENAI_API_KEY"), - }, - "tpm": 1000000, - "rpm": 9000 - }, - { - "model_name": "gpt-3.5-turbo-16k", # openai model name - "litellm_params": { # params for litellm completion/embedding call - "model": "gpt-3.5-turbo-16k", - "api_key": os.getenv("OPENAI_API_KEY"), - }, - "tpm": 1000000, - "rpm": 9000 - } -] - - kwargs = {"model": "azure/gpt-3.5-turbo", "messages": [{"role": "user", "content":"Hey, how's it going?"}]} def test_sync_fallbacks(): try: - print("Test router_fallbacks: test_sync_fallbacks()") + model_list = [ + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": os.getenv("AZURE_API_KEY"), + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-functioncalling", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + }, + { + "model_name": "gpt-3.5-turbo-16k", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo-16k", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + } + ] + litellm.set_verbose = True customHandler = MyCustomHandler() litellm.callbacks = [customHandler] @@ -123,6 +125,60 @@ def test_sync_fallbacks(): @pytest.mark.asyncio async def test_async_fallbacks(): litellm.set_verbose = False + model_list = [ + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": os.getenv("AZURE_API_KEY"), + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-functioncalling", + "api_key": "bad-key", + 
"api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + }, + { + "model_name": "gpt-3.5-turbo-16k", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo-16k", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + } + ] + router = Router(model_list=model_list, fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], @@ -146,30 +202,6 @@ async def test_async_fallbacks(): # test_async_fallbacks() -## COMMENTING OUT as the context size exceeds both gpt-3.5-turbo and gpt-3.5-turbo-16k, need a better message here -# def test_sync_context_window_fallbacks(): -# try: -# customHandler = MyCustomHandler() -# litellm.callbacks = [customHandler] -# sample_text = "Say error 50 times" * 10000 -# kwargs["model"] = "azure/gpt-3.5-turbo-context-fallback" -# kwargs["messages"] = [{"role": "user", "content": sample_text}] -# router = Router(model_list=model_list, -# fallbacks=[{"azure/gpt-3.5-turbo": ["gpt-3.5-turbo"]}], -# context_window_fallbacks=[{"azure/gpt-3.5-turbo-context-fallback": ["gpt-3.5-turbo-16k"]}, {"gpt-3.5-turbo": ["gpt-3.5-turbo-16k"]}], -# set_verbose=False) -# response = router.completion(**kwargs) -# print(f"response: {response}") -# time.sleep(0.05) # allow a delay as success_callbacks are on a separate thread -# assert customHandler.previous_models == 1 # 0 retries, 1 fallback -# router.reset() -# except Exception as e: -# print(f"An exception occurred - {e}") -# finally: -# router.reset() - -# test_sync_context_window_fallbacks() - def test_dynamic_fallbacks_sync(): """ Allow setting the fallback in the router.completion() call. 
@@ -177,6 +209,60 @@ def test_dynamic_fallbacks_sync():
     try:
         customHandler = MyCustomHandler()
         litellm.callbacks = [customHandler]
+        model_list = [
+            { # list of model deployments
+                "model_name": "azure/gpt-3.5-turbo", # openai model name
+                "litellm_params": { # params for litellm completion/embedding call
+                    "model": "azure/chatgpt-v-2",
+                    "api_key": "bad-key",
+                    "api_version": os.getenv("AZURE_API_VERSION"),
+                    "api_base": os.getenv("AZURE_API_BASE")
+                },
+                "tpm": 240000,
+                "rpm": 1800
+            },
+            { # list of model deployments
+                "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name
+                "litellm_params": { # params for litellm completion/embedding call
+                    "model": "azure/chatgpt-v-2",
+                    "api_key": os.getenv("AZURE_API_KEY"),
+                    "api_version": os.getenv("AZURE_API_VERSION"),
+                    "api_base": os.getenv("AZURE_API_BASE")
+                },
+                "tpm": 240000,
+                "rpm": 1800
+            },
+            {
+                "model_name": "azure/gpt-3.5-turbo", # openai model name
+                "litellm_params": { # params for litellm completion/embedding call
+                    "model": "azure/chatgpt-functioncalling",
+                    "api_key": "bad-key",
+                    "api_version": os.getenv("AZURE_API_VERSION"),
+                    "api_base": os.getenv("AZURE_API_BASE")
+                },
+                "tpm": 240000,
+                "rpm": 1800
+            },
+            {
+                "model_name": "gpt-3.5-turbo", # openai model name
+                "litellm_params": { # params for litellm completion/embedding call
+                    "model": "gpt-3.5-turbo",
+                    "api_key": os.getenv("OPENAI_API_KEY"),
+                },
+                "tpm": 1000000,
+                "rpm": 9000
+            },
+            {
+                "model_name": "gpt-3.5-turbo-16k", # openai model name
+                "litellm_params": { # params for litellm completion/embedding call
+                    "model": "gpt-3.5-turbo-16k",
+                    "api_key": os.getenv("OPENAI_API_KEY"),
+                },
+                "tpm": 1000000,
+                "rpm": 9000
+            }
+        ]
+
         router = Router(model_list=model_list, set_verbose=True)
         kwargs = {}
         kwargs["model"] = "azure/gpt-3.5-turbo"
@@ -198,11 +284,61 @@ async def test_dynamic_fallbacks_async():
     """
     Allow setting the fallback in the router.completion() call.
""" try: - print("Router - test_dynamic_fallbacks_async") - print("Callbacks in test_dynamic_fallbacks_async: ", litellm.callbacks) - print("Success callbacks in test_dynamic_fallbacks_async: ", litellm.success_callback) - print("Async Success callbacks in test_dynamic_fallbacks_async: ", litellm._async_success_callback) - litellm.set_verbose=True + model_list = [ + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { # list of model deployments + "model_name": "azure/gpt-3.5-turbo-context-fallback", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-v-2", + "api_key": os.getenv("AZURE_API_KEY"), + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "azure/gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "azure/chatgpt-functioncalling", + "api_key": "bad-key", + "api_version": os.getenv("AZURE_API_VERSION"), + "api_base": os.getenv("AZURE_API_BASE") + }, + "tpm": 240000, + "rpm": 1800 + }, + { + "model_name": "gpt-3.5-turbo", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + }, + { + "model_name": "gpt-3.5-turbo-16k", # openai model name + "litellm_params": { # params for litellm completion/embedding call + "model": "gpt-3.5-turbo-16k", + "api_key": os.getenv("OPENAI_API_KEY"), + }, + "tpm": 1000000, + "rpm": 9000 + } + ] + + print() + print() + print() + print() + print(f"STARTING DYNAMIC ASYNC") customHandler = MyCustomHandler() litellm.callbacks = [customHandler] router = Router(model_list=model_list, set_verbose=True) @@ -217,4 +357,4 @@ async def test_dynamic_fallbacks_async(): router.reset() except Exception as e: pytest.fail(f"An exception occurred - {e}") -# test_dynamic_fallbacks_async() \ No newline at end of file +# asyncio.run(test_dynamic_fallbacks_async()) \ No newline at end of file