litellm/tests/router_unit_tests/test_router_cooldown_utils.py
Krish Dholakia 44e7ffd05c
(perf) Litellm redis router fix - ~100ms improvement (#6483)
* docs(exception_mapping.md): add missing exception types

Fixes https://github.com/Aider-AI/aider/issues/2120#issuecomment-2438971183

* fix(main.py): register custom model pricing with specific key

Ensure custom model pricing is registered to the specific model+provider key combination (sketched below)

* test: make testing more robust for custom pricing

* fix(redis_cache.py): instrument otel logging for sync redis calls

ensures complete coverage for all redis cache calls

* refactor: pass parent_otel_span for redis caching calls in router

allows more visibility into which calls are causing latency issues (sketched below)

* test: update tests with new params

* refactor: ensure e2e otel tracing for router

* refactor(router.py): add more otel tracing across router

catch all latency issues for router requests

* fix: fix linting error

* fix(router.py): fix linting error

* fix: fix test

* test: fix tests

* fix(dual_cache.py): pass ttl to redis cache

* fix: fix param

* perf(cooldown_cache.py): improve cooldown cache - store results in memory for 5s so a redis call is not made on every request (see the sketches after this commit message)

reduces latency by ~100ms per call when caching is enabled on the router

* fix: fix test

* fix(cooldown_cache.py): handle if a result is None

* fix(cooldown_cache.py): add debug statements

* refactor(dual_cache.py): use an in-memory check first for the batch get cache, so redis is not hit on every call

* fix(cooldown_cache.py): fix linting error
2024-10-29 13:58:29 -07:00
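
The custom pricing bullet is easiest to see with litellm's public register_model API. This is only a rough sketch of the idea, not the actual change in main.py; the model name and cost numbers below are made up, and the point is just that the pricing entry is keyed by the provider-prefixed model name rather than the bare model name:

import litellm

# Hypothetical custom deployment: pricing is registered under the
# "provider/model" key so it only applies to this provider+model pair.
litellm.register_model(
    {
        "openai/my-custom-gpt": {  # provider-prefixed key, not just "my-custom-gpt"
            "input_cost_per_token": 0.0000008,  # made-up numbers for illustration
            "output_cost_per_token": 0.0000016,
            "litellm_provider": "openai",
            "mode": "chat",
        }
    }
)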
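For the otel bullets, the pattern is wrapping the synchronous redis call in a span and attaching it to the parent span that the router threads down, so slow cache lookups show up inside the request's trace. A minimal sketch using the standard opentelemetry API; instrumented_redis_get and its arguments are invented for illustration and are not litellm's actual helper:

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


def instrumented_redis_get(redis_client, key, parent_otel_span=None):
    # Attach the redis call to the request's trace when a parent span is
    # passed down from the router, instead of starting a detached trace.
    ctx = (
        trace.set_span_in_context(parent_otel_span)
        if parent_otel_span is not None
        else None
    )
    with tracer.start_as_current_span("redis.get_cache", context=ctx) as span:
        span.set_attribute("redis.key", key)
        return redis_client.get(key)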
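The core of the ~100ms win is in the last perf/refactor bullets: check a short-lived in-memory copy first and only fall back to redis on a miss, so bursts of requests inside the TTL window share one redis round trip. A minimal sketch of that pattern (this is not litellm's DualCache/CooldownCache code; the class name, the 5-second default TTL, and the mget fallback are illustrative):

import time


class InMemoryFirstCache:
    def __init__(self, redis_client, in_memory_ttl: float = 5.0):
        self.redis_client = redis_client
        self.in_memory_ttl = in_memory_ttl
        self._store = {}  # key -> (value, expires_at)

    def get(self, key):
        hit = self._store.get(key)
        if hit is not None and time.time() < hit[1]:
            return hit[0]  # served from memory, no redis round trip
        value = self.redis_client.get(key)
        # keep the result in memory for a few seconds so repeated lookups
        # (e.g. cooldown checks on every request) skip redis entirely
        self._store[key] = (value, time.time() + self.in_memory_ttl)
        return value

    def batch_get(self, keys):
        # answer as many keys as possible from memory, then do a single
        # redis MGET for whatever is left
        results, missing = {}, []
        for key in keys:
            hit = self._store.get(key)
            if hit is not None and time.time() < hit[1]:
                results[key] = hit[0]
            else:
                missing.append(key)
        if missing:
            for key, value in zip(missing, self.redis_client.mget(missing)):
                self._store[key] = (value, time.time() + self.in_memory_ttl)
                results[key] = value
        return results

Note that the sketch caches a None result as well: that is why the "handle if a result is None" bullet matters, since a redis miss also needs to be remembered for the TTL window, otherwise every request for a healthy (non-cooled-down) deployment would still hit redis.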


import sys, os, time
import traceback, asyncio
import pytest

sys.path.insert(
    0, os.path.abspath("../..")
)  # Adds the parent directory to the system path

import litellm
from litellm import Router
from litellm.router import Deployment, LiteLLM_Params, ModelInfo
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from dotenv import load_dotenv
from unittest.mock import AsyncMock, MagicMock, patch
from litellm.integrations.prometheus import PrometheusLogger
from litellm.router_utils.cooldown_callbacks import router_cooldown_event_callback
from litellm.router_utils.cooldown_handlers import (
    _should_run_cooldown_logic,
    _should_cooldown_deployment,
    cast_exception_status_to_int,
)
from litellm.router_utils.router_callbacks.track_deployment_metrics import (
    increment_deployment_failures_for_current_minute,
    increment_deployment_successes_for_current_minute,
)

load_dotenv()


class CustomPrometheusLogger(PrometheusLogger):
    def __init__(self):
        super().__init__()
        self.deployment_complete_outages = []
        self.deployment_cooled_downs = []

    def set_deployment_complete_outage(
        self,
        litellm_model_name: str,
        model_id: str,
        api_base: str,
        api_provider: str,
    ):
        self.deployment_complete_outages.append(
            [litellm_model_name, model_id, api_base, api_provider]
        )

    def increment_deployment_cooled_down(
        self,
        litellm_model_name: str,
        model_id: str,
        api_base: str,
        api_provider: str,
        exception_status: str,
    ):
        self.deployment_cooled_downs.append(
            [litellm_model_name, model_id, api_base, api_provider, exception_status]
        )


@pytest.mark.asyncio
async def test_router_cooldown_event_callback():
    """
    Test the router_cooldown_event_callback function

    Ensures that the router_cooldown_event_callback function correctly logs the
    cooldown event to the PrometheusLogger
    """
    # Mock Router instance
    mock_router = MagicMock()
    mock_deployment = {
        "litellm_params": {"model": "gpt-3.5-turbo"},
        "model_name": "gpt-3.5-turbo",
        "model_info": ModelInfo(id="test-model-id"),
    }
    mock_router.get_deployment.return_value = mock_deployment

    # Create a real PrometheusLogger instance
    prometheus_logger = CustomPrometheusLogger()
    litellm.callbacks = [prometheus_logger]

    await router_cooldown_event_callback(
        litellm_router_instance=mock_router,
        deployment_id="test-deployment",
        exception_status="429",
        cooldown_time=60.0,
    )

    await asyncio.sleep(0.5)

    # Assert that the router's get_deployment method was called
    mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")

    print(
        "prometheus_logger.deployment_complete_outages",
        prometheus_logger.deployment_complete_outages,
    )
    print(
        "prometheus_logger.deployment_cooled_downs",
        prometheus_logger.deployment_cooled_downs,
    )

    # Assert that PrometheusLogger methods were called
    assert len(prometheus_logger.deployment_complete_outages) == 1
    assert len(prometheus_logger.deployment_cooled_downs) == 1

    assert prometheus_logger.deployment_complete_outages[0] == [
        "gpt-3.5-turbo",
        "test-model-id",
        "https://api.openai.com",
        "openai",
    ]
    assert prometheus_logger.deployment_cooled_downs[0] == [
        "gpt-3.5-turbo",
        "test-model-id",
        "https://api.openai.com",
        "openai",
        "429",
    ]


@pytest.mark.asyncio
async def test_router_cooldown_event_callback_no_prometheus():
    """
    Test the router_cooldown_event_callback function

    Ensures that the router_cooldown_event_callback function does not raise an
    error when no PrometheusLogger is found
    """
    # Mock Router instance
    mock_router = MagicMock()
    mock_deployment = {
        "litellm_params": {"model": "gpt-3.5-turbo"},
        "model_name": "gpt-3.5-turbo",
        "model_info": ModelInfo(id="test-model-id"),
    }
    mock_router.get_deployment.return_value = mock_deployment

    await router_cooldown_event_callback(
        litellm_router_instance=mock_router,
        deployment_id="test-deployment",
        exception_status="429",
        cooldown_time=60.0,
    )

    # Assert that the router's get_deployment method was called
    mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")


@pytest.mark.asyncio
async def test_router_cooldown_event_callback_no_deployment():
    """
    Test the router_cooldown_event_callback function

    Ensures that the router_cooldown_event_callback function does not raise an
    error when no deployment is found. In this scenario it should do nothing.
    """
    # Mock Router instance
    mock_router = MagicMock()
    mock_router.get_deployment.return_value = None

    await router_cooldown_event_callback(
        litellm_router_instance=mock_router,
        deployment_id="test-deployment",
        exception_status="429",
        cooldown_time=60.0,
    )

    # Assert that the router's get_deployment method was called
    mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")


@pytest.fixture
def testing_litellm_router():
    return Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {"model": "gpt-3.5-turbo"},
                "model_id": "test_deployment",
            },
            {
                "model_name": "test_deployment",
                "litellm_params": {"model": "openai/test_deployment"},
                "model_id": "test_deployment_2",
            },
        ]
    )


def test_should_run_cooldown_logic(testing_litellm_router):
    testing_litellm_router.disable_cooldowns = True
    # don't run cooldown logic if disable_cooldowns is True
    assert (
        _should_run_cooldown_logic(
            testing_litellm_router, "test_deployment", 500, Exception("Test")
        )
        is False
    )

    # don't cooldown if deployment is None
    testing_litellm_router.disable_cooldowns = False
    assert (
        _should_run_cooldown_logic(testing_litellm_router, None, 500, Exception("Test"))
        is False
    )

    # don't cooldown if it's a provider default deployment
    testing_litellm_router.provider_default_deployment_ids = ["test_deployment"]
    assert (
        _should_run_cooldown_logic(
            testing_litellm_router, "test_deployment", 500, Exception("Test")
        )
        is False
    )


def test_should_cooldown_deployment_rate_limit_error(testing_litellm_router):
    """
    Test the _should_cooldown_deployment function when a rate limit error occurs
    """
    # Test 429 error (rate limit) -> always cooldown a deployment returning 429s
    _exception = litellm.exceptions.RateLimitError(
        "Rate limit", "openai", "gpt-3.5-turbo"
    )
    assert (
        _should_cooldown_deployment(
            testing_litellm_router, "test_deployment", 429, _exception
        )
        is True
    )


def test_should_cooldown_deployment_auth_limit_error(testing_litellm_router):
    """
    Test the _should_cooldown_deployment function when an auth error occurs
    """
    # Test 401 error (auth) -> always cooldown a deployment returning 401s
    _exception = litellm.exceptions.AuthenticationError(
        "Unauthorized", "openai", "gpt-3.5-turbo"
    )
    assert (
        _should_cooldown_deployment(
            testing_litellm_router, "test_deployment", 401, _exception
        )
        is True
    )


@pytest.mark.asyncio
async def test_should_cooldown_deployment(testing_litellm_router):
    """
    Cooldown a deployment if it fails more than 50% of requests in 1 minute -
    DEFAULT threshold is 50%
    """
    from litellm._logging import verbose_router_logger
    import logging

    verbose_router_logger.setLevel(logging.DEBUG)

    # Test 429 error (rate limit) -> always cooldown a deployment returning 429s
    _exception = litellm.exceptions.RateLimitError(
        "Rate limit", "openai", "gpt-3.5-turbo"
    )
    assert (
        _should_cooldown_deployment(
            testing_litellm_router, "test_deployment", 429, _exception
        )
        is True
    )

    available_deployment = testing_litellm_router.get_available_deployment(
        model="test_deployment"
    )
    print("available_deployment", available_deployment)
    assert available_deployment is not None

    deployment_id = available_deployment["model_info"]["id"]
    print("deployment_id", deployment_id)

    # record 40 successes for the deployment in the current minute
    for _ in range(40):
        increment_deployment_successes_for_current_minute(
            litellm_router_instance=testing_litellm_router, deployment_id=deployment_id
        )

    # now fail 41 requests in a row
    tasks = []
    for _ in range(41):
        tasks.append(
            testing_litellm_router.acompletion(
                model=deployment_id,
                messages=[{"role": "user", "content": "Hello, world!"}],
                max_tokens=100,
                mock_response="litellm.InternalServerError",
            )
        )
    try:
        await asyncio.gather(*tasks)
    except Exception:
        pass

    await asyncio.sleep(1)

    # expect a cooldown, since ~51% of requests are now failing
    assert (
        _should_cooldown_deployment(
            testing_litellm_router, deployment_id, 500, Exception("Test")
        )
        is True
    )


@pytest.mark.asyncio
async def test_should_cooldown_deployment_allowed_fails_set_on_router():
    """
    Test the _should_cooldown_deployment function when Router.allowed_fails is set
    """
    # Create a Router instance with a test deployment
    router = Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {"model": "gpt-3.5-turbo"},
                "model_id": "test_deployment",
            },
        ]
    )

    # Set up allowed_fails for the test deployment
    router.allowed_fails = 100

    # should not cooldown while fails are below the allowed limit
    for _ in range(100):
        assert (
            _should_cooldown_deployment(
                router, "test_deployment", 500, Exception("Test")
            )
            is False
        )

    # once failures exceed allowed_fails (100), the deployment should be cooled down
    assert (
        _should_cooldown_deployment(router, "test_deployment", 500, Exception("Test"))
        is True
    )


def test_increment_deployment_successes_for_current_minute_does_not_write_to_redis(
    testing_litellm_router,
):
    """
    Ensure tracking deployment metrics does not write to redis

    Important - if it writes to redis on every request it will seriously impact
    performance / latency
    """
    from litellm.caching.dual_cache import DualCache
    from litellm.caching.redis_cache import RedisCache
    from litellm.caching.in_memory_cache import InMemoryCache
    from litellm.router_utils.router_callbacks.track_deployment_metrics import (
        increment_deployment_successes_for_current_minute,
    )

    # Mock RedisCache
    mock_redis_cache = MagicMock(spec=RedisCache)
    testing_litellm_router.cache = DualCache(
        redis_cache=mock_redis_cache, in_memory_cache=InMemoryCache()
    )

    # Call the functions we're testing
    increment_deployment_successes_for_current_minute(
        litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
    )
    increment_deployment_failures_for_current_minute(
        litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
    )

    time.sleep(1)

    # Assert that no methods were called on the mock_redis_cache
    assert not mock_redis_cache.method_calls, "RedisCache methods should not be called"

    print(
        "in memory cache values=",
        testing_litellm_router.cache.in_memory_cache.cache_dict,
    )
    assert (
        testing_litellm_router.cache.in_memory_cache.get_cache(
            "test_deployment:successes"
        )
        is not None
    )


def test_cast_exception_status_to_int():
    assert cast_exception_status_to_int(200) == 200
    assert cast_exception_status_to_int("404") == 404
    assert cast_exception_status_to_int("invalid") == 500