diff --git a/docs/my-website/docs/proxy/team_budgets.md b/docs/my-website/docs/proxy/team_budgets.md
index 7d5284de7..d38526497 100644
--- a/docs/my-website/docs/proxy/team_budgets.md
+++ b/docs/my-website/docs/proxy/team_budgets.md
@@ -152,11 +152,11 @@ litellm_remaining_team_budget_metric{team_alias="QA Prod Bot",team_id="de35b29e-
 ```
 
-### Dynamic TPM Allocation
+### Dynamic TPM/RPM Allocation
 
-Prevent projects from gobbling too much quota.
+Prevent projects from consuming too much TPM/RPM quota.
 
-Dynamically allocate TPM quota to api keys, based on active keys in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
+Dynamically allocate TPM/RPM quota across API keys, based on the keys active in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
 
 1. Setup config.yaml
 
@@ -247,4 +247,90 @@ except RateLimitError as e:
 ```
 This was rate limited b/c - Error code: 429 - {'error': {'message': {'error': 'Key= over available TPM=0. Model TPM=0, Active keys=2'}, 'type': 'None', 'param': 'None', 'code': 429}}
+```
+
+
+#### ✨ [BETA] Set Priority / Reserve Quota
+
+Reserve TPM/RPM capacity for your production projects.
+
+:::tip
+
+Reserving TPM/RPM for keys based on priority is a premium feature. Please [get an enterprise license](./enterprise.md) for it.
+:::
+
+
+1. Setup config.yaml
+
+```yaml
+model_list:
+  - model_name: gpt-3.5-turbo
+    litellm_params:
+      model: "gpt-3.5-turbo"
+      api_key: os.environ/OPENAI_API_KEY
+      rpm: 100
+
+litellm_settings:
+  callbacks: ["dynamic_rate_limiter"]
+  priority_reservation: {"dev": 0, "prod": 1}
+
+general_settings:
+  master_key: sk-1234 # OR set `LITELLM_MASTER_KEY=".."` in your .env
+  database_url: postgres://.. # OR set `DATABASE_URL=".."` in your .env
+```
+
+
+`priority_reservation`:
+- Dict[str, float]
+  - str: the priority label; can be any string
+  - float: from 0 to 1. The fraction of the model's TPM/RPM to reserve for keys with this priority.
+
+**Start Proxy**
+
+```
+litellm --config /path/to/config.yaml
+```
+
+2. Create a key with that priority
+
+```bash
+curl -X POST 'http://0.0.0.0:4000/key/generate' \
+-H 'Authorization: Bearer sk-1234' \
+-H 'Content-Type: application/json' \
+-d '{
+  "metadata": {"priority": "dev"} # 👈 KEY CHANGE
+}'
+```
+
+**Expected Response**
+
+```
+{
+    ...
+    "key": "sk-.."
+}
+```
+
+
+3. Test it! Use the key from step 2 in the Authorization header.
+
+```bash
+curl -X POST 'http://0.0.0.0:4000/chat/completions' \
+  -H 'Content-Type: application/json' \
+  -H 'Authorization: Bearer sk-...' \
+  -d '{
+    "model": "gpt-3.5-turbo",
+    "messages": [
+      {
+        "role": "user",
+        "content": "what llm are you"
+      }
+    ]
+  }'
+```
+
+**Expected Response**
+
+```
+Key=... over available RPM=0.
Model RPM=100, Active keys=None ``` \ No newline at end of file diff --git a/litellm/__init__.py b/litellm/__init__.py index a9e6b69ae..29b5bc360 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -240,6 +240,8 @@ default_user_params: Optional[Dict] = None default_team_settings: Optional[List] = None max_user_budget: Optional[float] = None max_end_user_budget: Optional[float] = None +#### REQUEST PRIORITIZATION #### +priority_reservation: Optional[Dict[str, float]] = None #### RELIABILITY #### request_timeout: float = 6000 module_level_aclient = AsyncHTTPHandler(timeout=request_timeout) diff --git a/litellm/proxy/_new_secret_config.yaml b/litellm/proxy/_new_secret_config.yaml index 31c96a066..f135922ea 100644 --- a/litellm/proxy/_new_secret_config.yaml +++ b/litellm/proxy/_new_secret_config.yaml @@ -6,4 +6,4 @@ model_list: general_settings: alerting: ["slack"] - alerting_threshold: 10 \ No newline at end of file + alerting_threshold: 10 diff --git a/litellm/proxy/hooks/dynamic_rate_limiter.py b/litellm/proxy/hooks/dynamic_rate_limiter.py index 95f0ccc13..33b5d2eb9 100644 --- a/litellm/proxy/hooks/dynamic_rate_limiter.py +++ b/litellm/proxy/hooks/dynamic_rate_limiter.py @@ -3,6 +3,7 @@ ## Tracks num active projects per minute import asyncio +import os import sys import traceback from datetime import datetime @@ -81,46 +82,109 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): def update_variables(self, llm_router: Router): self.llm_router = llm_router - async def check_available_tpm( - self, model: str - ) -> Tuple[Optional[int], Optional[int], Optional[int]]: + async def check_available_usage( + self, model: str, priority: Optional[str] = None + ) -> Tuple[ + Optional[int], Optional[int], Optional[int], Optional[int], Optional[int] + ]: """ For a given model, get its available tpm + Params: + - model: str, the name of the model in the router model_list + - priority: Optional[str], the priority for the request. + Returns - - Tuple[available_tpm, model_tpm, active_projects] + - Tuple[available_tpm, available_tpm, model_tpm, model_rpm, active_projects] + - available_tpm: int or null - always 0 or positive. - available_tpm: int or null - always 0 or positive. - remaining_model_tpm: int or null. If available tpm is int, then this will be too. + - remaining_model_rpm: int or null. If available rpm is int, then this will be too. - active_projects: int or null """ - active_projects = await self.internal_usage_cache.async_get_cache(model=model) - current_model_tpm: Optional[int] = await self.llm_router.get_model_group_usage( - model_group=model - ) - model_group_info: Optional[ModelGroupInfo] = ( - self.llm_router.get_model_group_info(model_group=model) - ) - total_model_tpm: Optional[int] = None - if model_group_info is not None and model_group_info.tpm is not None: - total_model_tpm = model_group_info.tpm + try: + weight: float = 1 + if ( + litellm.priority_reservation is None + or priority not in litellm.priority_reservation + ): + verbose_proxy_logger.error( + "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format( + priority, litellm.priority_reservation + ) + ) + elif priority is not None and litellm.priority_reservation is not None: + if os.getenv("LITELLM_LICENSE", None) is None: + verbose_proxy_logger.error( + "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise." 
+ ) + else: + weight = litellm.priority_reservation[priority] - remaining_model_tpm: Optional[int] = None - if total_model_tpm is not None and current_model_tpm is not None: - remaining_model_tpm = total_model_tpm - current_model_tpm - elif total_model_tpm is not None: - remaining_model_tpm = total_model_tpm + active_projects = await self.internal_usage_cache.async_get_cache( + model=model + ) + current_model_tpm, current_model_rpm = ( + await self.llm_router.get_model_group_usage(model_group=model) + ) + model_group_info: Optional[ModelGroupInfo] = ( + self.llm_router.get_model_group_info(model_group=model) + ) + total_model_tpm: Optional[int] = None + total_model_rpm: Optional[int] = None + if model_group_info is not None: + if model_group_info.tpm is not None: + total_model_tpm = model_group_info.tpm + if model_group_info.rpm is not None: + total_model_rpm = model_group_info.rpm - available_tpm: Optional[int] = None + remaining_model_tpm: Optional[int] = None + if total_model_tpm is not None and current_model_tpm is not None: + remaining_model_tpm = total_model_tpm - current_model_tpm + elif total_model_tpm is not None: + remaining_model_tpm = total_model_tpm - if remaining_model_tpm is not None: - if active_projects is not None: - available_tpm = int(remaining_model_tpm / active_projects) - else: - available_tpm = remaining_model_tpm + remaining_model_rpm: Optional[int] = None + if total_model_rpm is not None and current_model_rpm is not None: + remaining_model_rpm = total_model_rpm - current_model_rpm + elif total_model_rpm is not None: + remaining_model_rpm = total_model_rpm - if available_tpm is not None and available_tpm < 0: - available_tpm = 0 - return available_tpm, remaining_model_tpm, active_projects + available_tpm: Optional[int] = None + + if remaining_model_tpm is not None: + if active_projects is not None: + available_tpm = int(remaining_model_tpm * weight / active_projects) + else: + available_tpm = int(remaining_model_tpm * weight) + + if available_tpm is not None and available_tpm < 0: + available_tpm = 0 + + available_rpm: Optional[int] = None + + if remaining_model_rpm is not None: + if active_projects is not None: + available_rpm = int(remaining_model_rpm * weight / active_projects) + else: + available_rpm = int(remaining_model_rpm * weight) + + if available_rpm is not None and available_rpm < 0: + available_rpm = 0 + return ( + available_tpm, + available_rpm, + remaining_model_tpm, + remaining_model_rpm, + active_projects, + ) + except Exception as e: + verbose_proxy_logger.error( + "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}\n{}".format( + str(e), traceback.format_exc() + ) + ) + return None, None, None, None, None async def async_pre_call_hook( self, @@ -140,13 +204,19 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): ]: # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm """ - For a model group - - Check if tpm available - - Raise RateLimitError if no tpm available + - Check if tpm/rpm available + - Raise RateLimitError if no tpm/rpm available """ if "model" in data: - available_tpm, model_tpm, active_projects = await self.check_available_tpm( - model=data["model"] + key_priority: Optional[str] = user_api_key_dict.metadata.get( + "priority", None ) + available_tpm, available_rpm, model_tpm, model_rpm, active_projects = ( + await self.check_available_usage( + model=data["model"], priority=key_priority + ) + ) + ### CHECK TPM 
### if available_tpm is not None and available_tpm == 0: raise HTTPException( status_code=429, @@ -159,7 +229,20 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): ) }, ) - elif available_tpm is not None: + ### CHECK RPM ### + elif available_rpm is not None and available_rpm == 0: + raise HTTPException( + status_code=429, + detail={ + "error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format( + user_api_key_dict.api_key, + available_rpm, + model_rpm, + active_projects, + ) + }, + ) + elif available_rpm is not None or available_tpm is not None: ## UPDATE CACHE WITH ACTIVE PROJECT asyncio.create_task( self.internal_usage_cache.async_set_cache_sadd( # this is a set @@ -182,15 +265,24 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger): ), "Model info for model with id={} is None".format( response._hidden_params["model_id"] ) - available_tpm, remaining_model_tpm, active_projects = ( - await self.check_available_tpm(model=model_info["model_name"]) + key_priority: Optional[str] = user_api_key_dict.metadata.get( + "priority", None + ) + available_tpm, available_rpm, model_tpm, model_rpm, active_projects = ( + await self.check_available_usage( + model=model_info["model_name"], priority=key_priority + ) + ) + response._hidden_params["additional_headers"] = ( + { # Add additional response headers - easier debugging + "x-litellm-model_group": model_info["model_name"], + "x-ratelimit-remaining-litellm-project-tokens": available_tpm, + "x-ratelimit-remaining-litellm-project-requests": available_rpm, + "x-ratelimit-remaining-model-tokens": model_tpm, + "x-ratelimit-remaining-model-requests": model_rpm, + "x-ratelimit-current-active-projects": active_projects, + } ) - response._hidden_params["additional_headers"] = { - "x-litellm-model_group": model_info["model_name"], - "x-ratelimit-remaining-litellm-project-tokens": available_tpm, - "x-ratelimit-remaining-model-tokens": remaining_model_tpm, - "x-ratelimit-current-active-projects": active_projects, - } return response return await super().async_post_call_success_hook( diff --git a/litellm/router.py b/litellm/router.py index 0d082de5d..ac61ec729 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -4256,25 +4256,42 @@ class Router: return model_group_info - async def get_model_group_usage(self, model_group: str) -> Optional[int]: + async def get_model_group_usage( + self, model_group: str + ) -> Tuple[Optional[int], Optional[int]]: """ - Returns remaining tpm quota for model group + Returns remaining tpm/rpm quota for model group + + Returns: + - usage: Tuple[tpm, rpm] """ dt = get_utc_datetime() current_minute = dt.strftime( "%H-%M" ) # use the same timezone regardless of system clock tpm_keys: List[str] = [] + rpm_keys: List[str] = [] for model in self.model_list: if "model_name" in model and model["model_name"] == model_group: tpm_keys.append( f"global_router:{model['model_info']['id']}:tpm:{current_minute}" ) + rpm_keys.append( + f"global_router:{model['model_info']['id']}:rpm:{current_minute}" + ) + combined_tpm_rpm_keys = tpm_keys + rpm_keys + + combined_tpm_rpm_values = await self.cache.async_batch_get_cache( + keys=combined_tpm_rpm_keys + ) + + if combined_tpm_rpm_values is None: + return None, None + + tpm_usage_list: Optional[List] = combined_tpm_rpm_values[: len(tpm_keys)] + rpm_usage_list: Optional[List] = combined_tpm_rpm_values[len(tpm_keys) :] ## TPM - tpm_usage_list: Optional[List] = await self.cache.async_batch_get_cache( - keys=tpm_keys - ) tpm_usage: Optional[int] = None if tpm_usage_list is not None: for 
t in tpm_usage_list: @@ -4282,8 +4299,15 @@ class Router: if tpm_usage is None: tpm_usage = 0 tpm_usage += t - - return tpm_usage + ## RPM + rpm_usage: Optional[int] = None + if rpm_usage_list is not None: + for t in rpm_usage_list: + if isinstance(t, int): + if rpm_usage is None: + rpm_usage = 0 + rpm_usage += t + return tpm_usage, rpm_usage def get_model_ids(self) -> List[str]: """ diff --git a/litellm/tests/test_dynamic_rate_limit_handler.py b/litellm/tests/test_dynamic_rate_limit_handler.py index 4f49abff8..bc50fc16f 100644 --- a/litellm/tests/test_dynamic_rate_limit_handler.py +++ b/litellm/tests/test_dynamic_rate_limit_handler.py @@ -109,17 +109,56 @@ async def test_available_tpm(num_projects, dynamic_rate_limit_handler): ## CHECK AVAILABLE TPM PER PROJECT - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] expected_availability = int(model_tpm / num_projects) assert availability == expected_availability +@pytest.mark.parametrize("num_projects", [1, 2, 100]) @pytest.mark.asyncio -async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth): +async def test_available_rpm(num_projects, dynamic_rate_limit_handler): + model = "my-fake-model" + ## SET CACHE W/ ACTIVE PROJECTS + projects = [str(uuid.uuid4()) for _ in range(num_projects)] + + await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd( + model=model, value=projects + ) + + model_rpm = 100 + llm_router = Router( + model_list=[ + { + "model_name": model, + "litellm_params": { + "model": "gpt-3.5-turbo", + "api_key": "my-key", + "api_base": "my-base", + "rpm": model_rpm, + }, + } + ] + ) + dynamic_rate_limit_handler.update_variables(llm_router=llm_router) + + ## CHECK AVAILABLE rpm PER PROJECT + + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[1] + + expected_availability = int(model_rpm / num_projects) + + assert availability == expected_availability + + +@pytest.mark.parametrize("usage", ["rpm", "tpm"]) +@pytest.mark.asyncio +async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth, usage): """ Unit test. Tests if rate limit error raised when quota exhausted. 
""" @@ -133,7 +172,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth): model=model, value=projects ) - model_tpm = 0 + model_usage = 0 llm_router = Router( model_list=[ { @@ -142,7 +181,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth): "model": "gpt-3.5-turbo", "api_key": "my-key", "api_base": "my-base", - "tpm": model_tpm, + usage: model_usage, }, } ] @@ -151,11 +190,14 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth): ## CHECK AVAILABLE TPM PER PROJECT - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) - expected_availability = int(model_tpm / 1) + if usage == "tpm": + availability = resp[0] + else: + availability = resp[1] + + expected_availability = 0 assert availability == expected_availability @@ -217,9 +259,9 @@ async def test_base_case(dynamic_rate_limit_handler, mock_response): for _ in range(2): try: # check availability - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] print( "prev_availability={}, availability={}".format( @@ -273,9 +315,9 @@ async def test_update_cache( dynamic_rate_limit_handler.update_variables(llm_router=llm_router) ## INITIAL ACTIVE PROJECTS - ASSERT NONE - _, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + active_projects = resp[-1] assert active_projects is None @@ -289,9 +331,9 @@ async def test_update_cache( await asyncio.sleep(2) ## INITIAL ACTIVE PROJECTS - ASSERT 1 - _, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + active_projects = resp[-1] assert active_projects == 1 @@ -357,9 +399,9 @@ async def test_multiple_projects( for i in range(expected_runs + 1): # check availability - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] ## assert availability updated if prev_availability is not None and availability is not None: @@ -389,12 +431,63 @@ async def test_multiple_projects( await asyncio.sleep(3) # check availability - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] + assert availability == 0 +@pytest.mark.parametrize("num_projects", [1, 2, 100]) +@pytest.mark.asyncio +async def test_priority_reservation(num_projects, dynamic_rate_limit_handler): + """ + If reservation is set + `mock_testing_reservation` passed in + + assert correct rpm is reserved + """ + model = "my-fake-model" + ## SET CACHE W/ ACTIVE PROJECTS + projects = [str(uuid.uuid4()) for _ in range(num_projects)] + + await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd( + model=model, value=projects + ) + + litellm.priority_reservation = {"dev": 0.1, "prod": 0.9} + + model_usage = 100 + + llm_router = Router( + model_list=[ + { + "model_name": model, + "litellm_params": { + "model": "gpt-3.5-turbo", + "api_key": "my-key", + "api_base": "my-base", + "rpm": 
model_usage, + }, + } + ] + ) + dynamic_rate_limit_handler.update_variables(llm_router=llm_router) + + ## CHECK AVAILABLE TPM PER PROJECT + + resp = await dynamic_rate_limit_handler.check_available_usage( + model=model, priority="prod" + ) + + availability = resp[1] + + expected_availability = int( + model_usage * litellm.priority_reservation["prod"] / num_projects + ) + + assert availability == expected_availability + + @pytest.mark.skip( reason="Unstable on ci/cd due to curr minute changes. Refactor to handle minute changing" ) @@ -456,9 +549,9 @@ async def test_multiple_projects_e2e( print("expected_runs: {}".format(expected_runs)) for i in range(expected_runs + 1): # check availability - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] ## assert availability updated if prev_availability is not None and availability is not None: @@ -488,7 +581,7 @@ async def test_multiple_projects_e2e( await asyncio.sleep(3) # check availability - availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm( - model=model - ) + resp = await dynamic_rate_limit_handler.check_available_usage(model=model) + + availability = resp[0] assert availability == 0
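
Note on the allocation math introduced in `dynamic_rate_limiter.py` above: `check_available_usage` divides the model group's remaining per-minute quota evenly across active keys and scales it by the fraction reserved for the key's priority. The snippet below is a minimal standalone sketch of that arithmetic under those assumptions; the function name and parameters are illustrative, not the proxy's API, and the real hook additionally reads `litellm.priority_reservation`, checks `LITELLM_LICENSE`, and pulls usage and active-project counts from the internal cache.

```python
from typing import Dict, Optional


def available_quota(
    total_limit: Optional[int],        # model-group TPM or RPM limit (e.g. rpm: 100)
    current_usage: Optional[int],      # usage already recorded this minute
    active_projects: Optional[int],    # number of keys active this minute
    priority: Optional[str] = None,
    priority_reservation: Optional[Dict[str, float]] = None,
) -> Optional[int]:
    """Illustrative mirror of the per-minute TPM/RPM split in check_available_usage."""
    if total_limit is None:
        return None  # no tpm/rpm limit configured for the model group

    remaining = total_limit - (current_usage or 0)

    # Weight = fraction reserved for this priority (premium feature); defaults to 1.0.
    weight = 1.0
    if priority_reservation and priority in priority_reservation:
        weight = priority_reservation[priority]

    if active_projects:
        available = int(remaining * weight / active_projects)
    else:
        available = int(remaining * weight)

    return max(available, 0)  # never report negative availability


# rpm=100, priority_reservation={"dev": 0.1, "prod": 0.9}, 2 active keys:
print(available_quota(100, 0, 2, "prod", {"dev": 0.1, "prod": 0.9}))  # 45
print(available_quota(100, 0, 2, "dev", {"dev": 0.1, "prod": 0.9}))   # 5
```

This matches the expectation in `test_priority_reservation`, where a "prod" key sees `int(model_rpm * 0.9 / num_projects)` requests available for the minute.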
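Similarly, the `router.py` change has `get_model_group_usage` fetch the per-minute TPM and RPM counters in one batched cache read and split the result by the number of TPM keys. The sketch below shows only that split-and-sum step under the same key layout (`global_router:{model_id}:tpm|rpm:{HH-MM}`); the helper name is a placeholder, not the Router's real interface, and the real code uses `self.cache.async_batch_get_cache`.

```python
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def split_batched_usage(
    combined_values: Optional[List[Optional[int]]], num_tpm_keys: int
) -> Tuple[Optional[int], Optional[int]]:
    """Sum the TPM half and RPM half of one batched cache read (illustrative)."""
    if combined_values is None:
        return None, None
    tpm_values = combined_values[:num_tpm_keys]
    rpm_values = combined_values[num_tpm_keys:]

    def _sum(values: List[Optional[int]]) -> Optional[int]:
        ints = [v for v in values if isinstance(v, int)]
        return sum(ints) if ints else None

    return _sum(tpm_values), _sum(rpm_values)


# Keys are built per deployment, per minute, then read in a single batch call:
current_minute = datetime.now(timezone.utc).strftime("%H-%M")
tpm_keys = [f"global_router:model-id-1:tpm:{current_minute}"]
rpm_keys = [f"global_router:model-id-1:rpm:{current_minute}"]
# e.g. the batch returns [1500, 3] for tpm_keys + rpm_keys:
print(split_batched_usage([1500, 3], num_tpm_keys=len(tpm_keys)))  # (1500, 3)
```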