forked from phoenix/litellm-mirror
* docs(exception_mapping.md): add missing exception types Fixes https://github.com/Aider-AI/aider/issues/2120#issuecomment-2438971183 * fix(main.py): register custom model pricing with specific key Ensure custom model pricing is registered to the specific model+provider key combination * test: make testing more robust for custom pricing * fix(redis_cache.py): instrument otel logging for sync redis calls ensures complete coverage for all redis cache calls * refactor: pass parent_otel_span for redis caching calls in router allows for more observability into what calls are causing latency issues * test: update tests with new params * refactor: ensure e2e otel tracing for router * refactor(router.py): add more otel tracing acrosss router catch all latency issues for router requests * fix: fix linting error * fix(router.py): fix linting error * fix: fix test * test: fix tests * fix(dual_cache.py): pass ttl to redis cache * fix: fix param * perf(cooldown_cache.py): improve cooldown cache, to store cache results in memory for 5s, prevents redis call from being made on each request reduces 100ms latency per call with caching enabled on router * fix: fix test * fix(cooldown_cache.py): handle if a result is None * fix(cooldown_cache.py): add debug statements * refactor(dual_cache.py): move to using an in-memory check for batch get cache, to prevent redis from being hit for every call * fix(cooldown_cache.py): fix linting erropr
396 lines
12 KiB
Python
396 lines
12 KiB
Python
import sys, os, time
|
|
import traceback, asyncio
|
|
import pytest
|
|
|
|
sys.path.insert(
|
|
0, os.path.abspath("../..")
|
|
) # Adds the parent directory to the system path
|
|
import litellm
|
|
from litellm import Router
|
|
from litellm.router import Deployment, LiteLLM_Params, ModelInfo
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from collections import defaultdict
|
|
from dotenv import load_dotenv
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from litellm.integrations.prometheus import PrometheusLogger
|
|
from litellm.router_utils.cooldown_callbacks import router_cooldown_event_callback
|
|
from litellm.router_utils.cooldown_handlers import (
|
|
_should_run_cooldown_logic,
|
|
_should_cooldown_deployment,
|
|
cast_exception_status_to_int,
|
|
)
|
|
from litellm.router_utils.router_callbacks.track_deployment_metrics import (
|
|
increment_deployment_failures_for_current_minute,
|
|
increment_deployment_successes_for_current_minute,
|
|
)
|
|
|
|
load_dotenv()
|
|
|
|
|
|
class CustomPrometheusLogger(PrometheusLogger):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.deployment_complete_outages = []
|
|
self.deployment_cooled_downs = []
|
|
|
|
def set_deployment_complete_outage(
|
|
self,
|
|
litellm_model_name: str,
|
|
model_id: str,
|
|
api_base: str,
|
|
api_provider: str,
|
|
):
|
|
self.deployment_complete_outages.append(
|
|
[litellm_model_name, model_id, api_base, api_provider]
|
|
)
|
|
|
|
def increment_deployment_cooled_down(
|
|
self,
|
|
litellm_model_name: str,
|
|
model_id: str,
|
|
api_base: str,
|
|
api_provider: str,
|
|
exception_status: str,
|
|
):
|
|
self.deployment_cooled_downs.append(
|
|
[litellm_model_name, model_id, api_base, api_provider, exception_status]
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_router_cooldown_event_callback():
|
|
"""
|
|
Test the router_cooldown_event_callback function
|
|
|
|
Ensures that the router_cooldown_event_callback function correctly logs the cooldown event to the PrometheusLogger
|
|
"""
|
|
# Mock Router instance
|
|
mock_router = MagicMock()
|
|
mock_deployment = {
|
|
"litellm_params": {"model": "gpt-3.5-turbo"},
|
|
"model_name": "gpt-3.5-turbo",
|
|
"model_info": ModelInfo(id="test-model-id"),
|
|
}
|
|
mock_router.get_deployment.return_value = mock_deployment
|
|
|
|
# Create a real PrometheusLogger instance
|
|
prometheus_logger = CustomPrometheusLogger()
|
|
litellm.callbacks = [prometheus_logger]
|
|
|
|
await router_cooldown_event_callback(
|
|
litellm_router_instance=mock_router,
|
|
deployment_id="test-deployment",
|
|
exception_status="429",
|
|
cooldown_time=60.0,
|
|
)
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Assert that the router's get_deployment method was called
|
|
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
|
|
|
|
print(
|
|
"prometheus_logger.deployment_complete_outages",
|
|
prometheus_logger.deployment_complete_outages,
|
|
)
|
|
print(
|
|
"prometheus_logger.deployment_cooled_downs",
|
|
prometheus_logger.deployment_cooled_downs,
|
|
)
|
|
|
|
# Assert that PrometheusLogger methods were called
|
|
assert len(prometheus_logger.deployment_complete_outages) == 1
|
|
assert len(prometheus_logger.deployment_cooled_downs) == 1
|
|
|
|
assert prometheus_logger.deployment_complete_outages[0] == [
|
|
"gpt-3.5-turbo",
|
|
"test-model-id",
|
|
"https://api.openai.com",
|
|
"openai",
|
|
]
|
|
assert prometheus_logger.deployment_cooled_downs[0] == [
|
|
"gpt-3.5-turbo",
|
|
"test-model-id",
|
|
"https://api.openai.com",
|
|
"openai",
|
|
"429",
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_router_cooldown_event_callback_no_prometheus():
|
|
"""
|
|
Test the router_cooldown_event_callback function
|
|
|
|
Ensures that the router_cooldown_event_callback function does not raise an error when no PrometheusLogger is found
|
|
"""
|
|
# Mock Router instance
|
|
mock_router = MagicMock()
|
|
mock_deployment = {
|
|
"litellm_params": {"model": "gpt-3.5-turbo"},
|
|
"model_name": "gpt-3.5-turbo",
|
|
"model_info": ModelInfo(id="test-model-id"),
|
|
}
|
|
mock_router.get_deployment.return_value = mock_deployment
|
|
|
|
await router_cooldown_event_callback(
|
|
litellm_router_instance=mock_router,
|
|
deployment_id="test-deployment",
|
|
exception_status="429",
|
|
cooldown_time=60.0,
|
|
)
|
|
|
|
# Assert that the router's get_deployment method was called
|
|
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_router_cooldown_event_callback_no_deployment():
|
|
"""
|
|
Test the router_cooldown_event_callback function
|
|
|
|
Ensures that the router_cooldown_event_callback function does not raise an error when no deployment is found
|
|
|
|
In this scenario it should do nothing
|
|
"""
|
|
# Mock Router instance
|
|
mock_router = MagicMock()
|
|
mock_router.get_deployment.return_value = None
|
|
|
|
await router_cooldown_event_callback(
|
|
litellm_router_instance=mock_router,
|
|
deployment_id="test-deployment",
|
|
exception_status="429",
|
|
cooldown_time=60.0,
|
|
)
|
|
|
|
# Assert that the router's get_deployment method was called
|
|
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
|
|
|
|
|
|
@pytest.fixture
|
|
def testing_litellm_router():
|
|
return Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {"model": "gpt-3.5-turbo"},
|
|
"model_id": "test_deployment",
|
|
},
|
|
{
|
|
"model_name": "test_deployment",
|
|
"litellm_params": {"model": "openai/test_deployment"},
|
|
"model_id": "test_deployment_2",
|
|
},
|
|
]
|
|
)
|
|
|
|
|
|
def test_should_run_cooldown_logic(testing_litellm_router):
|
|
testing_litellm_router.disable_cooldowns = True
|
|
# don't run cooldown logic if disable_cooldowns is True
|
|
assert (
|
|
_should_run_cooldown_logic(
|
|
testing_litellm_router, "test_deployment", 500, Exception("Test")
|
|
)
|
|
is False
|
|
)
|
|
|
|
# don't cooldown if deployment is None
|
|
testing_litellm_router.disable_cooldowns = False
|
|
assert (
|
|
_should_run_cooldown_logic(testing_litellm_router, None, 500, Exception("Test"))
|
|
is False
|
|
)
|
|
|
|
# don't cooldown if it's a provider default deployment
|
|
testing_litellm_router.provider_default_deployment_ids = ["test_deployment"]
|
|
assert (
|
|
_should_run_cooldown_logic(
|
|
testing_litellm_router, "test_deployment", 500, Exception("Test")
|
|
)
|
|
is False
|
|
)
|
|
|
|
|
|
def test_should_cooldown_deployment_rate_limit_error(testing_litellm_router):
|
|
"""
|
|
Test the _should_cooldown_deployment function when a rate limit error occurs
|
|
"""
|
|
# Test 429 error (rate limit) -> always cooldown a deployment returning 429s
|
|
_exception = litellm.exceptions.RateLimitError(
|
|
"Rate limit", "openai", "gpt-3.5-turbo"
|
|
)
|
|
assert (
|
|
_should_cooldown_deployment(
|
|
testing_litellm_router, "test_deployment", 429, _exception
|
|
)
|
|
is True
|
|
)
|
|
|
|
|
|
def test_should_cooldown_deployment_auth_limit_error(testing_litellm_router):
|
|
"""
|
|
Test the _should_cooldown_deployment function when an auth limit error occurs
|
|
"""
|
|
# Test 401 error (auth limit) -> always cooldown a deployment returning 401s
|
|
_exception = litellm.exceptions.AuthenticationError(
|
|
"Unauthorized", "openai", "gpt-3.5-turbo"
|
|
)
|
|
assert (
|
|
_should_cooldown_deployment(
|
|
testing_litellm_router, "test_deployment", 401, _exception
|
|
)
|
|
is True
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_should_cooldown_deployment(testing_litellm_router):
|
|
"""
|
|
Cooldown a deployment if it fails 60% of requests in 1 minute - DEFAULT threshold is 50%
|
|
"""
|
|
from litellm._logging import verbose_router_logger
|
|
import logging
|
|
|
|
verbose_router_logger.setLevel(logging.DEBUG)
|
|
|
|
# Test 429 error (rate limit) -> always cooldown a deployment returning 429s
|
|
_exception = litellm.exceptions.RateLimitError(
|
|
"Rate limit", "openai", "gpt-3.5-turbo"
|
|
)
|
|
assert (
|
|
_should_cooldown_deployment(
|
|
testing_litellm_router, "test_deployment", 429, _exception
|
|
)
|
|
is True
|
|
)
|
|
|
|
available_deployment = testing_litellm_router.get_available_deployment(
|
|
model="test_deployment"
|
|
)
|
|
print("available_deployment", available_deployment)
|
|
assert available_deployment is not None
|
|
|
|
deployment_id = available_deployment["model_info"]["id"]
|
|
print("deployment_id", deployment_id)
|
|
|
|
# set current success for deployment to 40
|
|
for _ in range(40):
|
|
increment_deployment_successes_for_current_minute(
|
|
litellm_router_instance=testing_litellm_router, deployment_id=deployment_id
|
|
)
|
|
|
|
# now we fail 40 requests in a row
|
|
tasks = []
|
|
for _ in range(41):
|
|
tasks.append(
|
|
testing_litellm_router.acompletion(
|
|
model=deployment_id,
|
|
messages=[{"role": "user", "content": "Hello, world!"}],
|
|
max_tokens=100,
|
|
mock_response="litellm.InternalServerError",
|
|
)
|
|
)
|
|
try:
|
|
await asyncio.gather(*tasks)
|
|
except Exception:
|
|
pass
|
|
|
|
await asyncio.sleep(1)
|
|
|
|
# expect this to fail since it's now 51% of requests are failing
|
|
assert (
|
|
_should_cooldown_deployment(
|
|
testing_litellm_router, deployment_id, 500, Exception("Test")
|
|
)
|
|
is True
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_should_cooldown_deployment_allowed_fails_set_on_router():
|
|
"""
|
|
Test the _should_cooldown_deployment function when Router.allowed_fails is set
|
|
"""
|
|
# Create a Router instance with a test deployment
|
|
router = Router(
|
|
model_list=[
|
|
{
|
|
"model_name": "gpt-3.5-turbo",
|
|
"litellm_params": {"model": "gpt-3.5-turbo"},
|
|
"model_id": "test_deployment",
|
|
},
|
|
]
|
|
)
|
|
|
|
# Set up allowed_fails for the test deployment
|
|
router.allowed_fails = 100
|
|
|
|
# should not cooldown when fails are below the allowed limit
|
|
for _ in range(100):
|
|
assert (
|
|
_should_cooldown_deployment(
|
|
router, "test_deployment", 500, Exception("Test")
|
|
)
|
|
is False
|
|
)
|
|
|
|
assert (
|
|
_should_cooldown_deployment(router, "test_deployment", 500, Exception("Test"))
|
|
is True
|
|
)
|
|
|
|
|
|
def test_increment_deployment_successes_for_current_minute_does_not_write_to_redis(
|
|
testing_litellm_router,
|
|
):
|
|
"""
|
|
Ensure tracking deployment metrics does not write to redis
|
|
|
|
Important - If it writes to redis on every request it will seriously impact performance / latency
|
|
"""
|
|
from litellm.caching.dual_cache import DualCache
|
|
from litellm.caching.redis_cache import RedisCache
|
|
from litellm.caching.in_memory_cache import InMemoryCache
|
|
from litellm.router_utils.router_callbacks.track_deployment_metrics import (
|
|
increment_deployment_successes_for_current_minute,
|
|
)
|
|
|
|
# Mock RedisCache
|
|
mock_redis_cache = MagicMock(spec=RedisCache)
|
|
|
|
testing_litellm_router.cache = DualCache(
|
|
redis_cache=mock_redis_cache, in_memory_cache=InMemoryCache()
|
|
)
|
|
|
|
# Call the function we're testing
|
|
increment_deployment_successes_for_current_minute(
|
|
litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
|
|
)
|
|
|
|
increment_deployment_failures_for_current_minute(
|
|
litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
|
|
)
|
|
|
|
time.sleep(1)
|
|
|
|
# Assert that no methods were called on the mock_redis_cache
|
|
assert not mock_redis_cache.method_calls, "RedisCache methods should not be called"
|
|
|
|
print(
|
|
"in memory cache values=",
|
|
testing_litellm_router.cache.in_memory_cache.cache_dict,
|
|
)
|
|
assert (
|
|
testing_litellm_router.cache.in_memory_cache.get_cache(
|
|
"test_deployment:successes"
|
|
)
|
|
is not None
|
|
)
|
|
|
|
|
|
def test_cast_exception_status_to_int():
|
|
assert cast_exception_status_to_int(200) == 200
|
|
assert cast_exception_status_to_int("404") == 404
|
|
assert cast_exception_status_to_int("invalid") == 500
|