From f23b17091d50155e5c458c9e2923df28caf0c19b Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 1 Jul 2024 17:45:10 -0700
Subject: [PATCH 1/5] fix(dynamic_rate_limiter.py): support dynamic rate limiting on rpm

---
 litellm/proxy/hooks/dynamic_rate_limiter.py | 91 ++++++++++++++++-----
 litellm/router.py                           | 38 +++++++--
 2 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py
index 95f0ccc13..fecaf3186 100644
--- a/litellm/proxy/hooks/dynamic_rate_limiter.py
+++ b/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -81,28 +81,36 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
     def update_variables(self, llm_router: Router):
         self.llm_router = llm_router
 
-    async def check_available_tpm(
+    async def check_available_usage(
         self, model: str
-    ) -> Tuple[Optional[int], Optional[int], Optional[int]]:
+    ) -> Tuple[
+        Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
+    ]:
         """
         For a given model, get its available tpm
 
         Returns
-        - Tuple[available_tpm, model_tpm, active_projects]
+        - Tuple[available_tpm, available_rpm, model_tpm, model_rpm, active_projects]
         - available_tpm: int or null - always 0 or positive.
+        - available_rpm: int or null - always 0 or positive.
         - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
+        - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
        - active_projects: int or null
         """
         active_projects = await self.internal_usage_cache.async_get_cache(model=model)
-        current_model_tpm: Optional[int] = await self.llm_router.get_model_group_usage(
-            model_group=model
+        current_model_tpm, current_model_rpm = (
+            await self.llm_router.get_model_group_usage(model_group=model)
         )
         model_group_info: Optional[ModelGroupInfo] = (
             self.llm_router.get_model_group_info(model_group=model)
         )
         total_model_tpm: Optional[int] = None
-        if model_group_info is not None and model_group_info.tpm is not None:
-            total_model_tpm = model_group_info.tpm
+        total_model_rpm: Optional[int] = None
+        if model_group_info is not None:
+            if model_group_info.tpm is not None:
+                total_model_tpm = model_group_info.tpm
+            if model_group_info.rpm is not None:
+                total_model_rpm = model_group_info.rpm
 
         remaining_model_tpm: Optional[int] = None
         if total_model_tpm is not None and current_model_tpm is not None:
@@ -110,6 +118,12 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
             remaining_model_tpm = total_model_tpm - current_model_tpm
         elif total_model_tpm is not None:
             remaining_model_tpm = total_model_tpm
 
+        remaining_model_rpm: Optional[int] = None
+        if total_model_rpm is not None and current_model_rpm is not None:
+            remaining_model_rpm = total_model_rpm - current_model_rpm
+        elif total_model_rpm is not None:
+            remaining_model_rpm = total_model_rpm
+
         available_tpm: Optional[int] = None
 
         if remaining_model_tpm is not None:
@@ -120,7 +134,24 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
 
         if available_tpm is not None and available_tpm < 0:
             available_tpm = 0
-        return available_tpm, remaining_model_tpm, active_projects
+
+        available_rpm: Optional[int] = None
+
+        if remaining_model_rpm is not None:
+            if active_projects is not None:
+                available_rpm = int(remaining_model_rpm / active_projects)
+            else:
+                available_rpm = remaining_model_rpm
+
+        if available_rpm is not None and available_rpm < 0:
+            available_rpm = 0
+        return (
+            available_tpm,
+            available_rpm,
+            remaining_model_tpm,
+            remaining_model_rpm,
+            active_projects,
+        )
 
     async def async_pre_call_hook(
         self,
@@ -140,13 +171,14 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
     ]:  # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm
         """
         For a model group
-        - Check if tpm available
-        - Raise RateLimitError if no tpm available
+        - Check if tpm/rpm available
+        - Raise RateLimitError if no tpm/rpm available
         """
         if "model" in data:
-            available_tpm, model_tpm, active_projects = await self.check_available_tpm(
-                model=data["model"]
+            available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                await self.check_available_usage(model=data["model"])
             )
+            ### CHECK TPM ###
             if available_tpm is not None and available_tpm == 0:
                 raise HTTPException(
                     status_code=429,
@@ -159,7 +191,20 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
                         )
                     },
                 )
