import json
from typing import List, Optional

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_logger import Span
from litellm.proxy._types import UserAPIKeyAuth
from litellm.router_strategy.budget_limiter import RouterBudgetLimiting
from litellm.types.llms.openai import AllMessageValues
from litellm.types.utils import (
    BudgetConfig,
    GenericBudgetConfigType,
    StandardLoggingPayload,
)

VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX = "virtual_key_spend"
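# Per-(virtual key, model) spend is cached in the DualCache under keys of the
# form "{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{model}:{budget_duration}",
# e.g. "virtual_key_spend:<hashed-token>:gpt-4o:1d" (placeholder hash, for illustration).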


class _PROXY_VirtualKeyModelMaxBudgetLimiter(RouterBudgetLimiting):
    """
    Handles budgets for model + virtual key

    Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d
    """

    def __init__(self, dual_cache: DualCache):
        self.dual_cache = dual_cache
        # pending Redis increment operations, batched/flushed by RouterBudgetLimiting
        self.redis_increment_operation_queue = []
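    # Two-phase flow: is_key_within_model_budget() is the pre-request check,
    # and async_log_success_event() records spend after a successful response.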

    async def is_key_within_model_budget(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        model: str,
    ) -> bool:
        """
        Check if the user_api_key_dict is within the model budget.

        Raises:
            BudgetExceededError: If the user_api_key_dict has exceeded the model budget
        """
        _model_max_budget = user_api_key_dict.model_max_budget
        internal_model_max_budget: GenericBudgetConfigType = {}

        # normalize the raw per-model budget dicts into BudgetConfig objects
        for _model, _budget_info in _model_max_budget.items():
            internal_model_max_budget[_model] = BudgetConfig(**_budget_info)

        verbose_proxy_logger.debug(
            "internal_model_max_budget %s",
            json.dumps(internal_model_max_budget, indent=4, default=str),
        )

        # check if current model is in internal_model_max_budget
        _current_model_budget_info = self._get_request_model_budget_config(
            model=model, internal_model_max_budget=internal_model_max_budget
        )
        if _current_model_budget_info is None:
            verbose_proxy_logger.debug(
                f"Model {model} not found in internal_model_max_budget"
            )
            return True

        # check if current model is within budget
        if (
            _current_model_budget_info.max_budget
            and _current_model_budget_info.max_budget > 0
        ):
            _current_spend = await self._get_virtual_key_spend_for_model(
                user_api_key_hash=user_api_key_dict.token,
                model=model,
                key_budget_config=_current_model_budget_info,
            )
            if (
                _current_spend is not None
                and _current_model_budget_info.max_budget is not None
                and _current_spend > _current_model_budget_info.max_budget
            ):
                raise litellm.BudgetExceededError(
                    message=f"LiteLLM Virtual Key: {user_api_key_dict.token}, key_alias: {user_api_key_dict.key_alias}, exceeded budget for model={model}",
                    current_cost=_current_spend,
                    max_budget=_current_model_budget_info.max_budget,
                    budget_reset_at=_current_model_budget_info.budget_reset_at,
                )

        return True
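    # Illustrative call (names and budget-dict shape assumed, not defined in
    # this module):
    #
    #   key_auth = UserAPIKeyAuth(
    #       token="<hashed-token>",
    #       model_max_budget={"gpt-4o": {"budget_duration": "1d", "max_budget": 100}},
    #   )
    #   await limiter.is_key_within_model_budget(
    #       user_api_key_dict=key_auth, model="gpt-4o"
    #   )
    #
    # Returns True while cached spend is under budget (or no budget is set for
    # the model); raises litellm.BudgetExceededError once spend exceeds it.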

    async def _get_virtual_key_spend_for_model(
        self,
        user_api_key_hash: Optional[str],
        model: str,
        key_budget_config: BudgetConfig,
    ) -> Optional[float]:
        """
        Get the current spend for a virtual key for a model.

        Lookup model in this order:
        1. model: directly look up `model`
        2. If 1 does not exist, check if the model was passed as {custom_llm_provider}/model
        """

        # 1. model: directly look up `model`
        virtual_key_model_spend_cache_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{model}:{key_budget_config.budget_duration}"
        _current_spend = await self.dual_cache.async_get_cache(
            key=virtual_key_model_spend_cache_key,
        )

        if _current_spend is None:
            # 2. If 1 does not exist, check if the model was passed as {custom_llm_provider}/model
            # keep only the segment after the last "/" - eg. openai/o1-preview -> o1-preview
            virtual_key_model_spend_cache_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{user_api_key_hash}:{self._get_model_without_custom_llm_provider(model)}:{key_budget_config.budget_duration}"
            _current_spend = await self.dual_cache.async_get_cache(
                key=virtual_key_model_spend_cache_key,
            )
        return _current_spend
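    # Lookup example: for model="openai/gpt-4o" with a "1d" budget duration,
    # the cache keys probed are, in order:
    #   1. "virtual_key_spend:{user_api_key_hash}:openai/gpt-4o:1d"
    #   2. "virtual_key_spend:{user_api_key_hash}:gpt-4o:1d"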

    def _get_request_model_budget_config(
        self, model: str, internal_model_max_budget: GenericBudgetConfigType
    ) -> Optional[BudgetConfig]:
        """
        Get the budget config for the request model.

        1. Check if `model` is in `internal_model_max_budget`
        2. If not, check if `model` without the custom llm provider is in `internal_model_max_budget`
        """
        return internal_model_max_budget.get(
            model, None
        ) or internal_model_max_budget.get(
            self._get_model_without_custom_llm_provider(model), None
        )

    def _get_model_without_custom_llm_provider(self, model: str) -> str:
        # keep only the segment after the last "/" - eg. openai/o1-preview -> o1-preview
        if "/" in model:
            return model.split("/")[-1]
        return model

    async def async_filter_deployments(
        self,
        model: str,
        healthy_deployments: List,
        messages: Optional[List[AllMessageValues]],
        request_kwargs: Optional[dict] = None,
        parent_otel_span: Optional[Span] = None,  # type: ignore
    ) -> List[dict]:
        # no deployment-level filtering here; budgets are enforced in
        # is_key_within_model_budget instead
        return healthy_deployments

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """
        Track spend for virtual key + model in DualCache

        Example: key=sk-1234567890, model=gpt-4o, max_budget=100, time_period=1d
        """
        verbose_proxy_logger.debug(
            "in _PROXY_VirtualKeyModelMaxBudgetLimiter.async_log_success_event"
        )
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object", None
        )
        if standard_logging_payload is None:
            raise ValueError("standard_logging_payload is required")

        _litellm_params: dict = kwargs.get("litellm_params", {}) or {}
        _metadata: dict = _litellm_params.get("metadata", {}) or {}
        user_api_key_model_max_budget: Optional[dict] = _metadata.get(
            "user_api_key_model_max_budget", None
        )
        if (
            user_api_key_model_max_budget is None
            or len(user_api_key_model_max_budget) == 0
        ):
            verbose_proxy_logger.debug(
                "Not running _PROXY_VirtualKeyModelMaxBudgetLimiter.async_log_success_event because user_api_key_model_max_budget is None or empty. `user_api_key_model_max_budget`=%s",
                user_api_key_model_max_budget,
            )
            return
        response_cost: float = standard_logging_payload.get("response_cost", 0)
        model = standard_logging_payload.get("model")

        virtual_key = standard_logging_payload.get("metadata").get("user_api_key_hash")
        if virtual_key is not None:
            # a fixed 1d window is used when incrementing spend for this
            # (virtual key, model) pair
            budget_config = BudgetConfig(time_period="1d", budget_limit=0.1)
            virtual_spend_key = f"{VIRTUAL_KEY_SPEND_CACHE_KEY_PREFIX}:{virtual_key}:{model}:{budget_config.budget_duration}"
            virtual_start_time_key = f"virtual_key_budget_start_time:{virtual_key}"
            await self._increment_spend_for_key(
                budget_config=budget_config,
                spend_key=virtual_spend_key,
                start_time_key=virtual_start_time_key,
                response_cost=response_cost,
            )
        verbose_proxy_logger.debug(
            "current state of in memory cache %s",
            json.dumps(
                self.dual_cache.in_memory_cache.cache_dict, indent=4, default=str
            ),
        )
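

if __name__ == "__main__":
    # Minimal usage sketch, not part of the proxy wiring. Assumptions: a
    # default in-memory DualCache suffices, and UserAPIKeyAuth accepts the
    # token / model_max_budget fields that is_key_within_model_budget reads.
    import asyncio

    async def _demo() -> None:
        limiter = _PROXY_VirtualKeyModelMaxBudgetLimiter(dual_cache=DualCache())
        key_auth = UserAPIKeyAuth(
            token="<hashed-token>",  # placeholder, normally the hashed virtual key
            model_max_budget={
                "gpt-4o": {"budget_duration": "1d", "max_budget": 100}
            },
        )
        # True while cached spend for (key, model) is under max_budget;
        # litellm.BudgetExceededError is raised once it is exceeded
        within_budget = await limiter.is_key_within_model_budget(
            user_api_key_dict=key_auth, model="gpt-4o"
        )
        print(f"within budget: {within_budget}")

    asyncio.run(_demo())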