(feat) - provider budget improvements - ensure provider budgets work with multiple proxy instances + improve latency to ~90ms (#6886)

* use 1 file for duration_in_seconds

* add to readme.md

* re use duration_in_seconds

* fix importing _extract_from_regex, get_last_day_of_month

* fix import

* update provider budget routing

* fix - remove dup test

* add support for using in multi instance environments

* test_in_memory_redis_sync_e2e

* test_in_memory_redis_sync_e2e

* fix test_in_memory_redis_sync_e2e

* fix code quality check

* fix test provider budgets

* working provider budget tests

* add fixture for provider budget routing

* fix router testing for provider budgets

* add comments on provider budget routing

* use RedisPipelineIncrementOperation

* add redis async_increment_pipeline

* use redis async_increment_pipeline

* use lower value for testing

* use redis async_increment_pipeline

* use consistent key name for increment op

* add handling for budget windows

* fix typing async_increment_pipeline

* fix set attr

* add clear doc strings

* unit testing for provider budgets

* test_redis_increment_pipeline
This commit is contained in:
Ishaan Jaff 2024-11-24 16:36:19 -08:00 committed by GitHub
parent 34bfebe470
commit c73ce95c01
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 638 additions and 52 deletions

View file

@ -2433,3 +2433,48 @@ async def test_dual_cache_caching_batch_get_cache():
await dc.async_batch_get_cache(keys=["test_key1", "test_key2"])
assert mock_async_get_cache.call_count == 1
@pytest.mark.asyncio
async def test_redis_increment_pipeline():
"""Test Redis increment pipeline functionality"""
try:
from litellm.caching.redis_cache import RedisCache
litellm.set_verbose = True
redis_cache = RedisCache(
host=os.environ["REDIS_HOST"],
port=os.environ["REDIS_PORT"],
password=os.environ["REDIS_PASSWORD"],
)
# Create test increment operations
increment_list = [
{"key": "test_key1", "increment_value": 1.5, "ttl": 60},
{"key": "test_key1", "increment_value": 1.1, "ttl": 58},
{"key": "test_key1", "increment_value": 0.4, "ttl": 55},
{"key": "test_key2", "increment_value": 2.5, "ttl": 60},
]
# Test pipeline increment
results = await redis_cache.async_increment_pipeline(increment_list)
# Verify results
assert len(results) == 8 # 4 increment operations + 4 expire operations
# Verify the values were actually set in Redis
value1 = await redis_cache.async_get_cache("test_key1")
print("result in cache for key=test_key1", value1)
value2 = await redis_cache.async_get_cache("test_key2")
print("result in cache for key=test_key2", value2)
assert float(value1) == 3.0
assert float(value2) == 2.5
# Clean up
await redis_cache.async_delete_cache("test_key1")
await redis_cache.async_delete_cache("test_key2")
except Exception as e:
print(f"Error occurred: {str(e)}")
raise e

View file

@ -17,7 +17,7 @@ from litellm.types.router import (
ProviderBudgetConfigType,
ProviderBudgetInfo,
)
from litellm.caching.caching import DualCache
from litellm.caching.caching import DualCache, RedisCache
import logging
from litellm._logging import verbose_router_logger
import litellm
@ -25,6 +25,27 @@ import litellm
verbose_router_logger.setLevel(logging.DEBUG)
def cleanup_redis():
"""Cleanup Redis cache before each test"""
try:
import redis
print("cleaning up redis..")
redis_client = redis.Redis(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT")),
password=os.getenv("REDIS_PASSWORD"),
)
print("scan iter result", redis_client.scan_iter("provider_spend:*"))
# Delete all provider spend keys
for key in redis_client.scan_iter("provider_spend:*"):
print("deleting key", key)
redis_client.delete(key)
except Exception as e:
print(f"Error cleaning up Redis: {str(e)}")
@pytest.mark.asyncio
async def test_provider_budgets_e2e_test():
"""
@ -34,6 +55,8 @@ async def test_provider_budgets_e2e_test():
- Next 3 requests all go to Azure
"""
cleanup_redis()
# Modify for test
provider_budget_config: ProviderBudgetConfigType = {
"openai": ProviderBudgetInfo(time_period="1d", budget_limit=0.000000000001),
"azure": ProviderBudgetInfo(time_period="1d", budget_limit=100),
@ -71,7 +94,7 @@ async def test_provider_budgets_e2e_test():
)
print(response)
await asyncio.sleep(0.5)
await asyncio.sleep(2.5)
for _ in range(3):
response = await router.acompletion(
@ -94,6 +117,7 @@ async def test_provider_budgets_e2e_test_expect_to_fail():
- first request passes, all subsequent requests fail
"""
cleanup_redis()
# Note: We intentionally use a dictionary with string keys for budget_limit and time_period
# we want to test that the router can handle type conversion, since the proxy config yaml passes these values as a dictionary
@ -125,7 +149,7 @@ async def test_provider_budgets_e2e_test_expect_to_fail():
)
print(response)
await asyncio.sleep(0.5)
await asyncio.sleep(2.5)
for _ in range(3):
with pytest.raises(Exception) as exc_info:
@ -142,11 +166,13 @@ async def test_provider_budgets_e2e_test_expect_to_fail():
assert "Exceeded budget for provider" in str(exc_info.value)
def test_get_llm_provider_for_deployment():
@pytest.mark.asyncio
async def test_get_llm_provider_for_deployment():
"""
Test the _get_llm_provider_for_deployment helper method
"""
cleanup_redis()
provider_budget = ProviderBudgetLimiting(
router_cache=DualCache(), provider_budget_config={}
)
@ -172,11 +198,13 @@ def test_get_llm_provider_for_deployment():
assert provider_budget._get_llm_provider_for_deployment(unknown_deployment) is None
def test_get_budget_config_for_provider():
@pytest.mark.asyncio
async def test_get_budget_config_for_provider():
"""
Test the _get_budget_config_for_provider helper method
"""
cleanup_redis()
config = {
"openai": ProviderBudgetInfo(time_period="1d", budget_limit=100),
"anthropic": ProviderBudgetInfo(time_period="7d", budget_limit=500),
@ -206,6 +234,7 @@ async def test_prometheus_metric_tracking():
"""
Test that the Prometheus metric for provider budget is tracked correctly
"""
cleanup_redis()
from unittest.mock import MagicMock
from litellm.integrations.prometheus import PrometheusLogger
@ -263,7 +292,187 @@ async def test_prometheus_metric_tracking():
except Exception as e:
print("error", e)
await asyncio.sleep(0.5)
await asyncio.sleep(2.5)
# Verify the mock was called correctly
mock_prometheus.track_provider_remaining_budget.assert_called_once()
@pytest.mark.asyncio
async def test_handle_new_budget_window():
"""
Test _handle_new_budget_window helper method
Current
"""
cleanup_redis()
provider_budget = ProviderBudgetLimiting(
router_cache=DualCache(), provider_budget_config={}
)
spend_key = "provider_spend:openai:7d"
start_time_key = "provider_budget_start_time:openai"
current_time = 1000.0
response_cost = 0.5
ttl_seconds = 86400 # 1 day
# Test handling new budget window
new_start_time = await provider_budget._handle_new_budget_window(
spend_key=spend_key,
start_time_key=start_time_key,
current_time=current_time,
response_cost=response_cost,
ttl_seconds=ttl_seconds,
)
assert new_start_time == current_time
# Verify the spend was set correctly
spend = await provider_budget.router_cache.async_get_cache(spend_key)
print("spend in cache for key", spend_key, "is", spend)
assert float(spend) == response_cost
# Verify start time was set correctly
start_time = await provider_budget.router_cache.async_get_cache(start_time_key)
print("start time in cache for key", start_time_key, "is", start_time)
assert float(start_time) == current_time
@pytest.mark.asyncio
async def test_get_or_set_budget_start_time():
"""
Test _get_or_set_budget_start_time helper method
scenario 1: no existing start time in cache, should return current time
scenario 2: existing start time in cache, should return existing start time
"""
cleanup_redis()
provider_budget = ProviderBudgetLimiting(
router_cache=DualCache(), provider_budget_config={}
)
start_time_key = "test_start_time"
current_time = 1000.0
ttl_seconds = 86400 # 1 day
# When there is no existing start time, we should set it to the current time
start_time = await provider_budget._get_or_set_budget_start_time(
start_time_key=start_time_key,
current_time=current_time,
ttl_seconds=ttl_seconds,
)
print("budget start time when no existing start time is in cache", start_time)
assert start_time == current_time
# When there is an existing start time, we should return it even if the current time is later
new_current_time = 2000.0
existing_start_time = await provider_budget._get_or_set_budget_start_time(
start_time_key=start_time_key,
current_time=new_current_time,
ttl_seconds=ttl_seconds,
)
print(
"budget start time when existing start time is in cache, but current time is later",
existing_start_time,
)
assert existing_start_time == current_time # Should return the original start time
@pytest.mark.asyncio
async def test_increment_spend_in_current_window():
"""
Test _increment_spend_in_current_window helper method
Expected behavior:
- Increment the spend in memory cache
- Queue the increment operation to Redis
"""
cleanup_redis()
provider_budget = ProviderBudgetLimiting(
router_cache=DualCache(), provider_budget_config={}
)
spend_key = "provider_spend:openai:1d"
response_cost = 0.5
ttl = 86400 # 1 day
# Set initial spend
await provider_budget.router_cache.async_set_cache(
key=spend_key, value=1.0, ttl=ttl
)
# Test incrementing spend
await provider_budget._increment_spend_in_current_window(
spend_key=spend_key,
response_cost=response_cost,
ttl=ttl,
)
# Verify the spend was incremented correctly in memory
spend = await provider_budget.router_cache.async_get_cache(spend_key)
assert float(spend) == 1.5
# Verify the increment operation was queued for Redis
print(
"redis_increment_operation_queue",
provider_budget.redis_increment_operation_queue,
)
assert len(provider_budget.redis_increment_operation_queue) == 1
queued_op = provider_budget.redis_increment_operation_queue[0]
assert queued_op["key"] == spend_key
assert queued_op["increment_value"] == response_cost
assert queued_op["ttl"] == ttl
@pytest.mark.asyncio
async def test_sync_in_memory_spend_with_redis():
"""
Test _sync_in_memory_spend_with_redis helper method
Expected behavior:
- Push all provider spend increments to Redis
- Fetch all current provider spend from Redis to update in-memory cache
"""
cleanup_redis()
provider_budget_config = {
"openai": ProviderBudgetInfo(time_period="1d", budget_limit=100),
"anthropic": ProviderBudgetInfo(time_period="1d", budget_limit=200),
}
provider_budget = ProviderBudgetLimiting(
router_cache=DualCache(
redis_cache=RedisCache(
host=os.getenv("REDIS_HOST"),
port=int(os.getenv("REDIS_PORT")),
password=os.getenv("REDIS_PASSWORD"),
)
),
provider_budget_config=provider_budget_config,
)
# Set some values in Redis
spend_key_openai = "provider_spend:openai:1d"
spend_key_anthropic = "provider_spend:anthropic:1d"
await provider_budget.router_cache.redis_cache.async_set_cache(
key=spend_key_openai, value=50.0
)
await provider_budget.router_cache.redis_cache.async_set_cache(
key=spend_key_anthropic, value=75.0
)
# Test syncing with Redis
await provider_budget._sync_in_memory_spend_with_redis()
# Verify in-memory cache was updated
openai_spend = await provider_budget.router_cache.in_memory_cache.async_get_cache(
spend_key_openai
)
anthropic_spend = (
await provider_budget.router_cache.in_memory_cache.async_get_cache(
spend_key_anthropic
)
)
assert float(openai_spend) == 50.0
assert float(anthropic_spend) == 75.0