diff --git a/litellm/proxy/_super_secret_config.yaml b/litellm/proxy/_super_secret_config.yaml
index ca108e631..f0a7ba827 100644
--- a/litellm/proxy/_super_secret_config.yaml
+++ b/litellm/proxy/_super_secret_config.yaml
@@ -36,7 +36,7 @@ model_list:
     api_base: https://my-endpoint-europe-berri-992.openai.azure.com/
     api_key: os.environ/AZURE_EUROPE_API_KEY
     model: azure/gpt-35-turbo
-  model_name: gpt-3.5-turbo
+  model_name: gpt-3.5-turbo-fake-model
 - litellm_params:
     api_base: https://openai-gpt-4-test-v-1.openai.azure.com/
     api_key: os.environ/AZURE_API_KEY
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index ee1cd7a64..083452089 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -4039,18 +4039,14 @@ async def chat_completion(
         if "api_key" in data:
             tasks.append(litellm.acompletion(**data))
         elif "," in data["model"] and llm_router is not None:
-            _models_csv_string = data.pop("model")
-            _models = _models_csv_string.split(",")
             if (
                 data.get("fastest_response", None) is not None
                 and data["fastest_response"] == True
             ):
-                tasks.append(
-                    llm_router.abatch_completion_fastest_response(
-                        models=_models, **data
-                    )
-                )
+                tasks.append(llm_router.abatch_completion_fastest_response(**data))
             else:
+                _models_csv_string = data.pop("model")
+                _models = [model.strip() for model in _models_csv_string.split(",")]
                 tasks.append(llm_router.abatch_completion(models=_models, **data))
         elif "user_config" in data:
             # initialize a new router instance. make request using this Router
diff --git a/litellm/router.py b/litellm/router.py
index b87d0dded..1ed6854cd 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -742,7 +742,7 @@ class Router:
 
     @overload
     async def abatch_completion_fastest_response(
-        self, models: List[str], messages: List[Dict[str, str]], stream: Literal[True], **kwargs
+        self, model: str, messages: List[Dict[str, str]], stream: Literal[True], **kwargs
     ) -> CustomStreamWrapper:
         ...
 
@@ -750,7 +750,7 @@ class Router:
 
     @overload
     async def abatch_completion_fastest_response(
-        self, models: List[str], messages: List[Dict[str, str]], stream: Literal[False] = False, **kwargs
+        self, model: str, messages: List[Dict[str, str]], stream: Literal[False] = False, **kwargs
     ) -> ModelResponse:
         ...
 
@@ -758,39 +758,56 @@ class Router:
     async def abatch_completion_fastest_response(
         self,
-        models: List[str],
+        model: str,
         messages: List[Dict[str, str]],
         stream: bool = False,
         **kwargs,
     ):
-        """Send 1 completion call to many models: Return Fastest Response."""
+        """
+        model - List of comma-separated model names. E.g. model="gpt-4, gpt-3.5-turbo"
+
+        Returns fastest response from list of model names. OpenAI-compatible endpoint.
+        """
+        models = [m.strip() for m in model.split(",")]
 
         async def _async_completion_no_exceptions(
-            model: str, messages: List[Dict[str, str]], **kwargs
-        ):
+            model: str, messages: List[Dict[str, str]], **kwargs: Any
+        ) -> Union[ModelResponse, CustomStreamWrapper, Exception]:
             """
-            Wrapper around self.async_completion that catches exceptions and returns them as a result
+            Wrapper around self.acompletion that catches exceptions and returns them as a result
             """
             try:
                 return await self.acompletion(model=model, messages=messages, **kwargs)
+            except asyncio.CancelledError:
+                verbose_router_logger.debug(
+                    "Received 'task.cancel'. Cancelling call w/ model={}.".format(model)
+                )
+                raise
             except Exception as e:
                 return e
 
-        _tasks = []
         pending_tasks = []  # type: ignore
 
-        async def check_response(task):
+        async def check_response(task: asyncio.Task):
             nonlocal pending_tasks
-            result = await task
-            if isinstance(result, (ModelResponse, CustomStreamWrapper)):
-                # If a desired response is received, cancel all other pending tasks
-                for t in pending_tasks:
-                    t.cancel()
-                return result
-            else:
+            try:
+                result = await task
+                if isinstance(result, (ModelResponse, CustomStreamWrapper)):
+                    verbose_router_logger.debug(
+                        "Received successful response. Cancelling other LLM API calls."
+                    )
+                    # If a desired response is received, cancel all other pending tasks
+                    for t in pending_tasks:
+                        t.cancel()
+                    return result
+            except Exception:
+                # Ignore exceptions, let the loop handle them
+                pass
+            finally:
+                # Remove the task from pending tasks if it finishes
                 try:
                     pending_tasks.remove(task)
-                except Exception as e:
+                except KeyError:
                     pass
 
         for model in models:
@@ -799,21 +816,22 @@ class Router:
                     model=model, messages=messages, **kwargs
                 )
             )
-            task.add_done_callback(check_response)
-            _tasks.append(task)
             pending_tasks.append(task)
 
-        responses = await asyncio.gather(*_tasks, return_exceptions=True)
-        if isinstance(responses[0], Exception) or isinstance(
-            responses[0], BaseException
-        ):
-            raise responses[0]
-        _response: Union[ModelResponse, CustomStreamWrapper] = responses[
-            0
-        ]  # return first value from list
+        # Await the first task to complete successfully
+        while pending_tasks:
+            done, pending_tasks = await asyncio.wait(  # type: ignore
+                pending_tasks, return_when=asyncio.FIRST_COMPLETED
+            )
+            for completed_task in done:
+                result = await check_response(completed_task)
+                if result is not None:
+                    # Return the first successful result
+                    result._hidden_params["fastest_response_batch_completion"] = True
+                    return result
 
-        _response._hidden_params["fastest_response_batch_completion"] = True
-        return _response
+        # If we exit the loop without returning, all tasks failed
+        raise Exception("All tasks failed")
 
     def image_generation(self, prompt: str, model: str, **kwargs):
         try:
@@ -3624,7 +3642,6 @@ class Router:
         ## get healthy deployments
         ### get all deployments
         healthy_deployments = [m for m in self.model_list if m["model_name"] == model]
-
         if len(healthy_deployments) == 0:
             # check if the user sent in a deployment name instead
             healthy_deployments = [
diff --git a/litellm/tests/test_router_batch_completion.py b/litellm/tests/test_router_batch_completion.py
index 219881dcb..c74892814 100644
--- a/litellm/tests/test_router_batch_completion.py
+++ b/litellm/tests/test_router_batch_completion.py
@@ -64,7 +64,7 @@ async def test_batch_completion_multiple_models(mode):
         from openai.types.chat.chat_completion import ChatCompletion
 
         response = await router.abatch_completion_fastest_response(
-            models=["gpt-3.5-turbo", "groq-llama"],
+            model="gpt-3.5-turbo, groq-llama",
             messages=[
                 {"role": "user", "content": "is litellm becoming a better product ?"}
             ],
@@ -72,3 +72,45 @@
         )
 
         ChatCompletion.model_validate(response.model_dump(), strict=True)
+
+
+@pytest.mark.asyncio
+async def test_batch_completion_fastest_response_unit_test():
+    """
+    Unit test to confirm fastest response will always return the response which arrives earliest.
+
+    2 models -> 1 is cached, the other is a real llm api call => assert cached response always returned
+    """
+    litellm.set_verbose = True
+
+    router = litellm.Router(
+        model_list=[
+            {
+                "model_name": "gpt-4",
+                "litellm_params": {
+                    "model": "gpt-4",
+                },
+                "model_info": {"id": "1"},
+            },
+            {
+                "model_name": "gpt-3.5-turbo",
+                "litellm_params": {
+                    "model": "gpt-3.5-turbo",
+                    "mock_response": "This is a fake response",
+                },
+                "model_info": {"id": "2"},
+            },
+        ]
+    )
+
+    response = await router.abatch_completion_fastest_response(
+        model="gpt-4, gpt-3.5-turbo",
+        messages=[
+            {"role": "user", "content": "is litellm becoming a better product ?"}
+        ],
+        max_tokens=500,
+    )
+
+    assert response._hidden_params["model_id"] == "2"
+    assert response.choices[0].message.content == "This is a fake response"
+    print(f"response: {response}")
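
Usage note (illustrative, not part of the patch): after this change, Router.abatch_completion_fastest_response accepts a single comma-separated `model` string instead of a `models` list, and the proxy forwards the request unchanged when `fastest_response` is set. The following is a minimal sketch of the new calling convention, mirroring the unit test above; the deployment names, the `mock_response` value, and the prompt are placeholders.

    import asyncio

    import litellm

    router = litellm.Router(
        model_list=[
            {"model_name": "gpt-4", "litellm_params": {"model": "gpt-4"}},
            {
                "model_name": "gpt-3.5-turbo",
                # mock_response makes this deployment answer instantly, so it should win the race
                "litellm_params": {
                    "model": "gpt-3.5-turbo",
                    "mock_response": "This is a fake response",
                },
            },
        ]
    )


    async def main():
        # Comma-separated model names; the first successful response is returned
        # and the remaining in-flight calls are cancelled.
        response = await router.abatch_completion_fastest_response(
            model="gpt-4, gpt-3.5-turbo",
            messages=[{"role": "user", "content": "is litellm becoming a better product ?"}],
        )
        print(response._hidden_params.get("fastest_response_batch_completion"))
        print(response.choices[0].message.content)


    asyncio.run(main())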