Merge branch 'main' into litellm_support_dynamic_rpm_limiting

commit 21d3a28e51
Author: Krish Dholakia
Date: 2024-07-02 17:51:18 -07:00 (committed by GitHub)
27 changed files with 1067 additions and 133 deletions


@@ -156,6 +156,7 @@ class Router:
         cooldown_time: Optional[
             float
         ] = None,  # (seconds) time to cooldown a deployment after failure
+        disable_cooldowns: Optional[bool] = None,
         routing_strategy: Literal[
             "simple-shuffle",
             "least-busy",
@@ -307,6 +308,7 @@ class Router:
         self.allowed_fails = allowed_fails or litellm.allowed_fails
         self.cooldown_time = cooldown_time or 60
+        self.disable_cooldowns = disable_cooldowns
         self.failed_calls = (
             InMemoryCache()
         )  # cache to track failed call per deployment, if num failed calls within 1 minute > allowed fails, then add it to cooldown
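
For context, a minimal sketch of how the new flag would be passed to the Router. The model list, deployment names, and rpm value here are illustrative only, not taken from this PR:

```python
from litellm import Router

# Hypothetical deployment list; model and deployment names are made up.
model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "azure/chatgpt-v-2", "rpm": 100},
    }
]

router = Router(
    model_list=model_list,
    disable_cooldowns=True,  # new flag: never place failing deployments on cooldown
)
```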
@@ -2990,6 +2992,8 @@ class Router:
         the exception is not one that should be immediately retried (e.g. 401)
         """
+        if self.disable_cooldowns is True:
+            return
         if deployment is None:
             return
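
The guard above makes `_set_cooldown_deployments` a no-op when the flag is set. A sketch of the observable effect, continuing the illustrative setup from the earlier snippet:

```python
# With disable_cooldowns=True, _set_cooldown_deployments() returns before
# writing anything, so the per-minute cooldown list stays empty even after
# deployment failures:
assert router._get_cooldown_deployments() == []
```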
@@ -3030,24 +3034,50 @@
             exception_status = 500
         _should_retry = litellm._should_retry(status_code=exception_status)
-        if updated_fails > allowed_fails or _should_retry == False:
+        if updated_fails > allowed_fails or _should_retry is False:
             # get the current cooldown list for that minute
             cooldown_key = f"{current_minute}:cooldown_models"  # group cooldown models by minute to reduce number of redis calls
-            cached_value = self.cache.get_cache(key=cooldown_key)
+            cached_value = self.cache.get_cache(
+                key=cooldown_key
+            )  # [(deployment_id, {last_error_str, last_error_status_code})]
+            cached_value_deployment_ids = []
+            if (
+                cached_value is not None
+                and isinstance(cached_value, list)
+                and len(cached_value) > 0
+                and isinstance(cached_value[0], tuple)
+            ):
+                cached_value_deployment_ids = [cv[0] for cv in cached_value]

             verbose_router_logger.debug(f"adding {deployment} to cooldown models")
             # update value
-            try:
-                if deployment in cached_value:
+            if cached_value is not None and len(cached_value_deployment_ids) > 0:
+                if deployment in cached_value_deployment_ids:
                     pass
                 else:
-                    cached_value = cached_value + [deployment]
+                    cached_value = cached_value + [
+                        (
+                            deployment,
+                            {
+                                "Exception Received": str(original_exception),
+                                "Status Code": str(exception_status),
+                            },
+                        )
+                    ]
                     # save updated value
                     self.cache.set_cache(
                         value=cached_value, key=cooldown_key, ttl=cooldown_time
                     )
-            except:
-                cached_value = [deployment]
+            else:
+                cached_value = [
+                    (
+                        deployment,
+                        {
+                            "Exception Received": str(original_exception),
+                            "Status Code": str(exception_status),
+                        },
+                    )
+                ]
                 # save updated value
                 self.cache.set_cache(
                     value=cached_value, key=cooldown_key, ttl=cooldown_time
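
The cooldown entry written above changes shape: instead of a bare deployment id, each entry is now a (deployment_id, debug-dict) tuple. A sketch of the stored value; the id and error details are made up for illustration:

```python
# Value stored under f"{current_minute}:cooldown_models" after this change.
cached_value = [
    (
        "azure-gpt-35-deployment-1",
        {
            "Exception Received": "litellm.RateLimitError: ...",
            "Status Code": "429",
        },
    )
]
# Deployment ids are recovered with a simple projection:
deployment_ids = [cv[0] for cv in cached_value]  # ["azure-gpt-35-deployment-1"]
```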
@@ -3063,7 +3093,33 @@ class Router:
             key=deployment, value=updated_fails, ttl=cooldown_time
         )

-    async def _async_get_cooldown_deployments(self):
+    async def _async_get_cooldown_deployments(self) -> List[str]:
+        """
+        Async implementation of '_get_cooldown_deployments'
+        """
+        dt = get_utc_datetime()
+        current_minute = dt.strftime("%H-%M")
+        # get the current cooldown list for that minute
+        cooldown_key = f"{current_minute}:cooldown_models"
+
+        # ----------------------
+        # Return cooldown models
+        # ----------------------
+        cooldown_models = await self.cache.async_get_cache(key=cooldown_key) or []
+
+        cached_value_deployment_ids = []
+        if (
+            cooldown_models is not None
+            and isinstance(cooldown_models, list)
+            and len(cooldown_models) > 0
+            and isinstance(cooldown_models[0], tuple)
+        ):
+            cached_value_deployment_ids = [cv[0] for cv in cooldown_models]
+
+        verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}")
+        return cached_value_deployment_ids
+
+    async def _async_get_cooldown_deployments_with_debug_info(self) -> List[tuple]:
         """
         Async implementation of '_get_cooldown_deployments'
         """
@@ -3080,7 +3136,7 @@ class Router:
         verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}")
         return cooldown_models

-    def _get_cooldown_deployments(self):
+    def _get_cooldown_deployments(self) -> List[str]:
         """
         Get the list of models being cooled down for this minute
         """
@@ -3094,8 +3150,17 @@ class Router:
         # ----------------------
         cooldown_models = self.cache.get_cache(key=cooldown_key) or []

+        cached_value_deployment_ids = []
+        if (
+            cooldown_models is not None
+            and isinstance(cooldown_models, list)
+            and len(cooldown_models) > 0
+            and isinstance(cooldown_models[0], tuple)
+        ):
+            cached_value_deployment_ids = [cv[0] for cv in cooldown_models]
+
         verbose_router_logger.debug(f"retrieve cooldown models: {cooldown_models}")
-        return cooldown_models
+        return cached_value_deployment_ids

     def _get_healthy_deployments(self, model: str):
         _all_deployments: list = []
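
The `isinstance(cooldown_models[0], tuple)` check also acts as a compatibility shim: entries cached in the old bare-string format fail it, and the function returns an empty list rather than mismatched data. A self-contained sketch of that behavior, with made-up deployment ids:

```python
def extract_ids(cooldown_models: list) -> list:
    # Mirrors the check added above: only the new tuple format yields ids.
    if (
        cooldown_models is not None
        and isinstance(cooldown_models, list)
        and len(cooldown_models) > 0
        and isinstance(cooldown_models[0], tuple)
    ):
        return [cv[0] for cv in cooldown_models]
    return []

print(extract_ids(["old-deployment-id"]))  # [] (old bare-string format)
print(extract_ids([("new-deployment-id", {"Status Code": "429"})]))
# ["new-deployment-id"] (new tuple format)
```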
@@ -4737,7 +4802,7 @@ class Router:
                 if _allowed_model_region is None:
                     _allowed_model_region = "n/a"
                 raise ValueError(
-                    f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, allowed_model_region={_allowed_model_region}"
+                    f"{RouterErrors.no_deployments_available.value}, Try again in {self.cooldown_time} seconds. Passed model={model}. pre-call-checks={self.enable_pre_call_checks}, allowed_model_region={_allowed_model_region}, cooldown_list={await self._async_get_cooldown_deployments_with_debug_info()}"
                 )

             if (
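
With `cooldown_list` appended, the "no deployments available" error now carries the cooled-down deployments and their last errors inline. An illustrative rendering; the exact wording of `RouterErrors.no_deployments_available.value` and all values here are made up for this example:

```python
# Illustrative only; not captured from a real run.
# ValueError: No deployments available for selected model, Try again in 60 seconds.
# Passed model=gpt-3.5-turbo. pre-call-checks=False, allowed_model_region=n/a,
# cooldown_list=[('azure-gpt-35-deployment-1',
#                 {'Exception Received': 'RateLimitError', 'Status Code': '429'})]
```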