fix(router.py): support comma-separated model list for batch completion fastest response

commit 1ebae6e7b0 (parent 20106715d5)
Krrish Dholakia, 2024-05-28 21:34:37 -07:00
4 changed files with 94 additions and 39 deletions


@@ -36,7 +36,7 @@ model_list:
       api_base: https://my-endpoint-europe-berri-992.openai.azure.com/
       api_key: os.environ/AZURE_EUROPE_API_KEY
       model: azure/gpt-35-turbo
-    model_name: gpt-3.5-turbo
+    model_name: gpt-3.5-turbo-fake-model
   - litellm_params:
       api_base: https://openai-gpt-4-test-v-1.openai.azure.com/
       api_key: os.environ/AZURE_API_KEY


@@ -4039,18 +4039,14 @@ async def chat_completion(
         if "api_key" in data:
             tasks.append(litellm.acompletion(**data))
         elif "," in data["model"] and llm_router is not None:
-            _models_csv_string = data.pop("model")
-            _models = _models_csv_string.split(",")
             if (
                 data.get("fastest_response", None) is not None
                 and data["fastest_response"] == True
             ):
-                tasks.append(
-                    llm_router.abatch_completion_fastest_response(
-                        models=_models, **data
-                    )
-                )
+                tasks.append(llm_router.abatch_completion_fastest_response(**data))
             else:
+                _models_csv_string = data.pop("model")
+                _models = [model.strip() for model in _models_csv_string.split(",")]
                 tasks.append(llm_router.abatch_completion(models=_models, **data))
         elif "user_config" in data:
             # initialize a new router instance. make request using this Router

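For context, a request that exercises this new code path on the proxy could look like the sketch below. This is an illustration, not part of the diff: the proxy URL, port, and API key are assumptions, and "fastest_response" is simply passed through in the JSON body, which is what the handler above checks for.

# Hypothetical client-side sketch: POST an OpenAI-style chat completion to a
# locally running LiteLLM proxy with a comma-separated "model" string and
# "fastest_response": True, so the handler above calls
# llm_router.abatch_completion_fastest_response(**data).
# The URL, port, and API key are placeholders.
import requests

resp = requests.post(
    "http://localhost:4000/chat/completions",
    headers={"Authorization": "Bearer sk-1234"},
    json={
        "model": "gpt-3.5-turbo, gpt-4",
        "messages": [{"role": "user", "content": "hello"}],
        "fastest_response": True,
    },
    timeout=60,
)
print(resp.json())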

@@ -742,7 +742,7 @@ class Router:
     @overload
     async def abatch_completion_fastest_response(
-        self, models: List[str], messages: List[Dict[str, str]], stream: Literal[True], **kwargs
+        self, model: str, messages: List[Dict[str, str]], stream: Literal[True], **kwargs
     ) -> CustomStreamWrapper:
         ...

@@ -750,7 +750,7 @@ class Router:
     @overload
     async def abatch_completion_fastest_response(
-        self, models: List[str], messages: List[Dict[str, str]], stream: Literal[False] = False, **kwargs
+        self, model: str, messages: List[Dict[str, str]], stream: Literal[False] = False, **kwargs
     ) -> ModelResponse:
         ...
@@ -758,39 +758,56 @@ class Router:
     async def abatch_completion_fastest_response(
         self,
-        models: List[str],
+        model: str,
         messages: List[Dict[str, str]],
         stream: bool = False,
         **kwargs,
     ):
-        """Send 1 completion call to many models: Return Fastest Response."""
+        """
+        model - List of comma-separated model names. E.g. model="gpt-4, gpt-3.5-turbo"
+
+        Returns fastest response from list of model names. OpenAI-compatible endpoint.
+        """
+        models = [m.strip() for m in model.split(",")]

         async def _async_completion_no_exceptions(
-            model: str, messages: List[Dict[str, str]], **kwargs
-        ):
+            model: str, messages: List[Dict[str, str]], **kwargs: Any
+        ) -> Union[ModelResponse, CustomStreamWrapper, Exception]:
             """
-            Wrapper around self.async_completion that catches exceptions and returns them as a result
+            Wrapper around self.acompletion that catches exceptions and returns them as a result
             """
             try:
                 return await self.acompletion(model=model, messages=messages, **kwargs)
+            except asyncio.CancelledError:
+                verbose_router_logger.debug(
+                    "Received 'task.cancel'. Cancelling call w/ model={}.".format(model)
+                )
+                raise
             except Exception as e:
                 return e

-        _tasks = []
         pending_tasks = []  # type: ignore

-        async def check_response(task):
+        async def check_response(task: asyncio.Task):
             nonlocal pending_tasks
-            result = await task
-            if isinstance(result, (ModelResponse, CustomStreamWrapper)):
-                # If a desired response is received, cancel all other pending tasks
-                for t in pending_tasks:
-                    t.cancel()
-                return result
-            else:
+            try:
+                result = await task
+                if isinstance(result, (ModelResponse, CustomStreamWrapper)):
+                    verbose_router_logger.debug(
+                        "Received successful response. Cancelling other LLM API calls."
+                    )
+                    # If a desired response is received, cancel all other pending tasks
+                    for t in pending_tasks:
+                        t.cancel()
+                    return result
+            except Exception:
+                # Ignore exceptions, let the loop handle them
+                pass
+            finally:
                 # Remove the task from pending tasks if it finishes
                 try:
                     pending_tasks.remove(task)
-                except Exception as e:
+                except KeyError:
                     pass

         for model in models:
@@ -799,21 +816,22 @@ class Router:
                     model=model, messages=messages, **kwargs
                 )
             )
-            task.add_done_callback(check_response)
-            _tasks.append(task)
             pending_tasks.append(task)

-        responses = await asyncio.gather(*_tasks, return_exceptions=True)
-        if isinstance(responses[0], Exception) or isinstance(
-            responses[0], BaseException
-        ):
-            raise responses[0]
-        _response: Union[ModelResponse, CustomStreamWrapper] = responses[
-            0
-        ]  # return first value from list
+        # Await the first task to complete successfully
+        while pending_tasks:
+            done, pending_tasks = await asyncio.wait(  # type: ignore
+                pending_tasks, return_when=asyncio.FIRST_COMPLETED
+            )
+            for completed_task in done:
+                result = await check_response(completed_task)
+                if result is not None:
+                    # Return the first successful result
+                    result._hidden_params["fastest_response_batch_completion"] = True
+                    return result

-        _response._hidden_params["fastest_response_batch_completion"] = True
-        return _response
+        # If we exit the loop without returning, all tasks failed
+        raise Exception("All tasks failed")

     def image_generation(self, prompt: str, model: str, **kwargs):
         try:
@@ -3624,7 +3642,6 @@ class Router:
         ## get healthy deployments
         ### get all deployments
         healthy_deployments = [m for m in self.model_list if m["model_name"] == model]
-
         if len(healthy_deployments) == 0:
             # check if the user sent in a deployment name instead
             healthy_deployments = [

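The heart of the rewrite is the asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED) loop above: race one task per model, return the first success, cancel the rest. A minimal, self-contained sketch of that pattern, independent of the Router class and with made-up model names and latencies, is:

# Minimal, self-contained sketch of the "fastest response" pattern used above:
# start one task per model, wait with FIRST_COMPLETED, return the first
# successful result, and cancel the stragglers. Model names and latencies are
# made up for illustration.
import asyncio
import random
from typing import List


async def fake_llm_call(model: str) -> str:
    # Simulate an LLM API call with random latency; one model always errors.
    await asyncio.sleep(random.uniform(0.1, 1.0))
    if model == "always-fails":
        raise RuntimeError(f"{model} errored")
    return f"response from {model}"


async def fastest_response(models: List[str]) -> str:
    pending = {asyncio.create_task(fake_llm_call(m)) for m in models}
    try:
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.exception() is None:
                    return task.result()  # first success wins
        raise Exception("All tasks failed")
    finally:
        for task in pending:
            task.cancel()  # cancel calls still in flight once we have a winner


if __name__ == "__main__":
    print(asyncio.run(fastest_response(["gpt-4", "gpt-3.5-turbo", "always-fails"])))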

@@ -64,7 +64,7 @@ async def test_batch_completion_multiple_models(mode):
     from openai.types.chat.chat_completion import ChatCompletion

     response = await router.abatch_completion_fastest_response(
-        models=["gpt-3.5-turbo", "groq-llama"],
+        model="gpt-3.5-turbo, groq-llama",
         messages=[
             {"role": "user", "content": "is litellm becoming a better product ?"}
         ],
@@ -72,3 +72,45 @@ async def test_batch_completion_multiple_models(mode):
     )

     ChatCompletion.model_validate(response.model_dump(), strict=True)
+
+
+@pytest.mark.asyncio
+async def test_batch_completion_fastest_response_unit_test():
+    """
+    Unit test to confirm fastest response will always return the response which arrives earliest.
+    2 models -> 1 is cached, the other is a real llm api call => assert cached response always returned
+    """
+    litellm.set_verbose = True
+
+    router = litellm.Router(
+        model_list=[
+            {
+                "model_name": "gpt-4",
+                "litellm_params": {
+                    "model": "gpt-4",
+                },
+                "model_info": {"id": "1"},
+            },
+            {
+                "model_name": "gpt-3.5-turbo",
+                "litellm_params": {
+                    "model": "gpt-3.5-turbo",
+                    "mock_response": "This is a fake response",
+                },
+                "model_info": {"id": "2"},
+            },
+        ]
+    )
+
+    response = await router.abatch_completion_fastest_response(
+        model="gpt-4, gpt-3.5-turbo",
+        messages=[
+            {"role": "user", "content": "is litellm becoming a better product ?"}
+        ],
+        max_tokens=500,
+    )
+
+    assert response._hidden_params["model_id"] == "2"
+    assert response.choices[0].message.content == "This is a fake response"
+
+    print(f"response: {response}")
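The overloads earlier in the diff also accept stream=True, in which case the fastest deployment's CustomStreamWrapper is returned. A hedged usage sketch of that path, with placeholder model names and assuming API credentials are already configured in the environment:

# Hedged usage sketch (not part of the diff): call the new comma-separated
# interface directly with stream=True and consume the winning model's chunks.
# Model names are placeholders; real API keys are assumed to be set.
import asyncio

import litellm


async def main():
    router = litellm.Router(
        model_list=[
            {"model_name": "gpt-4", "litellm_params": {"model": "gpt-4"}},
            {"model_name": "gpt-3.5-turbo", "litellm_params": {"model": "gpt-3.5-turbo"}},
        ]
    )
    stream = await router.abatch_completion_fastest_response(
        model="gpt-4, gpt-3.5-turbo",
        messages=[{"role": "user", "content": "hello"}],
        stream=True,
    )
    async for chunk in stream:  # CustomStreamWrapper is async-iterable
        print(chunk)


if __name__ == "__main__":
    asyncio.run(main())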