mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-27 03:34:10 +00:00
Litellm dev 02 18 2025 p2 (#8639)
* fix(parallel_request_limiter.py): improve single instance rate limiting by updating in-memory cache instantly Fixes issue where parallel request limiter had a leak * fix(parallel_request_limiter.py): fix parallel request limiter to not decrement val on max limit being reached * test(test_parallel_request_limiter.py): fix test * test: fix test * fix(parallel_request_limiter.py): move to using common enum * test: fix test
This commit is contained in:
parent
c088442658
commit
bf6c013de0
3 changed files with 25 additions and 31 deletions
|
@ -1972,6 +1972,9 @@ class CommonProxyErrors(str, enum.Enum):
|
|||
no_llm_router = "No models configured on proxy"
|
||||
not_allowed_access = "Admin-only endpoint. Not allowed to access this."
|
||||
not_premium_user = "You must be a LiteLLM Enterprise user to use this feature. If you have a license please set `LITELLM_LICENSE` in your env. Get a 7 day trial key here: https://www.litellm.ai/#trial. \nPricing: https://www.litellm.ai/#pricing"
|
||||
max_parallel_request_limit_reached = (
|
||||
"Crossed TPM / RPM / Max Parallel Request Limit"
|
||||
)
|
||||
|
||||
|
||||
class SpendCalculateRequest(LiteLLMPydanticObjectBase):
|
||||
|
|
|
@ -11,7 +11,7 @@ from litellm import DualCache, ModelResponse
|
|||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.integrations.custom_logger import CustomLogger
|
||||
from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
|
||||
from litellm.proxy._types import CurrentItemRateLimit, UserAPIKeyAuth
|
||||
from litellm.proxy._types import CommonProxyErrors, CurrentItemRateLimit, UserAPIKeyAuth
|
||||
from litellm.proxy.auth.auth_utils import (
|
||||
get_key_model_rpm_limit,
|
||||
get_key_model_tpm_limit,
|
||||
|
@ -65,11 +65,14 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
|
|||
rate_limit_type: Literal["key", "model_per_key", "user", "customer", "team"],
|
||||
values_to_update_in_cache: List[Tuple[Any, Any]],
|
||||
) -> dict:
|
||||
verbose_proxy_logger.info(
|
||||
f"Current Usage of {rate_limit_type} in this minute: {current}"
|
||||
)
|
||||
if current is None:
|
||||
if max_parallel_requests == 0 or tpm_limit == 0 or rpm_limit == 0:
|
||||
# base case
|
||||
raise self.raise_rate_limit_error(
|
||||
additional_details=f"Hit limit for {rate_limit_type}. Current limits: max_parallel_requests: {max_parallel_requests}, tpm_limit: {tpm_limit}, rpm_limit: {rpm_limit}"
|
||||
additional_details=f"{CommonProxyErrors.max_parallel_request_limit_reached.value}. Hit limit for {rate_limit_type}. Current limits: max_parallel_requests: {max_parallel_requests}, tpm_limit: {tpm_limit}, rpm_limit: {rpm_limit}"
|
||||
)
|
||||
new_val = {
|
||||
"current_requests": 1,
|
||||
|
@ -93,9 +96,16 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
|
|||
else:
|
||||
raise HTTPException(
|
||||
status_code=429,
|
||||
detail=f"LiteLLM Rate Limit Handler for rate limit type = {rate_limit_type}. Crossed TPM / RPM / Max Parallel Request Limit. current rpm: {current['current_rpm']}, rpm limit: {rpm_limit}, current tpm: {current['current_tpm']}, tpm limit: {tpm_limit}, current max_parallel_requests: {current['current_requests']}, max_parallel_requests: {max_parallel_requests}",
|
||||
detail=f"LiteLLM Rate Limit Handler for rate limit type = {rate_limit_type}. {CommonProxyErrors.max_parallel_request_limit_reached.value}. current rpm: {current['current_rpm']}, rpm limit: {rpm_limit}, current tpm: {current['current_tpm']}, tpm limit: {tpm_limit}, current max_parallel_requests: {current['current_requests']}, max_parallel_requests: {max_parallel_requests}",
|
||||
headers={"retry-after": str(self.time_to_next_minute())},
|
||||
)
|
||||
|
||||
await self.internal_usage_cache.async_batch_set_cache(
|
||||
cache_list=values_to_update_in_cache,
|
||||
ttl=60,
|
||||
litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
|
||||
local_only=True,
|
||||
)
|
||||
return new_val
|
||||
|
||||
def time_to_next_minute(self) -> float:
|
||||
|
@ -680,8 +690,15 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
|
|||
if user_api_key is None:
|
||||
return
|
||||
|
||||
verbose_proxy_logger.info("ENTERS FAILURE LOG EVENT")
|
||||
|
||||
## decrement call count if call failed
|
||||
if "Max parallel request limit reached" in str(kwargs["exception"]):
|
||||
if CommonProxyErrors.max_parallel_request_limit_reached.value in str(
|
||||
kwargs["exception"]
|
||||
):
|
||||
verbose_proxy_logger.info(
|
||||
"IGNORE FAILED CALLS DUE TO MAX LIMIT BEING REACHED"
|
||||
)
|
||||
pass # ignore failed calls due to max limit being reached
|
||||
else:
|
||||
# ------------
|
||||
|
|
|
@ -146,7 +146,7 @@ async def test_pre_call_hook_rpm_limits():
|
|||
_api_key = "sk-12345"
|
||||
_api_key = hash_token(_api_key)
|
||||
user_api_key_dict = UserAPIKeyAuth(
|
||||
api_key=_api_key, max_parallel_requests=1, tpm_limit=9, rpm_limit=1
|
||||
api_key=_api_key, max_parallel_requests=10, tpm_limit=9, rpm_limit=1
|
||||
)
|
||||
local_cache = DualCache()
|
||||
parallel_request_handler = MaxParallelRequestsHandler(
|
||||
|
@ -157,16 +157,6 @@ async def test_pre_call_hook_rpm_limits():
|
|||
user_api_key_dict=user_api_key_dict, cache=local_cache, data={}, call_type=""
|
||||
)
|
||||
|
||||
kwargs = {"litellm_params": {"metadata": {"user_api_key": _api_key}}}
|
||||
|
||||
## Expected cache val: {"current_requests": 0, "current_tpm": 0, "current_rpm": 1}
|
||||
await parallel_request_handler.async_pre_call_hook(
|
||||
user_api_key_dict=user_api_key_dict,
|
||||
cache=local_cache,
|
||||
data={},
|
||||
call_type="",
|
||||
)
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
try:
|
||||
|
@ -202,15 +192,6 @@ async def test_pre_call_hook_rpm_limits_retry_after():
|
|||
user_api_key_dict=user_api_key_dict, cache=local_cache, data={}, call_type=""
|
||||
)
|
||||
|
||||
kwargs = {"litellm_params": {"metadata": {"user_api_key": _api_key}}}
|
||||
|
||||
await parallel_request_handler.async_pre_call_hook(
|
||||
user_api_key_dict=user_api_key_dict,
|
||||
cache=local_cache,
|
||||
data={},
|
||||
call_type="",
|
||||
)
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
## Expected cache val: {"current_requests": 0, "current_tpm": 0, "current_rpm": 1}
|
||||
|
@ -261,13 +242,6 @@ async def test_pre_call_hook_team_rpm_limits():
|
|||
}
|
||||
}
|
||||
|
||||
await parallel_request_handler.async_pre_call_hook(
|
||||
user_api_key_dict=user_api_key_dict,
|
||||
cache=local_cache,
|
||||
data={},
|
||||
call_type="",
|
||||
)
|
||||
|
||||
await asyncio.sleep(2)
|
||||
|
||||
## Expected cache val: {"current_requests": 0, "current_tpm": 0, "current_rpm": 1}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue