Merge pull request #4502 from BerriAI/litellm_support_dynamic_rpm_limiting

feat(dynamic_rate_limiter.py): support dynamic rate limiting on rpm
Krish Dholakia 2024-07-02 17:51:23 -07:00 committed by GitHub
commit 2d6b6662d4
6 changed files with 380 additions and 83 deletions


@ -152,11 +152,11 @@ litellm_remaining_team_budget_metric{team_alias="QA Prod Bot",team_id="de35b29e-
```
### Dynamic TPM Allocation
### Dynamic TPM/RPM Allocation
Prevent projects from gobbling too much quota.
Prevent projects from gobbling too much tpm/rpm.
Dynamically allocate TPM quota to api keys, based on active keys in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
Dynamically allocate TPM/RPM quota to api keys, based on active keys in that minute. [**See Code**](https://github.com/BerriAI/litellm/blob/9bffa9a48e610cc6886fc2dce5c1815aeae2ad46/litellm/proxy/hooks/dynamic_rate_limiter.py#L125)
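In plain terms, each key's share for the current minute is the model group's remaining TPM/RPM divided by the number of keys active on it that minute. A minimal sketch of that arithmetic (the `allocate_quota` helper and its arguments are illustrative, not the proxy's API):

```python
from typing import Optional

def allocate_quota(remaining_model_tpm: Optional[int], active_projects: Optional[int]) -> Optional[int]:
    """Split the model group's remaining tpm/rpm evenly across keys active this minute."""
    if remaining_model_tpm is None:
        return None  # model group has no tpm/rpm limit configured
    if active_projects:
        available = int(remaining_model_tpm / active_projects)
    else:
        available = remaining_model_tpm  # only one (or no) active key, it gets everything left
    return max(available, 0)

# e.g. 1000 TPM left on the model group and 4 active keys -> 250 TPM per key this minute
print(allocate_quota(1000, 4))  # 250
```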
1. Setup config.yaml
@ -247,4 +247,90 @@ except RateLimitError as e:
```
This was rate limited b/c - Error code: 429 - {'error': {'message': {'error': 'Key=<hashed_token> over available TPM=0. Model TPM=0, Active keys=2'}, 'type': 'None', 'param': 'None', 'code': 429}}
```
#### ✨ [BETA] Set Priority / Reserve Quota
Reserve tpm/rpm capacity for projects in prod.
:::tip
Reserving tpm/rpm on keys based on priority is a premium feature. Please [get an enterprise license](./enterprise.md) for it.
:::
1. Setup config.yaml
```yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: "gpt-3.5-turbo"
api_key: os.environ/OPENAI_API_KEY
rpm: 100
litellm_settings:
callbacks: ["dynamic_rate_limiter"]
priority_reservation: {"dev": 0, "prod": 1}
general_settings:
master_key: sk-1234 # OR set `LITELLM_MASTER_KEY=".."` in your .env
database_url: postgres://.. # OR set `DATABASE_URL=".."` in your .env
```
priority_reservation:
- Dict[str, float]
- str: can be any string
- float: from 0 to 1. The fraction of the model's tpm/rpm to reserve for keys of this priority (e.g. `0.9` reserves 90%).
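For example, with `rpm: 100` on the model and `priority_reservation: {"dev": 0.1, "prod": 0.9}`, if two keys are active on the model in the current minute, a `prod` key gets `int(100 * 0.9 / 2) = 45` requests for that minute and a `dev` key gets `int(100 * 0.1 / 2) = 5`. A minimal sketch of that weighting (the `reserved_share` helper is illustrative, not the proxy's internal function):

```python
from typing import Optional

def reserved_share(model_rpm: int, weight: float, active_projects: Optional[int]) -> int:
    """Reserve `weight` of the model's rpm for this priority, split across active keys."""
    share = model_rpm * weight
    if active_projects:
        share = share / active_projects
    return max(int(share), 0)

print(reserved_share(100, 0.9, 2))  # prod -> 45
print(reserved_share(100, 0.1, 2))  # dev  -> 5
```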
**Start Proxy**
```bash
litellm --config /path/to/config.yaml
```
2. Create a key with that priority
```bash
curl -X POST 'http://0.0.0.0:4000/key/generate' \
-H 'Authorization: Bearer <your-master-key>' \
-H 'Content-Type: application/json' \
-d '{
"metadata": {"priority": "dev"} # 👈 KEY CHANGE
}'
```
**Expected Response**
```
{
...
"key": "sk-.."
}
```
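The same call from Python, for reference (this uses `requests` as one possible client; the master key value is a placeholder):

```python
import requests

resp = requests.post(
    "http://0.0.0.0:4000/key/generate",
    headers={"Authorization": "Bearer <your-master-key>"},
    json={"metadata": {"priority": "dev"}},  # 👈 KEY CHANGE
)
print(resp.json()["key"])  # sk-...
```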
3. Test it!
```bash
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-...' \ # 👈 key from step 2.
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{
"role": "user",
"content": "what llm are you"
}
]
}'
```
**Expected Response**
```
Key=... over available RPM=0. Model RPM=100, Active keys=None
```
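From the client's point of view this is an ordinary 429, so standard handling applies. A minimal sketch using the OpenAI Python SDK pointed at the proxy (the API key and base URL are placeholders):

```python
import openai

client = openai.OpenAI(api_key="sk-...", base_url="http://0.0.0.0:4000")  # 👈 key from step 2

try:
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "what llm are you"}],
    )
    print(response.choices[0].message.content)
except openai.RateLimitError as e:
    # e.g. "Key=<hashed_token> over available RPM=0. Model RPM=100, Active keys=None"
    print("This was rate limited b/c - {}".format(e))
```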


@ -240,6 +240,8 @@ default_user_params: Optional[Dict] = None
default_team_settings: Optional[List] = None
max_user_budget: Optional[float] = None
max_end_user_budget: Optional[float] = None
#### REQUEST PRIORITIZATION ####
priority_reservation: Optional[Dict[str, float]] = None
#### RELIABILITY ####
request_timeout: float = 6000
module_level_aclient = AsyncHTTPHandler(timeout=request_timeout)
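Since `priority_reservation` sits alongside the other module-level defaults, it can also be set directly in Python rather than through `litellm_settings` in config.yaml (the new tests further down do exactly this):

```python
import litellm

# reserve 90% of each model group's tpm/rpm for "prod" keys and 10% for "dev" keys
litellm.priority_reservation = {"dev": 0.1, "prod": 0.9}
```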


@ -6,4 +6,4 @@ model_list:
general_settings:
alerting: ["slack"]
alerting_threshold: 10
alerting_threshold: 10


@ -3,6 +3,7 @@
## Tracks num active projects per minute
import asyncio
import os
import sys
import traceback
from datetime import datetime
@ -81,46 +82,109 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
def update_variables(self, llm_router: Router):
self.llm_router = llm_router
async def check_available_tpm(
self, model: str
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
async def check_available_usage(
self, model: str, priority: Optional[str] = None
) -> Tuple[
Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
]:
"""
For a given model, get its available tpm
Params:
- model: str, the name of the model in the router model_list
- priority: Optional[str], the priority for the request.
Returns
- Tuple[available_tpm, model_tpm, active_projects]
- Tuple[available_tpm, available_rpm, model_tpm, model_rpm, active_projects]
- available_tpm: int or null - always 0 or positive.
- available_rpm: int or null - always 0 or positive.
- remaining_model_tpm: int or null. If available tpm is int, then this will be too.
- remaining_model_rpm: int or null. If available rpm is int, then this will be too.
- active_projects: int or null
"""
active_projects = await self.internal_usage_cache.async_get_cache(model=model)
current_model_tpm: Optional[int] = await self.llm_router.get_model_group_usage(
model_group=model
)
model_group_info: Optional[ModelGroupInfo] = (
self.llm_router.get_model_group_info(model_group=model)
)
total_model_tpm: Optional[int] = None
if model_group_info is not None and model_group_info.tpm is not None:
total_model_tpm = model_group_info.tpm
try:
weight: float = 1
if (
litellm.priority_reservation is None
or priority not in litellm.priority_reservation
):
verbose_proxy_logger.error(
"Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
priority, litellm.priority_reservation
)
)
elif priority is not None and litellm.priority_reservation is not None:
if os.getenv("LITELLM_LICENSE", None) is None:
verbose_proxy_logger.error(
"PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
)
else:
weight = litellm.priority_reservation[priority]
remaining_model_tpm: Optional[int] = None
if total_model_tpm is not None and current_model_tpm is not None:
remaining_model_tpm = total_model_tpm - current_model_tpm
elif total_model_tpm is not None:
remaining_model_tpm = total_model_tpm
active_projects = await self.internal_usage_cache.async_get_cache(
model=model
)
current_model_tpm, current_model_rpm = (
await self.llm_router.get_model_group_usage(model_group=model)
)
model_group_info: Optional[ModelGroupInfo] = (
self.llm_router.get_model_group_info(model_group=model)
)
total_model_tpm: Optional[int] = None
total_model_rpm: Optional[int] = None
if model_group_info is not None:
if model_group_info.tpm is not None:
total_model_tpm = model_group_info.tpm
if model_group_info.rpm is not None:
total_model_rpm = model_group_info.rpm
available_tpm: Optional[int] = None
remaining_model_tpm: Optional[int] = None
if total_model_tpm is not None and current_model_tpm is not None:
remaining_model_tpm = total_model_tpm - current_model_tpm
elif total_model_tpm is not None:
remaining_model_tpm = total_model_tpm
if remaining_model_tpm is not None:
if active_projects is not None:
available_tpm = int(remaining_model_tpm / active_projects)
else:
available_tpm = remaining_model_tpm
remaining_model_rpm: Optional[int] = None
if total_model_rpm is not None and current_model_rpm is not None:
remaining_model_rpm = total_model_rpm - current_model_rpm
elif total_model_rpm is not None:
remaining_model_rpm = total_model_rpm
if available_tpm is not None and available_tpm < 0:
available_tpm = 0
return available_tpm, remaining_model_tpm, active_projects
available_tpm: Optional[int] = None
if remaining_model_tpm is not None:
if active_projects is not None:
available_tpm = int(remaining_model_tpm * weight / active_projects)
else:
available_tpm = int(remaining_model_tpm * weight)
if available_tpm is not None and available_tpm < 0:
available_tpm = 0
available_rpm: Optional[int] = None
if remaining_model_rpm is not None:
if active_projects is not None:
available_rpm = int(remaining_model_rpm * weight / active_projects)
else:
available_rpm = int(remaining_model_rpm * weight)
if available_rpm is not None and available_rpm < 0:
available_rpm = 0
return (
available_tpm,
available_rpm,
remaining_model_tpm,
remaining_model_rpm,
active_projects,
)
except Exception as e:
verbose_proxy_logger.error(
"litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}\n{}".format(
str(e), traceback.format_exc()
)
)
return None, None, None, None, None
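Call sites now unpack a five-tuple instead of the old three-tuple. A minimal call sketch, mirroring how the updated tests below consume it (the duck-typed `handler` argument stands in for a `_PROXY_DynamicRateLimitHandler` that has already had `update_variables()` called):

```python
from typing import Optional

async def quota_exhausted(handler, model: str, priority: Optional[str] = None) -> bool:
    """Sketch: True if this minute's tpm or rpm allowance for the key is used up."""
    available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
        await handler.check_available_usage(model=model, priority=priority)
    )
    return available_tpm == 0 or available_rpm == 0
```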
async def async_pre_call_hook(
self,
@ -140,13 +204,19 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
]: # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm
"""
- For a model group
- Check if tpm available
- Raise RateLimitError if no tpm available
- Check if tpm/rpm available
- Raise RateLimitError if no tpm/rpm available
"""
if "model" in data:
available_tpm, model_tpm, active_projects = await self.check_available_tpm(
model=data["model"]
key_priority: Optional[str] = user_api_key_dict.metadata.get(
"priority", None
)
available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
await self.check_available_usage(
model=data["model"], priority=key_priority
)
)
### CHECK TPM ###
if available_tpm is not None and available_tpm == 0:
raise HTTPException(
status_code=429,
@ -159,7 +229,20 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
)
},
)
elif available_tpm is not None:
### CHECK RPM ###
elif available_rpm is not None and available_rpm == 0:
raise HTTPException(
status_code=429,
detail={
"error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format(
user_api_key_dict.api_key,
available_rpm,
model_rpm,
active_projects,
)
},
)
elif available_rpm is not None or available_tpm is not None:
## UPDATE CACHE WITH ACTIVE PROJECT
asyncio.create_task(
self.internal_usage_cache.async_set_cache_sadd( # this is a set
@ -182,15 +265,24 @@ class _PROXY_DynamicRateLimitHandler(CustomLogger):
), "Model info for model with id={} is None".format(
response._hidden_params["model_id"]
)
available_tpm, remaining_model_tpm, active_projects = (
await self.check_available_tpm(model=model_info["model_name"])
key_priority: Optional[str] = user_api_key_dict.metadata.get(
"priority", None
)
available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
await self.check_available_usage(
model=model_info["model_name"], priority=key_priority
)
)
response._hidden_params["additional_headers"] = (
{ # Add additional response headers - easier debugging
"x-litellm-model_group": model_info["model_name"],
"x-ratelimit-remaining-litellm-project-tokens": available_tpm,
"x-ratelimit-remaining-litellm-project-requests": available_rpm,
"x-ratelimit-remaining-model-tokens": model_tpm,
"x-ratelimit-remaining-model-requests": model_rpm,
"x-ratelimit-current-active-projects": active_projects,
}
)
response._hidden_params["additional_headers"] = {
"x-litellm-model_group": model_info["model_name"],
"x-ratelimit-remaining-litellm-project-tokens": available_tpm,
"x-ratelimit-remaining-model-tokens": remaining_model_tpm,
"x-ratelimit-current-active-projects": active_projects,
}
return response
return await super().async_post_call_success_hook(
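Assuming the proxy forwards these `additional_headers` onto the HTTP response, as it does for its other `x-ratelimit-*` debugging headers, a client can read them through the OpenAI SDK's raw-response interface. A minimal sketch (key and URL are placeholders):

```python
import openai

client = openai.OpenAI(api_key="sk-...", base_url="http://0.0.0.0:4000")

raw = client.chat.completions.with_raw_response.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "hi"}],
)
print(raw.headers.get("x-ratelimit-remaining-litellm-project-requests"))  # this key's remaining rpm
print(raw.headers.get("x-ratelimit-remaining-model-requests"))            # model group's remaining rpm
print(raw.headers.get("x-ratelimit-current-active-projects"))             # keys active this minute
```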


@ -4256,25 +4256,42 @@ class Router:
return model_group_info
async def get_model_group_usage(self, model_group: str) -> Optional[int]:
async def get_model_group_usage(
self, model_group: str
) -> Tuple[Optional[int], Optional[int]]:
"""
Returns remaining tpm quota for model group
Returns remaining tpm/rpm quota for model group
Returns:
- usage: Tuple[tpm, rpm]
"""
dt = get_utc_datetime()
current_minute = dt.strftime(
"%H-%M"
) # use the same timezone regardless of system clock
tpm_keys: List[str] = []
rpm_keys: List[str] = []
for model in self.model_list:
if "model_name" in model and model["model_name"] == model_group:
tpm_keys.append(
f"global_router:{model['model_info']['id']}:tpm:{current_minute}"
)
rpm_keys.append(
f"global_router:{model['model_info']['id']}:rpm:{current_minute}"
)
combined_tpm_rpm_keys = tpm_keys + rpm_keys
combined_tpm_rpm_values = await self.cache.async_batch_get_cache(
keys=combined_tpm_rpm_keys
)
if combined_tpm_rpm_values is None:
return None, None
tpm_usage_list: Optional[List] = combined_tpm_rpm_values[: len(tpm_keys)]
rpm_usage_list: Optional[List] = combined_tpm_rpm_values[len(tpm_keys) :]
## TPM
tpm_usage_list: Optional[List] = await self.cache.async_batch_get_cache(
keys=tpm_keys
)
tpm_usage: Optional[int] = None
if tpm_usage_list is not None:
for t in tpm_usage_list:
@ -4282,8 +4299,15 @@ class Router:
if tpm_usage is None:
tpm_usage = 0
tpm_usage += t
return tpm_usage
## RPM
rpm_usage: Optional[int] = None
if rpm_usage_list is not None:
for t in rpm_usage_list:
if isinstance(t, int):
if rpm_usage is None:
rpm_usage = 0
rpm_usage += t
return tpm_usage, rpm_usage
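The new signature is a breaking change for callers: the old single `Optional[int]` becomes a `(tpm, rpm)` tuple, so call sites (like the dynamic rate limit hook above) now unpack both values. A minimal sketch, assuming an already-configured `Router` instance is passed in:

```python
from typing import Optional, Tuple

async def current_minute_usage(router, model_group: str) -> Tuple[Optional[int], Optional[int]]:
    """Sketch: this minute's token and request usage for a model group."""
    tpm_usage, rpm_usage = await router.get_model_group_usage(model_group=model_group)
    # either value may be None if nothing has been recorded for the current minute
    return tpm_usage, rpm_usage
```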
def get_model_ids(self) -> List[str]:
"""


@ -109,17 +109,56 @@ async def test_available_tpm(num_projects, dynamic_rate_limit_handler):
## CHECK AVAILABLE TPM PER PROJECT
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
expected_availability = int(model_tpm / num_projects)
assert availability == expected_availability
@pytest.mark.parametrize("num_projects", [1, 2, 100])
@pytest.mark.asyncio
async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
async def test_available_rpm(num_projects, dynamic_rate_limit_handler):
model = "my-fake-model"
## SET CACHE W/ ACTIVE PROJECTS
projects = [str(uuid.uuid4()) for _ in range(num_projects)]
await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd(
model=model, value=projects
)
model_rpm = 100
llm_router = Router(
model_list=[
{
"model_name": model,
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-key",
"api_base": "my-base",
"rpm": model_rpm,
},
}
]
)
dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
## CHECK AVAILABLE RPM PER PROJECT
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[1]
expected_availability = int(model_rpm / num_projects)
assert availability == expected_availability
@pytest.mark.parametrize("usage", ["rpm", "tpm"])
@pytest.mark.asyncio
async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth, usage):
"""
Unit test. Tests that a rate limit error is raised when the quota is exhausted.
"""
@ -133,7 +172,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
model=model, value=projects
)
model_tpm = 0
model_usage = 0
llm_router = Router(
model_list=[
{
@ -142,7 +181,7 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
"model": "gpt-3.5-turbo",
"api_key": "my-key",
"api_base": "my-base",
"tpm": model_tpm,
usage: model_usage,
},
}
]
@ -151,11 +190,14 @@ async def test_rate_limit_raised(dynamic_rate_limit_handler, user_api_key_auth):
## CHECK AVAILABLE TPM/RPM PER PROJECT
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
expected_availability = int(model_tpm / 1)
if usage == "tpm":
availability = resp[0]
else:
availability = resp[1]
expected_availability = 0
assert availability == expected_availability
@ -217,9 +259,9 @@ async def test_base_case(dynamic_rate_limit_handler, mock_response):
for _ in range(2):
try:
# check availability
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
print(
"prev_availability={}, availability={}".format(
@ -273,9 +315,9 @@ async def test_update_cache(
dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
## INITIAL ACTIVE PROJECTS - ASSERT NONE
_, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
active_projects = resp[-1]
assert active_projects is None
@ -289,9 +331,9 @@ async def test_update_cache(
await asyncio.sleep(2)
## INITIAL ACTIVE PROJECTS - ASSERT 1
_, _, active_projects = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
active_projects = resp[-1]
assert active_projects == 1
@ -357,9 +399,9 @@ async def test_multiple_projects(
for i in range(expected_runs + 1):
# check availability
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
## assert availability updated
if prev_availability is not None and availability is not None:
@ -389,12 +431,63 @@ async def test_multiple_projects(
await asyncio.sleep(3)
# check availability
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
assert availability == 0
@pytest.mark.parametrize("num_projects", [1, 2, 100])
@pytest.mark.asyncio
async def test_priority_reservation(num_projects, dynamic_rate_limit_handler):
"""
If priority_reservation is set, assert the correct share of rpm is reserved
for the given priority.
"""
model = "my-fake-model"
## SET CACHE W/ ACTIVE PROJECTS
projects = [str(uuid.uuid4()) for _ in range(num_projects)]
await dynamic_rate_limit_handler.internal_usage_cache.async_set_cache_sadd(
model=model, value=projects
)
litellm.priority_reservation = {"dev": 0.1, "prod": 0.9}
model_usage = 100
llm_router = Router(
model_list=[
{
"model_name": model,
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "my-key",
"api_base": "my-base",
"rpm": model_usage,
},
}
]
)
dynamic_rate_limit_handler.update_variables(llm_router=llm_router)
## CHECK AVAILABLE RPM PER PROJECT
resp = await dynamic_rate_limit_handler.check_available_usage(
model=model, priority="prod"
)
availability = resp[1]
expected_availability = int(
model_usage * litellm.priority_reservation["prod"] / num_projects
)
assert availability == expected_availability
@pytest.mark.skip(
reason="Unstable on ci/cd due to curr minute changes. Refactor to handle minute changing"
)
@ -456,9 +549,9 @@ async def test_multiple_projects_e2e(
print("expected_runs: {}".format(expected_runs))
for i in range(expected_runs + 1):
# check availability
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
## assert availability updated
if prev_availability is not None and availability is not None:
@ -488,7 +581,7 @@ async def test_multiple_projects_e2e(
await asyncio.sleep(3)
# check availability
availability, _, _ = await dynamic_rate_limit_handler.check_available_tpm(
model=model
)
resp = await dynamic_rate_limit_handler.check_available_usage(model=model)
availability = resp[0]
assert availability == 0