-            elif available_tpm is not None:
+            ### CHECK RPM ###
+            elif available_rpm is not None and available_rpm == 0:
+                raise HTTPException(
+                    status_code=429,
+                    detail={
+                        "error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format(
+                            user_api_key_dict.api_key,
+                            available_rpm,
+                            model_rpm,
+                            active_projects,
+                        )
+                    },
+                )
+            elif available_rpm is not None or available_tpm is not None:
                 ## UPDATE CACHE WITH ACTIVE PROJECT
                 asyncio.create_task(
                     self.internal_usage_cache.async_set_cache_sadd(  # this is a set
@@ -182,15 +227,19 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
             ), "Model info for model with id={} is None".format(
                 response._hidden_params["model_id"]
             )
-            available_tpm, remaining_model_tpm, active_projects = (
-                await self.check_available_tpm(model=model_info["model_name"])
+            available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                await self.check_available_usage(model=model_info["model_name"])
+            )
+            response._hidden_params["additional_headers"] = (
+                {  # Add additional response headers - easier debugging
+                    "x-litellm-model_group": model_info["model_name"],
+                    "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
+                    "x-ratelimit-remaining-litellm-project-requests": available_rpm,
+                    "x-ratelimit-remaining-model-tokens": model_tpm,
+                    "x-ratelimit-remaining-model-requests": model_tpm,
+                    "x-ratelimit-current-active-projects": active_projects,
+                }
             )
-            response._hidden_params["additional_headers"] = {
-                "x-litellm-model_group": model_info["model_name"],
-                "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
-                "x-ratelimit-remaining-model-tokens": remaining_model_tpm,
-                "x-ratelimit-current-active-projects": active_projects,
-            }
 
             return response
 
         return await super().async_post_call_success_hook(
diff --git a/litellm/router.py b/litellm/router.py
index ba3f13b8e..39cc92ab1 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -4191,25 +4191,42 @@ class Router:
 
         return model_group_info
 
-    async def get_model_group_usage(self, model_group: str) -> Optional[int]:
+    async def get_model_group_usage(
+        self, model_group: str
+    ) -> Tuple[Optional[int], Optional[int]]:
         """
-        Returns remaining tpm quota for model group
+        Returns remaining tpm/rpm quota for model group
+
+        Returns:
+        - usage: Tuple[tpm, rpm]
         """
         dt = get_utc_datetime()
         current_minute = dt.strftime(
             "%H-%M"
         )  # use the same timezone regardless of system clock
         tpm_keys: List[str] = []
+        rpm_keys: List[str] = []
         for model in self.model_list:
             if "model_name" in model and model["model_name"] == model_group:
                 tpm_keys.append(
                     f"global_router:{model['model_info']['id']}:tpm:{current_minute}"
                 )
+                rpm_keys.append(
+                    f"global_router:{model['model_info']['id']}:rpm:{current_minute}"
+                )
+        combined_tpm_rpm_keys = tpm_keys + rpm_keys
+
+        combined_tpm_rpm_values = await self.cache.async_batch_get_cache(
+            keys=combined_tpm_rpm_keys
+        )
+
+        if combined_tpm_rpm_values is None:
+            return None, None
+
+        tpm_usage_list: Optional[List] = combined_tpm_rpm_values[: len(tpm_keys)]
+        rpm_usage_list: Optional[List] = combined_tpm_rpm_values[len(tpm_keys) :]
 
         ## TPM
-        tpm_usage_list: Optional[List] = await self.cache.async_batch_get_cache(
-            keys=tpm_keys
-        )
         tpm_usage: Optional[int] = None
         if tpm_usage_list is not None:
             for t in tpm_usage_list:
@@ -4217,8 +4234,15 @@ class Router:
                 if tpm_usage is None:
                     tpm_usage = 0
                 tpm_usage += t
-
-        return tpm_usage
+        ## RPM
+        rpm_usage: Optional[int] = None
+        if rpm_usage_list is not None:
+            for t in rpm_usage_list:
+                if isinstance(t, int):
+                    if rpm_usage is None:
+                        rpm_usage = 0
+                    rpm_usage += t
+        return tpm_usage, rpm_usage
 
     def get_model_ids(self) -> List[str]:
         """

From 0781014706023491eddea94d6844ef753495b301 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 1 Jul 2024 20:16:10 -0700
Subject: [PATCH 2/5] test(test_dynamic_rate_limit_handler.py): refactor tests for rpm support

---
 litellm/proxy/hooks/dynamic_rate_limiter.py   |  2 +-
 .../tests/test_dynamic_rate_limit_handler.py  | 55 ++++++++++---------
 2 files changed, 29 insertions(+), 28 deletions(-)

diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py
index fecaf3186..bb6bef0cd 100644
--- a/litellm/proxy/hooks/dynamic_rate_limiter.py
+++ b/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -236,7 +236,7 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
                     "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
                     "x-ratelimit-remaining-litellm-project-requests": available_rpm,
                     "x-ratelimit-remaining-model-tokens": model_tpm,
-                    "x-ratelimit-remaining-model-requests": model_tpm,
+                    "x-ratelimit-remaining-model-requests": model_rpm,
                     "x-ratelimit-current-active-projects": active_projects,
                 }
             )
diff --git a/litellm/tests/test_dynamic_rate_limit_handler.py b/litellm/tests/test_dynamic_rate_limit_handler.py
index 4f49abff8..d4386c26d 100644
--- a/litellm/tests/test_dynamic_rate_limit_handler.py
+++ b/litellm/tests/test_dynamic_rate_limit_handler.py
@@ -109,9 +109,9 @@ async def test_available_tpm(num_projects, dynamic_rate_limit_handler):
 
     ## CHECK AVAILABLE TPM PER PROJECT
 
-    availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    availability = resp[0]
 
     expected_availability = int(model_tpm / num_projects)
 
@@ -151,9 +151,9 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
 
     ## CHECK AVAILABLE TPM PER PROJECT
 
-    availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    availability = resp[0]
 
     expected_availability = int(model_tpm / 1)
 
@@ -217,9 +217,9 @@ async def test_base_case(dynamic_rate_limit_handler, mock_response):
     for _ in range(2):
         try:
             # check availability
-            availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-                model=model
-            )
+            resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+            availability = resp[0]
 
             print(
                 "prev_availability={}, availability={}".format(
@@ -273,9 +273,9 @@ async def test_update_cache(
     dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
 
     ## INITIAL ACTIVE PROJECTS - ASSERT NONE
-    _, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    active_projects = resp[-1]
 
     assert active_projects is None
 
@@ -289,9 +289,9 @@ async def test_update_cache(
     await asyncio.sleep(2)
 
     ## INITIAL ACTIVE PROJECTS - ASSERT 1
-    _, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    active_projects = resp[-1]
 
     assert active_projects == 1
 
@@ -357,9 +357,9 @@ async def test_multiple_projects(
 
     for i in range(expected_runs + 1):
         # check availability
-        availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-            model=model
-        )
+        resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+        availability = resp[0]
 
         ## assert availability updated
         if prev_availability is not None and availability is not None:
@@ -389,9 +389,10 @@ async def test_multiple_projects(
     await asyncio.sleep(3)
 
     # check availability
-    availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    availability = resp[0]
+
     assert availability == 0
 
 
@@ -456,9 +457,9 @@ async def test_multiple_projects_e2e(
     print("expected_runs: {}".format(expected_runs))
    for i in range(expected_runs + 1):
         # check availability
-        availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-            model=model
-        )
+        resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+        availability = resp[0]
 
         ## assert availability updated
         if prev_availability is not None and availability is not None:
@@ -488,7 +489,7 @@ async def test_multiple_projects_e2e(
     await asyncio.sleep(3)
 
     # check availability
-    availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
-        model=model
-    )
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    availability = resp[0]
     assert availability == 0

From 460c33f70f22e14ff461f6190caba29d2b9b0d13 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 1 Jul 2024 20:20:24 -0700
Subject: [PATCH 3/5] test(test_dynamic_rate_limit_handler.py): add unit tests for dynamic rpm limits

---
 .../tests/test_dynamic_rate_limit_handler.py | 52 +++++++++++++++++--
 1 file changed, 47 insertions(+), 5 deletions(-)

diff --git a/litellm/tests/test_dynamic_rate_limit_handler.py b/litellm/tests/test_dynamic_rate_limit_handler.py
index d4386c26d..129a71de0 100644
--- a/litellm/tests/test_dynamic_rate_limit_handler.py
+++ b/litellm/tests/test_dynamic_rate_limit_handler.py
@@ -118,8 +118,47 @@ async def test_available_tpm(num_projects, dynamic_rate_limit_handler):
     assert availability == expected_availability
 
 
+@pytest.mark.parametrize("num_projects", [1, 2, 100])
 @pytest.mark.asyncio
-async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
+async def test_available_rpm(num_projects, dynamic_rate_limit_handler):
+    model = "my-fake-model"
+    ## SET CACHE W/ ACTIVE PROJECTS
+    projects = [str(uuid.uuid4()) for _ in range(num_projects)]
+
+    await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd(
+        model=model, value=projects
+    )
+
+    model_rpm = 100
+    llm_router = Router(
+        model_list=[
+            {
+                "model_name": model,
+                "litellm_params": {
+                    "model": "gpt-3.5-turbo",
+                    "api_key": "my-key",
+                    "api_base": "my-base",
+                    "rpm": model_rpm,
+                },
+            }
+        ]
+    )
+    dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
+
+    ## CHECK AVAILABLE RPM PER PROJECT
+
+    resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
+
+    availability = resp[1]
+
+    expected_availability = int(model_rpm / num_projects)
+
+    assert availability == expected_availability
+
+
+@pytest.mark.parametrize("usage", ["rpm", "tpm"])
+@pytest.mark.asyncio
+async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth, usage):
     """
     Unit test. Tests if rate limit error raised when quota exhausted.
     """
@@ -133,7 +172,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
         model=model, value=projects
     )
 
-    model_tpm = 0
+    model_usage = 0
     llm_router = Router(
         model_list=[
             {
@@ -142,7 +181,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
                     "model": "gpt-3.5-turbo",
                     "api_key": "my-key",
                     "api_base": "my-base",
-                    "tpm": model_tpm,
+                    usage: model_usage,
                 },
             }
         ]
@@ -153,9 +192,12 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
 
     resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
 
-    availability = resp[0]
+    if usage == "tpm":
+        availability = resp[0]
+    else:
+        availability = resp[1]
 
-    expected_availability = int(model_tpm / 1)
+    expected_availability = 0
 
     assert availability == expected_availability

From 6b529d4e0e3825f8a84b747225414f8b7a196e07 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 1 Jul 2024 23:08:54 -0700
Subject: [PATCH 4/5] fix(dynamic_rate_limiter.py): support setting priority + reserving tpm/rpm

---
 litellm/__init__.py                           |  2 +
 litellm/proxy/hooks/dynamic_rate_limiter.py   | 46 ++++++++++++++---
 .../tests/test_dynamic_rate_limit_handler.py  | 50 +++++++++++++++++++
 3 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/litellm/__init__.py b/litellm/__init__.py
index 0fa822a98..cc31ea999 100644
--- a/litellm/__init__.py
+++ b/litellm/__init__.py
@@ -237,6 +237,8 @@ default_user_params: Optional[Dict] = None
 default_team_settings: Optional[List] = None
 max_user_budget: Optional[float] = None
 max_end_user_budget: Optional[float] = None
+#### REQUEST PRIORITIZATION ####
+priority_reservation: Optional[Dict[str, float]] = None
 #### RELIABILITY ####
 request_timeout: float = 6000
 module_level_aclient = AsyncHTTPHandler(timeout=request_timeout)
diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py
index bb6bef0cd..8d0effc89 100644
--- a/litellm/proxy/hooks/dynamic_rate_limiter.py
+++ b/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -3,6 +3,7 @@
 ## Tracks num active projects per minute
 
 import asyncio
+import os
 import sys
 import traceback
 from datetime import datetime
@@ -82,13 +83,17 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
         self.llm_router = llm_router
 
     async def check_available_usage(
-        self, model: str
+        self, model: str, priority: Optional[str] = None
     ) -> Tuple[
         Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
     ]:
         """
         For a given model, get its available tpm
 
+        Params:
+        - model: str, the name of the model in the router model_list
+        - priority: Optional[str], the priority for the request.
+
         Returns
         - Tuple[available_tpm, available_rpm, model_tpm, model_rpm, active_projects]
         - available_tpm: int or null - always 0 or positive.
         - available_rpm: int or null - always 0 or positive.
         - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
         - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
         - active_projects: int or null
         """
+        weight: float = 1
+        if (
+            litellm.priority_reservation is None
+            or priority not in litellm.priority_reservation
+        ):
+            verbose_proxy_logger.error(
+                "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
+                    priority, litellm.priority_reservation
+                )
+            )
+        elif priority is not None and litellm.priority_reservation is not None:
+            if os.getenv("LITELLM_LICENSE", None) is None:
+                verbose_proxy_logger.error(
+                    "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
+                )
+            else:
+                weight = litellm.priority_reservation[priority]
+
         active_projects = await self.internal_usage_cache.async_get_cache(model=model)
         current_model_tpm, current_model_rpm = (
             await self.llm_router.get_model_group_usage(model_group=model)
@@ -128,9 +150,9 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
 
         if remaining_model_tpm is not None:
             if active_projects is not None:
-                available_tpm = int(remaining_model_tpm / active_projects)
+                available_tpm = int(remaining_model_tpm * weight / active_projects)
             else:
-                available_tpm = remaining_model_tpm
+                available_tpm = int(remaining_model_tpm * weight)
 
         if available_tpm is not None and available_tpm < 0:
             available_tpm = 0
@@ -139,9 +161,9 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
 
         if remaining_model_rpm is not None:
             if active_projects is not None:
-                available_rpm = int(remaining_model_rpm / active_projects)
+                available_rpm = int(remaining_model_rpm * weight / active_projects)
             else:
-                available_rpm = remaining_model_rpm
+                available_rpm = int(remaining_model_rpm * weight)
 
         if available_rpm is not None and available_rpm < 0:
             available_rpm = 0
@@ -175,8 +197,13 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
         - Raise RateLimitError if no tpm/rpm available
         """
         if "model" in data:
+            key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                "priority", None
+            )
             available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
-                await self.check_available_usage(model=data["model"])
+                await self.check_available_usage(
+                    model=data["model"], priority=key_priority
+                )
             )
             ### CHECK TPM ###
             if available_tpm is not None and available_tpm == 0:
                 raise HTTPException(
@@ -227,8 +254,13 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
             ), "Model info for model with id={} is None".format(
                 response._hidden_params["model_id"]
             )
