From 2531701a2a7267a0e954838b1e9d77b39f1e31b3 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Wed, 10 Apr 2024 16:57:01 -0700
Subject: [PATCH] fix(router.py): make get_cooldown_deployment logic async

---
 litellm/router.py                            | 143 +++++++++++++++---
 litellm/router_strategy/lowest_tpm_rpm_v2.py | 144 +++++++++++++------
 2 files changed, 228 insertions(+), 59 deletions(-)

diff --git a/litellm/router.py b/litellm/router.py
index b3b090cf2..e343b71d7 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -13,7 +13,7 @@ from typing import Dict, List, Optional, Union, Literal, Any, BinaryIO
 import random, threading, time, traceback, uuid
 import litellm, openai, hashlib, json
 from litellm.caching import RedisCache, InMemoryCache, DualCache
-
+import datetime as datetime_og
 import logging, asyncio
 import inspect, concurrent
 from openai import AsyncOpenAI
@@ -414,7 +414,7 @@ class Router:
         verbose_router_logger.debug(
             f"Inside _acompletion()- model: {model}; kwargs: {kwargs}"
         )
-        deployment = self.get_available_deployment(
+        deployment = await self.async_get_available_deployment(
             model=model,
             messages=messages,
             specific_deployment=kwargs.pop("specific_deployment", None),
@@ -1605,7 +1605,7 @@ class Router:
         if deployment is None:
             return

-        current_minute = datetime.now().strftime("%H-%M")
+        current_minute = datetime.now(datetime_og.UTC).strftime("%H-%M")
         # get current fails for deployment
         # update the number of failed calls
         # if it's > allowed fails
@@ -1643,6 +1643,22 @@ class Router:
                 key=deployment, value=updated_fails, ttl=cooldown_time
             )

+    async def _async_get_cooldown_deployments(self):
+        """
+        Async implementation of '_get_cooldown_deployments'
+        """
+        current_minute = datetime.now(datetime_og.UTC).strftime("%H-%M")
+        # get the current cooldown list for that minute
+        cooldown_key = f"{current_minute}:cooldown_models"
+
+        # ----------------------
+        # Return cooldown models
+        # ----------------------
+        cooldown_models = await self.cache.async_get_cache(key=cooldown_key) or []
+
+        verbose_router_logger.debug(f"retrieved cooldown models: {cooldown_models}")
+        return cooldown_models
+
     def _get_cooldown_deployments(self):
         """
         Get the list of models being cooled down for this minute
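
The cooldown list above is bucketed by the current UTC minute, so an entry expires implicitly once the clock rolls into the next "%H-%M" bucket. A minimal, cache-agnostic sketch of that keying scheme (the dict and the helper names here are illustrative, not litellm APIs):

    from datetime import datetime, timezone

    _cache: dict = {}  # stands in for the router's DualCache

    def _cooldown_key() -> str:
        # same scheme as the patch: "<HH-MM>:cooldown_models"
        return f"{datetime.now(timezone.utc).strftime('%H-%M')}:cooldown_models"

    def add_cooldown(model_id: str) -> None:
        _cache.setdefault(_cooldown_key(), []).append(model_id)

    def get_cooldowns() -> list:
        # a fresh minute means a fresh key, i.e. an empty cooldown list
        return _cache.get(_cooldown_key(), [])

One consequence of this keying: a deployment added to the list late in a minute becomes eligible again as soon as the minute rolls over, so the cooldown lasts "up to a minute" rather than a fixed window.
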
@@ -2405,7 +2421,7 @@ class Router:

         return _returned_deployments

-    def get_available_deployment(
+    def _common_checks_available_deployment(
         self,
         model: str,
         messages: Optional[List[Dict[str, str]]] = None,
@@ -2413,10 +2429,8 @@
         specific_deployment: Optional[bool] = False,
     ):
         """
-        Returns the deployment based on routing strategy
+        Common checks for 'get_available_deployment' across sync + async calls.
         """
-        # users need to explicitly call a specific deployment, by setting `specific_deployment = True` as completion()/embedding() kwarg
-        # When this was no explicit we had several issues with fallbacks timing out
         if specific_deployment == True:
             # users can also specify a specific deployment name. At this point we should check if they are just trying to call a specific deployment
             for deployment in self.model_list:
@@ -2456,6 +2470,111 @@ class Router:
             f"initial list of deployments: {healthy_deployments}"
         )

+        verbose_router_logger.debug(
+            f"healthy deployments: length {len(healthy_deployments)} {healthy_deployments}"
+        )
+        if len(healthy_deployments) == 0:
+            raise ValueError(f"No healthy deployment available, passed model={model}")
+        if litellm.model_alias_map and model in litellm.model_alias_map:
+            model = litellm.model_alias_map[
+                model
+            ]  # update the model to the actual value if an alias has been passed in
+
+        return model, healthy_deployments
+
+    async def async_get_available_deployment(
+        self,
+        model: str,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+        specific_deployment: Optional[bool] = False,
+    ):
+        """
+        Async implementation of 'get_available_deployment'.
+
+        Allows all cache calls to be made async => 10x perf impact (8 RPS -> 100 RPS).
+        """
+        if (
+            self.routing_strategy != "usage-based-routing-v2"
+        ):  # prevent regressions for other routing strategies that don't have an async implementation yet
+            return self.get_available_deployment(
+                model=model,
+                messages=messages,
+                input=input,
+                specific_deployment=specific_deployment,
+            )
+        model, healthy_deployments = self._common_checks_available_deployment(
+            model=model,
+            messages=messages,
+            input=input,
+            specific_deployment=specific_deployment,
+        )
+
+        # filter out the deployments currently cooling down
+        deployments_to_remove = []
+        # cooldown_deployments is a list of model_ids cooling down, e.g. ["16700539-b3cd-42f4-b426-6a12a1bb706a", "16700539-b3cd-42f4-b426-7899"]
+        cooldown_deployments = await self._async_get_cooldown_deployments()
+        verbose_router_logger.debug(
+            f"async cooldown deployments: {cooldown_deployments}"
+        )
+        # Find deployments in model_list whose model_id is cooling down
+        for deployment in healthy_deployments:
+            deployment_id = deployment["model_info"]["id"]
+            if deployment_id in cooldown_deployments:
+                deployments_to_remove.append(deployment)
+        # remove the cooling-down deployments from the healthy list
+        for deployment in deployments_to_remove:
+            healthy_deployments.remove(deployment)
+
+        # filter pre-call checks
+        if self.enable_pre_call_checks and messages is not None:
+            healthy_deployments = self._pre_call_checks(
+                model=model, healthy_deployments=healthy_deployments, messages=messages
+            )
+
+        deployment = None  # stays None if no strategy below picks a deployment
+        if (
+            self.routing_strategy == "usage-based-routing-v2"
+            and self.lowesttpm_logger_v2 is not None
+        ):
+            deployment = await self.lowesttpm_logger_v2.async_get_available_deployments(
+                model_group=model,
+                healthy_deployments=healthy_deployments,
+                messages=messages,
+                input=input,
+            )
+
+        if deployment is None:
+            verbose_router_logger.info(
+                f"get_available_deployment for model: {model}, No deployment available"
+            )
+            raise ValueError(
+                f"No deployments available for selected model, passed model={model}"
+            )
+        verbose_router_logger.info(
+            f"get_available_deployment for model: {model}, Selected deployment: {self.print_deployment(deployment)}"
+        )
+        return deployment
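
The "8 RPS -> 100 RPS" figure in the docstring comes from keeping the event loop unblocked: a synchronous cache read inside a coroutine stalls every in-flight request for the duration of the round-trip. A toy benchmark (not litellm code; the two helpers merely stand in for batch_get_cache and async_batch_get_cache) that shows the effect:

    import asyncio
    import time

    def blocking_get() -> None:
        time.sleep(0.05)  # 50 ms round-trip that blocks the whole event loop

    async def async_get() -> None:
        await asyncio.sleep(0.05)  # same latency, but other tasks keep running

    async def handle_sync() -> None:
        blocking_get()

    async def handle_async() -> None:
        await async_get()

    async def main() -> None:
        t0 = time.perf_counter()
        await asyncio.gather(*(handle_sync() for _ in range(20)))
        print(f"blocking reads: {time.perf_counter() - t0:.2f}s")  # ~1.00s, serialized

        t0 = time.perf_counter()
        await asyncio.gather(*(handle_async() for _ in range(20)))
        print(f"awaited reads:  {time.perf_counter() - t0:.2f}s")  # ~0.05s, overlapped

    asyncio.run(main())
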
+
+    def get_available_deployment(
+        self,
+        model: str,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+        specific_deployment: Optional[bool] = False,
+    ):
+        """
+        Returns the deployment based on routing strategy
+        """
+        # users need to explicitly call a specific deployment, by setting `specific_deployment = True` as a completion()/embedding() kwarg
+        # When this was not explicit, we had several issues with fallbacks timing out
+
+        model, healthy_deployments = self._common_checks_available_deployment(
+            model=model,
+            messages=messages,
+            input=input,
+            specific_deployment=specific_deployment,
+        )
+
         # filter out the deployments currently cooling down
         deployments_to_remove = []
         # cooldown_deployments is a list of model_id's cooling down, cooldown_deployments = ["16700539-b3cd-42f4-b426-6a12a1bb706a", "16700539-b3cd-42f4-b426-7899"]
@@ -2476,16 +2595,6 @@ class Router:
                 model=model, healthy_deployments=healthy_deployments, messages=messages
             )

-        verbose_router_logger.debug(
-            f"healthy deployments: length {len(healthy_deployments)} {healthy_deployments}"
-        )
-        if len(healthy_deployments) == 0:
-            raise ValueError(f"No healthy deployment available, passed model={model}")
-        if litellm.model_alias_map and model in litellm.model_alias_map:
-            model = litellm.model_alias_map[
-                model
-            ]  # update the model to the actual value if an alias has been passed in
-
         if self.routing_strategy == "least-busy" and self.leastbusy_logger is not None:
             deployment = self.leastbusy_logger.get_available_deployments(
                 model_group=model, healthy_deployments=healthy_deployments
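
The second file below applies the same refactor: the sync and async paths differ only in how they read the cache, so each becomes a thin wrapper that fetches the tpm/rpm values and hands them to one shared, I/O-free selection helper. A hypothetical sketch of the pattern (the cache object and all names here are illustrative):

    from typing import List, Optional

    def pick_lowest(keys: List[str], values: List[Optional[int]]) -> str:
        # shared pure core: a None value (no traffic yet) counts as zero usage
        usage = dict(zip(keys, values))
        return min(usage, key=lambda k: usage[k] or 0)

    def get_deployment(cache, keys: List[str]) -> str:  # sync wrapper
        return pick_lowest(keys, cache.batch_get(keys))

    async def aget_deployment(cache, keys: List[str]) -> str:  # async wrapper
        return pick_lowest(keys, await cache.async_batch_get(keys))

Keeping the selection logic in one synchronous function means the sync and async routers cannot drift apart, which is the point of `_common_checks_available_deployment` in the hunks that follow.
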
diff --git a/litellm/router_strategy/lowest_tpm_rpm_v2.py b/litellm/router_strategy/lowest_tpm_rpm_v2.py
index 1e1f79efa..8f9f57fd9 100644
--- a/litellm/router_strategy/lowest_tpm_rpm_v2.py
+++ b/litellm/router_strategy/lowest_tpm_rpm_v2.py
@@ -136,56 +136,20 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
             traceback.print_exc()
             pass

-    async def async_get_available_deployments(
+    def _common_checks_available_deployment(
         self,
         model_group: str,
         healthy_deployments: list,
+        tpm_keys: list,
+        tpm_values: list,
+        rpm_keys: list,
+        rpm_values: list,
         messages: Optional[List[Dict[str, str]]] = None,
         input: Optional[Union[str, List]] = None,
     ):
         """
-        Async implementation of get deployments.
-
-        Reduces time to retrieve the tpm/rpm values from cache
+        Common checks for get_available_deployments, across sync + async implementations
         """
-        pass
-
-    def get_available_deployments(
-        self,
-        model_group: str,
-        healthy_deployments: list,
-        messages: Optional[List[Dict[str, str]]] = None,
-        input: Optional[Union[str, List]] = None,
-    ):
-        """
-        Returns a deployment with the lowest TPM/RPM usage.
-        """
-        # get list of potential deployments
-        verbose_router_logger.debug(
-            f"get_available_deployments - Usage Based. model_group: {model_group}, healthy_deployments: {healthy_deployments}"
-        )
-
-        current_minute = datetime.now(datetime_og.UTC).strftime("%H-%M")
-        tpm_keys = []
-        rpm_keys = []
-        for m in healthy_deployments:
-            if isinstance(m, dict):
-                id = m.get("model_info", {}).get(
-                    "id"
-                )  # a deployment should always have an 'id'. this is set in router.py
-                tpm_key = "{}:tpm:{}".format(id, current_minute)
-                rpm_key = "{}:rpm:{}".format(id, current_minute)
-
-                tpm_keys.append(tpm_key)
-                rpm_keys.append(rpm_key)
-
-        tpm_values = self.router_cache.batch_get_cache(
-            keys=tpm_keys
-        )  # [1, 2, None, ..]
-        rpm_values = self.router_cache.batch_get_cache(
-            keys=rpm_keys
-        )  # [1, 2, None, ..]
-
         tpm_dict = {}  # {model_id: 1, ..}
         for idx, key in enumerate(tpm_keys):
             tpm_dict[tpm_keys[idx]] = tpm_values[idx]
@@ -259,3 +223,99 @@ class LowestTPMLoggingHandler_v2(CustomLogger):
                     deployment = _deployment
         print_verbose("returning picked lowest tpm/rpm deployment.")
         return deployment
+
+    async def async_get_available_deployments(
+        self,
+        model_group: str,
+        healthy_deployments: list,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+    ):
+        """
+        Async implementation of get_available_deployments.
+
+        Reduces the time taken to retrieve the tpm/rpm values from the cache.
+        """
+        # get list of potential deployments
+        verbose_router_logger.debug(
+            f"get_available_deployments - Usage Based. model_group: {model_group}, healthy_deployments: {healthy_deployments}"
+        )
+
+        current_minute = datetime.now(datetime_og.UTC).strftime("%H-%M")
+        tpm_keys = []
+        rpm_keys = []
+        for m in healthy_deployments:
+            if isinstance(m, dict):
+                id = m.get("model_info", {}).get(
+                    "id"
+                )  # a deployment should always have an 'id'. this is set in router.py
+                tpm_key = "{}:tpm:{}".format(id, current_minute)
+                rpm_key = "{}:rpm:{}".format(id, current_minute)
+
+                tpm_keys.append(tpm_key)
+                rpm_keys.append(rpm_key)
+
+        tpm_values = await self.router_cache.async_batch_get_cache(
+            keys=tpm_keys
+        )  # [1, 2, None, ..]
+        rpm_values = await self.router_cache.async_batch_get_cache(
+            keys=rpm_keys
+        )  # [1, 2, None, ..]
+
+        return self._common_checks_available_deployment(
+            model_group=model_group,
+            healthy_deployments=healthy_deployments,
+            tpm_keys=tpm_keys,
+            tpm_values=tpm_values,
+            rpm_keys=rpm_keys,
+            rpm_values=rpm_values,
+            messages=messages,
+            input=input,
+        )
+
+    def get_available_deployments(
+        self,
+        model_group: str,
+        healthy_deployments: list,
+        messages: Optional[List[Dict[str, str]]] = None,
+        input: Optional[Union[str, List]] = None,
+    ):
+        """
+        Returns a deployment with the lowest TPM/RPM usage.
+        """
+        # get list of potential deployments
+        verbose_router_logger.debug(
+            f"get_available_deployments - Usage Based. model_group: {model_group}, healthy_deployments: {healthy_deployments}"
+        )
+
+        current_minute = datetime.now(datetime_og.UTC).strftime("%H-%M")
+        tpm_keys = []
+        rpm_keys = []
+        for m in healthy_deployments:
+            if isinstance(m, dict):
+                id = m.get("model_info", {}).get(
+                    "id"
+                )  # a deployment should always have an 'id'. this is set in router.py
+                tpm_key = "{}:tpm:{}".format(id, current_minute)
+                rpm_key = "{}:rpm:{}".format(id, current_minute)
+
+                tpm_keys.append(tpm_key)
+                rpm_keys.append(rpm_key)
+
+        tpm_values = self.router_cache.batch_get_cache(
+            keys=tpm_keys
+        )  # [1, 2, None, ..]
+        rpm_values = self.router_cache.batch_get_cache(
+            keys=rpm_keys
+        )  # [1, 2, None, ..]
+
+        return self._common_checks_available_deployment(
+            model_group=model_group,
+            healthy_deployments=healthy_deployments,
+            tpm_keys=tpm_keys,
+            tpm_values=tpm_values,
+            rpm_keys=rpm_keys,
+            rpm_values=rpm_values,
+            messages=messages,
+            input=input,
+        )
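
For reference, a small illustration (not litellm code) of the per-minute TPM/RPM key scheme used by both selection paths above, and the positional shape the batch reads return, where None marks a deployment with no recorded usage this minute:

    from datetime import datetime, timezone

    deployment_ids = ["16700539-b3cd-42f4-b426-6a12a1bb706a", "0a1b2c3d"]  # example IDs
    current_minute = datetime.now(timezone.utc).strftime("%H-%M")

    tpm_keys = [f"{d}:tpm:{current_minute}" for d in deployment_ids]
    rpm_keys = [f"{d}:rpm:{current_minute}" for d in deployment_ids]
    print(tpm_keys)  # e.g. ['16700539-b3cd-42f4-b426-6a12a1bb706a:tpm:23-41', ...]

    # a batch cache read returns values positionally aligned with the keys:
    tpm_values = [1200, None]  # the second deployment has no usage this minute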