From 0bc08063e17ef0c984513f8cd641c79f2aaf3568 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Mon, 1 Jul 2024 23:08:54 -0700 Subject: [PATCH] fix(dynamic_rate_limiter.py): support setting priority + reserving tpm/rpm --- litellm/__init__.py | 2 + litellm/proxy/hooks/dynamic_rate_limiter.py | 46 ++++++++++++++--- .../tests/test_dynamic_rate_limit_handler.py | 50 +++++++++++++++++++ 3 files changed, 91 insertions(+), 7 deletions(-) diff --git a/litellm/__init__.py b/litellm/__init__.py index 0fa822a98e..cc31ea9990 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -237,6 +237,8 @@ default_user_params: Optional[Dict] = None default_team_settings: Optional[List] = None max_user_budget: Optional[float] = None max_end_user_budget: Optional[float] = None +#### REQUEST PRIORITIZATION #### +priority_reservation: Optional[Dict[str, float]] = None #### RELIABILITY #### request_timeout: float = 6000 module_level_aclient = AsyncHTTPHandler(timeout=request_timeout) diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py index bb6bef0cd7..8d0effc896 100644 --- a/litellm/proxy/hooks/dynamic_rate_limiter.py +++ b/litellm/proxy/hooks/dynamic_rate_limiter.py @@ -3,6 +3,7 @@ ## Tracks num active projects per minute import asyncio +import os import sys import traceback from datetime import datetime @@ -82,13 +83,17 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): self.llm_router = llm_router async def check_available_usage( - self, model: str + self, model: str, priority: Optional[str] = None ) -> Tuple[ Optional[int], Optional[int], Optional[int], Optional[int], Optional[int] ]: """ For a given model, get its available tpm + Params: + - model: str, the name of the model in the router model_list + - priority: Optional[str], the priority for the request. + Returns - Tuple[available_tpm, available_rpm, model_tpm, model_rpm, active_projects] - available_tpm: int or null - always 0 or positive. 
@@ -97,6 +102,23 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): - remaining_model_rpm: int or null. If available rpm is int, then this will be too. - active_projects: int or null """ + weight: float = 1 + if ( + litellm.priority_reservation is None + or priority not in litellm.priority_reservation + ): + verbose_proxy_logger.error( + "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format( + priority, litellm.priority_reservation + ) + ) + elif priority is not None and litellm.priority_reservation is not None: + if os.getenv("LITELLM_LICENSE", None) is None: + verbose_proxy_logger.error( + "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise." + ) + else: + weight = litellm.priority_reservation[priority] active_projects = await self.internal_usage_cache.async_get_cache(model=model) current_model_tpm, current_model_rpm = ( await self.llm_router.get_model_group_usage(model_group=model) @@ -128,9 +150,9 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): if remaining_model_tpm is not None: if active_projects is not None: - available_tpm = int(remaining_model_tpm / active_projects) + available_tpm = int(remaining_model_tpm * weight / active_projects) else: - available_tpm = remaining_model_tpm + available_tpm = int(remaining_model_tpm * weight) if available_tpm is not None and available_tpm < 0: available_tpm = 0 @@ -139,9 +161,9 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): if remaining_model_rpm is not None: if active_projects is not None: - available_rpm = int(remaining_model_rpm / active_projects) + available_rpm = int(remaining_model_rpm * weight / active_projects) else: - available_rpm = remaining_model_rpm + available_rpm = int(remaining_model_rpm * weight) if available_rpm is not None and available_rpm < 0: available_rpm = 0 @@ -175,8 +197,13 @@ class 
_PROXY_DynamicRateLimitHandler(CustomLogger): - Raise RateLimitError if no tpm/rpm available """ if "model" in data: + key_priority: Optional[str] = user_api_key_dict.metadata.get( + "priority", None + ) available_tpm, available_rpm, model_tpm, model_rpm, active_projects = ( - await self.check_available_usage(model=data["model"]) + await self.check_available_usage( + model=data["model"], priority=key_priority + ) ) ### CHECK TPM ### if available_tpm is not None and available_tpm == 0: @@ -227,8 +254,13 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): ), "Model info for model with id={} is None".format( response._hidden_params["model_id"] ) + key_priority: Optional[str] = user_api_key_dict.metadata.get( + "priority", None + ) available_tpm, available_rpm, model_tpm, model_rpm, active_projects = ( - await self.check_available_usage(model=model_info["model_name"]) + await self.check_available_usage( + model=model_info["model_name"], priority=key_priority + ) ) response._hidden_params["additional_headers"] = ( { # Add additional response headers - easier debugging diff --git a/litellm/tests/test_dynamic_rate_limit_handler.py b/litellm/tests/test_dynamic_rate_limit_handler.py index 129a71de0a..bc50fc16f5 100644 --- a/litellm/tests/test_dynamic_rate_limit_handler.py +++ b/litellm/tests/test_dynamic_rate_limit_handler.py @@ -438,6 +438,56 @@ async def test_multiple_projects( assert availability == 0 +@pytest.mark.parametrize("num_projects", [1, 2, 100]) +@pytest.mark.asyncio +async def test_priority_reservation(num_projects, dynamic_rate_limit_handler): + """ + If reservation is set + `mock_testing_reservation` passed in + + assert correct rpm is reserved + """ + model = "my-fake-model" + ## SET CACHE W/ ACTIVE PROJECTS + projects = [str(uuid.uuid4()) for _ in range(num_projects)] + + await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd( + model=model, value=projects + ) + + litellm.priority_reservation = {"dev": 0.1, "prod": 0.9} + + 
model_usage = 100 + + llm_router = Router( + model_list=[ + { + "model_name": model, + "litellm_params": { + "model": "gpt-3.5-turbo", + "api_key": "my-key", + "api_base": "my-base", + "rpm": model_usage, + }, + } + ] + ) + dynamic_rate_limit_handler.update_variables(llm_router=llm_router) + + ## CHECK AVAILABLE RPM PER PROJECT + + resp = await dynamic_rate_limit_handler.check_available_usage( + model=model, priority="prod" + ) + + availability = resp[1] + + expected_availability = int( + model_usage * litellm.priority_reservation["prod"] / num_projects + ) + + assert availability == expected_availability + + @pytest.mark.skip( reason="Unstable on ci/cd due to curr minute changes. Refactor to handle minute changing" )