+            key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                "priority", None
+            )
             available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
-                await self.check_available_usage(model=model_info["model_name"])
+                await self.check_available_usage(
+                    model=model_info["model_name"], priority=key_priority
+                )
             )
             response._hidden_params["additional_headers"] = (
                 {  # Add additional response headers - easier debugging
diff --git a/litellm/tests/test_dynamic_rate_limit_handler.py b/litellm/tests/test_dynamic_rate_limit_handler.py
index 129a71de0..bc50fc16f 100644
--- a/litellm/tests/test_dynamic_rate_limit_handler.py
+++ b/litellm/tests/test_dynamic_rate_limit_handler.py
@@ -438,6 +438,56 @@ async def test_multiple_projects(
     assert availability == 0
 
 
+@pytest.mark.parametrize("num_projects", [1, 2, 100])
+@pytest.mark.asyncio
+async def test_priority_reservation(num_projects, dynamic_rate_limit_handler):
+    """
+    If reservation is set + `mock_testing_reservation` passed in
+
+    assert correct rpm is reserved
+    """
+    model = "my-fake-model"
+    ## SET CACHE W/ ACTIVE PROJECTS
+    projects = [str(uuid.uuid4()) for _ in range(num_projects)]
+
+    await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd(
+        model=model, value=projects
+    )
+
+    litellm.priority_reservation = {"dev": 0.1, "prod": 0.9}
+
+    model_usage = 100
+
+    llm_router = Router(
+        model_list=[
+            {
+                "model_name": model,
+                "litellm_params": {
+                    "model": "gpt-3.5-turbo",
+                    "api_key": "my-key",
+                    "api_base": "my-base",
+                    "rpm": model_usage,
+                },
+            }
+        ]
+    )
+    dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
+
+    ## CHECK AVAILABLE RPM PER PROJECT
+
+    resp = await dynamic_rate_limit_handler.check_available_usage(
+        model=model, priority="prod"
+    )
+
+    availability = resp[1]
+
+    expected_availability = int(
+        model_usage * litellm.priority_reservation["prod"] / num_projects
+    )
+
+    assert availability == expected_availability
+
+
 @pytest.mark.skip(
     reason="Unstable on ci/cd due to curr minute changes. Refactor to handle minute changing"
 )

From 196b94455e4ab9725298aaef4e198adb07bcb094 Mon Sep 17 00:00:00 2001
From: Krrish Dholakia
Date: Mon, 1 Jul 2024 23:35:42 -0700
Subject: [PATCH 5/5] fix(dynamic_rate_limiter.py): add rpm allocation, priority + quota reservation to docs

---
 docs/my-website/docs/proxy/team_budgets.md  |  92 +++++++++++-
 litellm/proxy/_new_secret_config.yaml       |  22 +--
 litellm/proxy/hooks/dynamic_rate_limiter.py | 149 +++++++++++---------
 3 files changed, 181 insertions(+), 82 deletions(-)

diff --git a/docs/my-website/docs/proxy/team_budgets.md b/docs/my-website/docs/proxy/team_budgets.md
index 7d5284de7..d38526497 100644
--- a/docs/my-website/docs/proxy/team_budgets.md
+++ b/docs/my-website/docs/proxy/team_budgets.md
@@ -152,11 +152,11 @@ litellm_remaining_team_budget_metric{team_alias="QA Prod Bot",team_id="de35b29e-
 ```
 
-### Dynamic TPM Allocation
+### Dynamic TPM/RPM Allocation
 
-Prevent projects from gobbling too much quota.
+Prevent projects from gobbling too much tpm/rpm.
 
-Dynamically allocate TPM quota to api keys, based on active keys in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
+Dynamically allocate TPM/RPM quota to api keys, based on active keys in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
 
 1. Setup config.yaml
 
@@ -247,4 +247,90 @@ except RateLimitError as e:
 ```
 This was rate limited b/c - Error code: 429 - {'error': {'message': {'error': 'Key= over available TPM=0. Model TPM=0, Active keys=2'}, 'type': 'None', 'param': 'None', 'code': 429}}
 ```


#### ✨ [BETA] Set Priority / Reserve Quota

Reserve tpm/rpm capacity for projects in prod.

:::tip

Reserving tpm/rpm on keys based on priority is a premium feature. Please [get an enterprise license](./enterprise.md) for it.
:::


1. Setup config.yaml

```yaml
model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: "gpt-3.5-turbo"
      api_key: os.environ/OPENAI_API_KEY
      rpm: 100

litellm_settings:
  callbacks: ["dynamic_rate_limiter"]
  priority_reservation: {"dev": 0, "prod": 1}

general_settings:
  master_key: sk-1234 # OR set `LITELLM_MASTER_KEY=".."` in your .env
  database_url: postgres://.. # OR set `DATABASE_URL=".."` in your .env
```


priority_reservation:
- Dict[str, float]
  - str: can be any string
  - float: from 0 to 1. Specify the % of tpm/rpm to reserve for keys of this priority.

**Start Proxy**

```
litellm --config /path/to/config.yaml
```

2. Create a key with that priority

```bash
curl -X POST 'http://0.0.0.0:4000/key/generate' \
-H 'Authorization: Bearer ' \
-H 'Content-Type: application/json' \
-d '{
  "metadata": {"priority": "dev"} # 👈 KEY CHANGE
}'
```

**Expected Response**

```
{
  ...
  "key": "sk-.."
}
```


3. Test it!

```bash
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer sk-...' \ # 👈 key from step 2.
  -d '{
    "model": "gpt-3.5-turbo",
    "messages": [
      {
        "role": "user",
        "content": "what llm are you"
      }
    ]
  }'
```

**Expected Response**

```
Key=... over available RPM=0. Model RPM=100, Active keys=None
```
\ No newline at end of file
diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml
index f21a9e832..f26337891 100644
--- a/litellm/proxy/_new_secret_config.yaml
+++ b/litellm/proxy/_new_secret_config.yaml
@@ -1,14 +1,16 @@
 model_list:
-  - model_name: "*" # all requests where model not in your config go to this deployment
+  - model_name: gpt-3.5-turbo # all requests where model not in your config go to this deployment
     litellm_params:
-      model: "openai/*"
+      model: "gpt-3.5-turbo"
+      rpm: 100
 
 litellm_settings:
-  success_callback: ["s3"]
-  s3_callback_params:
-    s3_bucket_name: my-test-bucket-22-litellm # AWS Bucket Name for S3
-    s3_region_name: us-west-2 # AWS Region Name for S3
-    s3_aws_access_key_id: os.environ/AWS_ACCESS_KEY_ID # use os.environ/ to pass environment variables. This is AWS Access Key ID for S3
-    s3_aws_secret_access_key: os.environ/AWS_SECRET_ACCESS_KEY # AWS Secret Access Key for S3
-    s3_path: my-test-path
-
+  callbacks: ["dynamic_rate_limiter"]
+  priority_reservation: {"dev": 0, "prod": 1}
+# success_callback: ["s3"]
+# s3_callback_params:
+#   s3_bucket_name: my-test-bucket-22-litellm # AWS Bucket Name for S3
+#   s3_region_name: us-west-2 # AWS Region Name for S3
+#   s3_aws_access_key_id: os.environ/AWS_ACCESS_KEY_ID # use os.environ/ to pass environment variables. This is AWS Access Key ID for S3
+#   s3_aws_secret_access_key: os.environ/AWS_SECRET_ACCESS_KEY # AWS Secret Access Key for S3
+#   s3_path: my-test-path
diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py
index 8d0effc89..33b5d2eb9 100644
--- a/litellm/proxy/hooks/dynamic_rate_limiter.py
+++ b/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -102,78 +102,89 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
         - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
         - active_projects: int or null
         """
-        weight: float = 1
-        if (
-            litellm.priority_reservation is None
-            or priority not in litellm.priority_reservation
-        ):
+        try:
+            weight: float = 1
+            if (
+                litellm.priority_reservation is None
+                or priority not in litellm.priority_reservation
+            ):
+                verbose_proxy_logger.error(
+                    "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
+                        priority, litellm.priority_reservation
+                    )
+                )
+            elif priority is not None and litellm.priority_reservation is not None:
+                if os.getenv("LITELLM_LICENSE", None) is None:
+                    verbose_proxy_logger.error(
+                        "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
+                    )
+                else:
+                    weight = litellm.priority_reservation[priority]
+
+            active_projects = await self.internal_usage_cache.async_get_cache(
+                model=model
+            )
+            current_model_tpm, current_model_rpm = (
+                await self.llm_router.get_model_group_usage(model_group=model)
+            )
+            model_group_info: Optional[ModelGroupInfo] = (
+                self.llm_router.get_model_group_info(model_group=model)
+            )
+            total_model_tpm: Optional[int] = None
+            total_model_rpm: Optional[int] = None
+            if model_group_info is not None:
+                if model_group_info.tpm is not None:
+                    total_model_tpm = model_group_info.tpm
+                if model_group_info.rpm is not None:
+                    total_model_rpm = model_group_info.rpm
+
+            remaining_model_tpm: Optional[int] = None
+            if total_model_tpm is not None and current_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm - current_model_tpm
+            elif total_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm
+
+            remaining_model_rpm: Optional[int] = None
+            if total_model_rpm is not None and current_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm - current_model_rpm
+            elif total_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm
+
+            available_tpm: Optional[int] = None
+
+            if remaining_model_tpm is not None:
+                if active_projects is not None:
+                    available_tpm = int(remaining_model_tpm * weight / active_projects)
+                else:
+                    available_tpm = int(remaining_model_tpm * weight)
+
+            if available_tpm is not None and available_tpm < 0:
+                available_tpm = 0
+
+            available_rpm: Optional[int] = None
+
+            if remaining_model_rpm is not None:
+                if active_projects is not None:
+                    available_rpm = int(remaining_model_rpm * weight / active_projects)
+                else:
+                    available_rpm = int(remaining_model_rpm * weight)
+
+            if available_rpm is not None and available_rpm < 0:
+                available_rpm = 0
+            return (
+                available_tpm,
+                available_rpm,
+                remaining_model_tpm,
+                remaining_model_rpm,
+                active_projects,
+            )
+        except Exception as e:
             verbose_proxy_logger.error(
-                "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
-                    priority, litellm.priority_reservation
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}\n{}".format(
+                    str(e), traceback.format_exc()
                 )
             )
-        elif priority is not None and litellm.priority_reservation is not None:
-            if os.getenv("LITELLM_LICENSE", None) is None:
-                verbose_proxy_logger.error(
-                    "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
-                )
-            else:
-                weight = litellm.priority_reservation[priority]
-        active_projects = await self.internal_usage_cache.async_get_cache(model=model)
-        current_model_tpm, current_model_rpm = (
-            await self.llm_router.get_model_group_usage(model_group=model)
-        )
-        model_group_info: Optional[ModelGroupInfo] = (
-            self.llm_router.get_model_group_info(model_group=model)
-        )
-        total_model_tpm: Optional[int] = None
-        total_model_rpm: Optional[int] = None
-        if model_group_info is not None:
-            if model_group_info.tpm is not None:
-                total_model_tpm = model_group_info.tpm
-            if model_group_info.rpm is not None:
-                total_model_rpm = model_group_info.rpm
-
-        remaining_model_tpm: Optional[int] = None
-        if total_model_tpm is not None and current_model_tpm is not None:
-            remaining_model_tpm = total_model_tpm - current_model_tpm
-        elif total_model_tpm is not None:
-            remaining_model_tpm = total_model_tpm
-
-        remaining_model_rpm: Optional[int] = None
-        if total_model_rpm is not None and current_model_rpm is not None:
-            remaining_model_rpm = total_model_rpm - current_model_rpm
-        elif total_model_rpm is not None:
-            remaining_model_rpm = total_model_rpm
-
-        available_tpm: Optional[int] = None
-
-        if remaining_model_tpm is not None:
-            if active_projects is not None:
-                available_tpm = int(remaining_model_tpm * weight / active_projects)
-            else:
-                available_tpm = int(remaining_model_tpm * weight)
-
-        if available_tpm is not None and available_tpm < 0:
-            available_tpm = 0
-
-        available_rpm: Optional[int] = None
-
-        if remaining_model_rpm is not None:
-            if active_projects is not None:
-                available_rpm = int(remaining_model_rpm * weight / active_projects)
-            else:
-                available_rpm = int(remaining_model_rpm * weight)
-
-        if available_rpm is not None and available_rpm < 0:
-            available_rpm = 0
-        return (
-            available_tpm,
-            available_rpm,
-            remaining_model_tpm,
-            remaining_model_rpm,
-            active_projects,
-        )
+            return None, None, None, None, None
 
     async def async_pre_call_hook(
         self,