diff --git a/docs/my-website/docs/proxy/provider_budget_routing.md b/docs/my-website/docs/proxy/provider_budget_routing.md
new file mode 100644
index 000000000..a945ef89a
--- /dev/null
+++ b/docs/my-website/docs/proxy/provider_budget_routing.md
@@ -0,0 +1,95 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Provider Budget Routing
+Use this to set budgets for LLM Providers - e.g. $100/day for OpenAI, $100/day for Azure.
+
+```yaml
+model_list:
+  - model_name: gpt-3.5-turbo
+    litellm_params:
+      model: openai/gpt-3.5-turbo
+      api_key: os.environ/OPENAI_API_KEY
+  - model_name: gpt-3.5-turbo
+    litellm_params:
+      model: azure/chatgpt-functioncalling
+      api_key: os.environ/AZURE_API_KEY
+      api_version: os.environ/AZURE_API_VERSION
+      api_base: os.environ/AZURE_API_BASE
+
+router_settings:
+  redis_host:
+  redis_password:
+  redis_port:
+  provider_budget_config:
+    openai:
+      budget_limit: 0.000000000001 # float of $ value budget for time period
+      time_period: 1d # can be 1d, 2d, 30d
+    azure:
+      budget_limit: 100
+      time_period: 1d
+    anthropic:
+      budget_limit: 100
+      time_period: 10d
+    vertexai:
+      budget_limit: 100
+      time_period: 12d
+    gemini:
+      budget_limit: 100
+      time_period: 12d
+
+general_settings:
+  master_key: sk-1234
+```
+
+
+#### How provider-budget-routing works
+
+1. **Budget Tracking**:
+   - Uses Redis to track spend for each provider
+   - Tracks spend over specified time periods (e.g., "1d", "30d")
+   - Automatically resets spend after the time period expires
+
+2. **Routing Logic**:
+   - Routes requests to providers that are under their budget limits
+   - Skips providers that have exceeded their budget
+   - If all providers have exceeded their budget, raises an error
+
+3. **Supported Time Periods**:
+   - Format: "Xd", where X is the number of days
+   - Examples: "1d" (1 day), "30d" (30 days)
+
+4. **Requirements**:
+   - Redis is required for tracking spend across instances
+   - Provider names must be litellm provider names. See [Supported Providers](https://docs.litellm.ai/docs/providers)
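+
+#### Example: setting provider budgets via the Python SDK
+
+The same limits can be set directly on a `litellm.Router`. A minimal sketch; the model, API keys, and Redis settings below are illustrative placeholders:
+
+```python
+import os
+
+from litellm import Router
+from litellm.types.router import ProviderBudgetInfo
+
+router = Router(
+    model_list=[
+        {
+            "model_name": "gpt-3.5-turbo",
+            "litellm_params": {
+                "model": "openai/gpt-3.5-turbo",
+                "api_key": os.getenv("OPENAI_API_KEY"),
+            },
+        },
+    ],
+    # $100/day across all OpenAI deployments on this router
+    provider_budget_config={
+        "openai": ProviderBudgetInfo(time_period="1d", budget_limit=100),
+    },
+    # Redis is required so spend is tracked across router/proxy instances
+    redis_host=os.getenv("REDIS_HOST"),
+    redis_port=int(os.getenv("REDIS_PORT", "6379")),
+    redis_password=os.getenv("REDIS_PASSWORD"),
+)
+```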
diff --git a/docs/my-website/sidebars.js b/docs/my-website/sidebars.js
index 107a877da..50cc83c08 100644
--- a/docs/my-website/sidebars.js
+++ b/docs/my-website/sidebars.js
@@ -100,7 +100,7 @@ const sidebars = {
     {
       type: "category",
       label: "Routing",
-      items: ["proxy/load_balancing", "proxy/tag_routing", "proxy/team_based_routing", "proxy/customer_routing",],
+      items: ["proxy/load_balancing", "proxy/tag_routing", "proxy/provider_budget_routing", "proxy/team_based_routing", "proxy/customer_routing",],
     },
     {
       type: "category",
diff --git a/litellm/router.py b/litellm/router.py
index 97065bc85..f724c96c4 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -59,6 +59,7 @@ from litellm.router_strategy.lowest_cost import LowestCostLoggingHandler
 from litellm.router_strategy.lowest_latency import LowestLatencyLoggingHandler
 from litellm.router_strategy.lowest_tpm_rpm import LowestTPMLoggingHandler
 from litellm.router_strategy.lowest_tpm_rpm_v2 import LowestTPMLoggingHandler_v2
+from litellm.router_strategy.provider_budgets import ProviderBudgetLimiting
 from litellm.router_strategy.simple_shuffle import simple_shuffle
 from litellm.router_strategy.tag_based_routing import get_deployments_for_tag
 from litellm.router_utils.batch_utils import (
@@ -119,6 +120,7 @@ from litellm.types.router import (
     LiteLLMParamsTypedDict,
     ModelGroupInfo,
     ModelInfo,
+    ProviderBudgetConfigType,
     RetryPolicy,
     RouterErrors,
     RouterGeneralSettings,
@@ -235,7 +237,8 @@ class Router:
             "cost-based-routing",
             "usage-based-routing-v2",
         ] = "simple-shuffle",
         routing_strategy_args: dict = {},  # just for latency-based routing
+        provider_budget_config: Optional[ProviderBudgetConfigType] = None,
         semaphore: Optional[asyncio.Semaphore] = None,
         alerting_config: Optional[AlertingConfig] = None,
         router_general_settings: Optional[
@@ -272,6 +275,12 @@
             routing_strategy (Literal["simple-shuffle", "least-busy", "usage-based-routing", "latency-based-routing", "cost-based-routing"]): Routing strategy. Defaults to "simple-shuffle".
             routing_strategy_args (dict): Additional args for latency-based routing. Defaults to {}.
             alerting_config (AlertingConfig): Slack alerting configuration. Defaults to None.
+            provider_budget_config (ProviderBudgetConfigType): Provider budget configuration. Use this to set llm_provider budget limits, e.g. $100/day for OpenAI, $100/day for Azure. Defaults to None.
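+
+                Example (illustrative):
+                    provider_budget_config = {
+                        "openai": ProviderBudgetInfo(time_period="1d", budget_limit=100),
+                    }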

         Returns:
             Router: An instance of the litellm.Router class.
@@ -517,6 +521,12 @@ class Router:
         )
         self.service_logger_obj = ServiceLogging()
         self.routing_strategy_args = routing_strategy_args
+        self.provider_budget_config = provider_budget_config
+        if self.provider_budget_config is not None:
+            self.provider_budget_logger = ProviderBudgetLimiting(
+                router_cache=self.cache,
+                provider_budget_config=self.provider_budget_config,
+            )
         self.retry_policy: Optional[RetryPolicy] = None
         if retry_policy is not None:
             if isinstance(retry_policy, dict):
@@ -5109,6 +5119,14 @@ class Router:
                 healthy_deployments=healthy_deployments,
             )
 
+        if self.provider_budget_config is not None:
+            healthy_deployments = (
+                await self.provider_budget_logger.async_filter_deployments(
+                    healthy_deployments=healthy_deployments,
+                    request_kwargs=request_kwargs,
+                )
+            )
+
         if len(healthy_deployments) == 0:
             exception = await async_raise_no_deployment_exception(
                 litellm_router_instance=self,
diff --git a/litellm/router_strategy/provider_budgets.py b/litellm/router_strategy/provider_budgets.py
new file mode 100644
index 000000000..c1805fea9
--- /dev/null
+++ b/litellm/router_strategy/provider_budgets.py
@@ -0,0 +1,232 @@
+"""
+Provider budget limiting
+
+Use this if you want to set $ budget limits for each provider.
+
+Note: This is a filter, like tag-routing: it accepts healthy deployments and then filters out deployments that have exceeded their budget limit.
+
+This means you can use it with weighted-pick, lowest-latency, simple-shuffle routing, etc.
+
+Example:
+```
+openai:
+  budget_limit: 0.000000000001
+  time_period: 1d
+anthropic:
+  budget_limit: 100
+  time_period: 7d
+```
+"""
+
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypedDict, Union
+
+import litellm
+from litellm._logging import verbose_router_logger
+from litellm.caching.caching import DualCache
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
+from litellm.types.router import (
+    LiteLLM_Params,
+    ProviderBudgetConfigType,
+    ProviderBudgetInfo,
+)
+from litellm.types.utils import StandardLoggingPayload
+
+if TYPE_CHECKING:
+    from opentelemetry.trace import Span as _Span
+
+    Span = _Span
+else:
+    Span = Any
+
+
+class ProviderBudgetLimiting(CustomLogger):
+    def __init__(self, router_cache: DualCache, provider_budget_config: dict):
+        self.router_cache = router_cache
+        self.provider_budget_config: ProviderBudgetConfigType = provider_budget_config
+        verbose_router_logger.debug(
+            f"Initialized provider budget config: {self.provider_budget_config}"
+        )
+
+        # Add self to litellm callbacks if it's a list
+        if isinstance(litellm.callbacks, list):
+            litellm.callbacks.append(self)  # type: ignore
+
+    async def async_filter_deployments(
+        self,
+        healthy_deployments: Union[List[Dict[str, Any]], Dict[str, Any]],
+        request_kwargs: Optional[Dict] = None,
+    ):
+        """
+        Filter out deployments that have exceeded their provider budget limit.
+
+        Example:
+            if deployment = openai/gpt-3.5-turbo
+            and openai spend > openai budget limit
+            then skip this deployment
+        """
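+        # Illustrative walk-through (hypothetical values):
+        #   provider_budget_config = {"openai": ProviderBudgetInfo(time_period="1d", budget_limit=100)}
+        #   healthy_deployments = [an openai deployment, an azure deployment]
+        #   If Redis reports provider_spend:openai:1d >= 100, the openai
+        #   deployment is skipped and only the azure deployment (no budget
+        #   configured for azure here) is returned.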
+
+        # If a single deployment is passed, convert it to a list
+        if isinstance(healthy_deployments, dict):
+            healthy_deployments = [healthy_deployments]
+
+        potential_deployments: List[Dict] = []
+
+        # Extract the parent OpenTelemetry span for tracing
+        parent_otel_span: Optional[Span] = _get_parent_otel_span_from_kwargs(
+            request_kwargs
+        )
+
+        # Collect all providers and their budget configs
+        # {"openai": ProviderBudgetInfo, "anthropic": ProviderBudgetInfo, "azure": None}
+        _provider_configs: Dict[str, Optional[ProviderBudgetInfo]] = {}
+        for deployment in healthy_deployments:
+            provider = self._get_llm_provider_for_deployment(deployment)
+            if provider is None:
+                continue
+            budget_config = self._get_budget_config_for_provider(provider)
+            _provider_configs[provider] = budget_config
+
+        # Filter out providers without budget config
+        provider_configs: Dict[str, ProviderBudgetInfo] = {
+            provider: config
+            for provider, config in _provider_configs.items()
+            if config is not None
+        }
+
+        # Build cache keys for batch retrieval
+        cache_keys = []
+        for provider, config in provider_configs.items():
+            cache_keys.append(f"provider_spend:{provider}:{config.time_period}")
+
+        # Fetch current spend for all providers using batch cache
+        _current_spends = await self.router_cache.async_batch_get_cache(
+            keys=cache_keys,
+            parent_otel_span=parent_otel_span,
+        )
+        current_spends: List = _current_spends or [0.0] * len(provider_configs)
+
+        # Map providers to their current spend values
+        provider_spend_map: Dict[str, float] = {}
+        for idx, provider in enumerate(provider_configs.keys()):
+            provider_spend_map[provider] = float(current_spends[idx] or 0.0)
+
+        # Filter healthy deployments based on budget constraints
+        for deployment in healthy_deployments:
+            provider = self._get_llm_provider_for_deployment(deployment)
+            if provider is None:
+                continue
+            budget_config = provider_configs.get(provider)
+
+            if not budget_config:
+                # No budget configured for this provider - keep the deployment
+                potential_deployments.append(deployment)
+                continue
+
+            current_spend = provider_spend_map.get(provider, 0.0)
+            budget_limit = budget_config.budget_limit
+
+            verbose_router_logger.debug(
+                f"Current spend for {provider}: {current_spend}, budget limit: {budget_limit}"
+            )
+
+            if current_spend >= budget_limit:
+                verbose_router_logger.debug(
+                    f"Skipping deployment {deployment} for provider {provider} as spend limit exceeded"
+                )
+                continue
+
+            potential_deployments.append(deployment)
+
+        return potential_deployments
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        """
+        Increment provider spend in DualCache (InMemory + Redis)
+
+        Handles saving current provider spend to Redis.
+
+        Spend is stored as:
+            provider_spend:{provider}:{time_period}
+            ex. provider_spend:openai:1d
+            ex. provider_spend:anthropic:7d
+
+        The time period is tracked for time_periods set in the provider budget config.
+        """
+        verbose_router_logger.debug("in ProviderBudgetLimiting.async_log_success_event")
+        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
+            "standard_logging_object", None
+        )
+        if standard_logging_payload is None:
+            raise ValueError("standard_logging_payload is required")
+
+        response_cost: float = standard_logging_payload.get("response_cost", 0)
+
+        custom_llm_provider: str = kwargs.get("litellm_params", {}).get(
+            "custom_llm_provider", None
+        )
+        if custom_llm_provider is None:
+            raise ValueError("custom_llm_provider is required")
+
+        budget_config = self._get_budget_config_for_provider(custom_llm_provider)
+        if budget_config is None:
+            raise ValueError(
+                f"No budget config found for provider {custom_llm_provider}, self.provider_budget_config: {self.provider_budget_config}"
+            )
+
+        spend_key = f"provider_spend:{custom_llm_provider}:{budget_config.time_period}"
+        ttl_seconds = self.get_ttl_seconds(budget_config.time_period)
+        verbose_router_logger.debug(
+            f"Incrementing spend for {spend_key} by {response_cost}, ttl: {ttl_seconds}"
+        )
+        # Increment the spend in Redis and set TTL
+        await self.router_cache.async_increment_cache(
+            key=spend_key,
+            value=response_cost,
+            ttl=ttl_seconds,
+        )
+        verbose_router_logger.debug(
+            f"Incremented spend for {spend_key} by {response_cost}, ttl: {ttl_seconds}"
+        )
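+
+    # Worked example (hypothetical values): a $0.002 openai call with
+    # config {"openai": ProviderBudgetInfo(time_period="1d", budget_limit=100)}
+    # increments "provider_spend:openai:1d" by 0.002 and sets a TTL of
+    # 86400 seconds (1 day), after which the spend window resets.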
+    def _get_budget_config_for_provider(
+        self, provider: str
+    ) -> Optional[ProviderBudgetInfo]:
+        return self.provider_budget_config.get(provider, None)
+
+    def _get_llm_provider_for_deployment(self, deployment: Dict) -> Optional[str]:
+        try:
+            _litellm_params: LiteLLM_Params = LiteLLM_Params(
+                **deployment.get("litellm_params", {"model": ""})
+            )
+            _, custom_llm_provider, _, _ = litellm.get_llm_provider(
+                model=_litellm_params.model,
+                litellm_params=_litellm_params,
+            )
+        except Exception:
+            verbose_router_logger.error(
+                f"Error getting LLM provider for deployment: {deployment}"
+            )
+            return None
+        return custom_llm_provider
+
+    def get_ttl_seconds(self, time_period: str) -> int:
+        """
+        Convert time period (e.g., '1d', '30d') to seconds for Redis TTL
+        """
+        if time_period.endswith("d"):
+            days = int(time_period[:-1])
+            return days * 24 * 60 * 60
+        raise ValueError(f"Unsupported time period format: {time_period}")
diff --git a/litellm/types/router.py b/litellm/types/router.py
index bb93aaa63..f4d2b39ed 100644
--- a/litellm/types/router.py
+++ b/litellm/types/router.py
@@ -628,3 +628,18 @@ class RoutingStrategy(enum.Enum):
     COST_BASED = "cost-based-routing"
     USAGE_BASED_ROUTING_V2 = "usage-based-routing-v2"
     USAGE_BASED_ROUTING = "usage-based-routing"
+    PROVIDER_BUDGET_LIMITING = "provider-budget-routing"
+
+
+class ProviderBudgetInfo(BaseModel):
+    time_period: str  # e.g., '1d', '30d'
+    budget_limit: float
+
+
+ProviderBudgetConfigType = Dict[str, ProviderBudgetInfo]
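+
+# Example (illustrative):
+#   {
+#       "openai": ProviderBudgetInfo(time_period="1d", budget_limit=100.0),
+#       "anthropic": ProviderBudgetInfo(time_period="7d", budget_limit=500.0),
+#   }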
diff --git a/tests/local_testing/test_provider_budgets.py b/tests/local_testing/test_provider_budgets.py
new file mode 100644
index 000000000..5e685cae6
--- /dev/null
+++ b/tests/local_testing/test_provider_budgets.py
@@ -0,0 +1,209 @@
+import sys, os, asyncio, time, random
+from datetime import datetime
+import traceback
+from dotenv import load_dotenv
+
+load_dotenv()
+import os, copy
+
+sys.path.insert(
+    0, os.path.abspath("../..")
+)  # Adds the parent directory to the system path
+import pytest
+from litellm import Router
+from litellm.router_strategy.provider_budgets import ProviderBudgetLimiting
+from litellm.types.router import (
+    RoutingStrategy,
+    ProviderBudgetConfigType,
+    ProviderBudgetInfo,
+)
+from litellm.caching.caching import DualCache
+import logging
+from litellm._logging import verbose_router_logger
+
+verbose_router_logger.setLevel(logging.DEBUG)
+
+
+@pytest.mark.asyncio
+async def test_provider_budgets_e2e_test():
+    """
+    Expected behavior:
+    - First request is forced to OpenAI
+    - Hit the OpenAI budget limit
+    - Next 3 requests all go to Azure
+    """
+    provider_budget_config: ProviderBudgetConfigType = {
+        "openai": ProviderBudgetInfo(time_period="1d", budget_limit=0.000000000001),
+        "azure": ProviderBudgetInfo(time_period="1d", budget_limit=100),
+    }
+
+    router = Router(
+        model_list=[
+            {
+                "model_name": "gpt-3.5-turbo",  # openai model name
+                "litellm_params": {  # params for litellm completion/embedding call
+                    "model": "azure/chatgpt-v-2",
+                    "api_key": os.getenv("AZURE_API_KEY"),
+                    "api_version": os.getenv("AZURE_API_VERSION"),
+                    "api_base": os.getenv("AZURE_API_BASE"),
+                },
+                "model_info": {"id": "azure-model-id"},
+            },
+            {
+                "model_name": "gpt-3.5-turbo",  # openai model name
+                "litellm_params": {
+                    "model": "openai/gpt-4o-mini",
+                },
+                "model_info": {"id": "openai-model-id"},
+            },
+        ],
+        provider_budget_config=provider_budget_config,
+        redis_host=os.getenv("REDIS_HOST"),
+        redis_port=int(os.getenv("REDIS_PORT")),
+        redis_password=os.getenv("REDIS_PASSWORD"),
+    )
+
+    response = await router.acompletion(
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        model="openai/gpt-4o-mini",
+    )
+    print(response)
+
+    await asyncio.sleep(0.5)
+
+    for _ in range(3):
+        response = await router.acompletion(
+            messages=[{"role": "user", "content": "Hello, how are you?"}],
+            model="gpt-3.5-turbo",
+        )
+        print(response)
+
+        print("response.hidden_params", response._hidden_params)
+
+        await asyncio.sleep(0.5)
+
+        assert response._hidden_params.get("custom_llm_provider") == "azure"
+
+
+@pytest.mark.asyncio
+async def test_provider_budgets_e2e_test_expect_to_fail():
+    """
+    Expected behavior:
+    - first request passes, all subsequent requests fail
+    """
+    provider_budget_config: ProviderBudgetConfigType = {
+        "anthropic": ProviderBudgetInfo(time_period="1d", budget_limit=0.000000000001),
+    }
+
+    router = Router(
+        model_list=[
+            {
+                "model_name": "anthropic/*",
+                "litellm_params": {
+                    "model": "anthropic/*",
+                },
+            },
+        ],
+        redis_host=os.getenv("REDIS_HOST"),
+        redis_port=int(os.getenv("REDIS_PORT")),
+        redis_password=os.getenv("REDIS_PASSWORD"),
+        provider_budget_config=provider_budget_config,
+    )
+
+    response = await router.acompletion(
+        messages=[{"role": "user", "content": "Hello, how are you?"}],
+        model="anthropic/claude-3-5-sonnet-20240620",
+    )
+    print(response)
+
+    await asyncio.sleep(0.5)
+
+    for _ in range(3):
+        with pytest.raises(Exception) as exc_info:
+            response = await router.acompletion(
+                messages=[{"role": "user", "content": "Hello, how are you?"}],
+                model="anthropic/claude-3-5-sonnet-20240620",
+            )
+            print(response)
+            print("response.hidden_params", response._hidden_params)
+
+        await asyncio.sleep(0.5)
+
+        # Verify the error is related to budget exceeded
+
+
+def test_get_ttl_seconds():
+    """
+    Test the get_ttl_seconds helper method
+    """
+    provider_budget = ProviderBudgetLimiting(
+        router_cache=DualCache(), provider_budget_config={}
+    )
+
+    assert provider_budget.get_ttl_seconds("1d") == 86400  # 1 day in seconds
+    assert provider_budget.get_ttl_seconds("7d") == 604800  # 7 days in seconds
+    assert provider_budget.get_ttl_seconds("30d") == 2592000  # 30 days in seconds
+
+    with pytest.raises(ValueError, match="Unsupported time period format"):
+        provider_budget.get_ttl_seconds("1h")
match="Unsupported time period format"): + provider_budget.get_ttl_seconds("1h") + + +def test_get_llm_provider_for_deployment(): + """ + Test the _get_llm_provider_for_deployment helper method + + """ + provider_budget = ProviderBudgetLimiting( + router_cache=DualCache(), provider_budget_config={} + ) + + # Test OpenAI deployment + openai_deployment = {"litellm_params": {"model": "openai/gpt-4"}} + assert ( + provider_budget._get_llm_provider_for_deployment(openai_deployment) == "openai" + ) + + # Test Azure deployment + azure_deployment = { + "litellm_params": { + "model": "azure/gpt-4", + "api_key": "test", + "api_base": "test", + } + } + assert provider_budget._get_llm_provider_for_deployment(azure_deployment) == "azure" + + # should not raise error for unknown deployment + unknown_deployment = {} + assert provider_budget._get_llm_provider_for_deployment(unknown_deployment) is None + + +def test_get_budget_config_for_provider(): + """ + Test the _get_budget_config_for_provider helper method + + """ + config = { + "openai": ProviderBudgetInfo(time_period="1d", budget_limit=100), + "anthropic": ProviderBudgetInfo(time_period="7d", budget_limit=500), + } + + provider_budget = ProviderBudgetLimiting( + router_cache=DualCache(), provider_budget_config=config + ) + + # Test existing providers + openai_config = provider_budget._get_budget_config_for_provider("openai") + assert openai_config is not None + assert openai_config.time_period == "1d" + assert openai_config.budget_limit == 100 + + anthropic_config = provider_budget._get_budget_config_for_provider("anthropic") + assert anthropic_config is not None + assert anthropic_config.time_period == "7d" + assert anthropic_config.budget_limit == 500 + + # Test non-existent provider + assert provider_budget._get_budget_config_for_provider("unknown") is None