(testing) add router unit testing for send_llm_exception_alert, router_cooldown_event_callback, cooldown utils (#6258)

* add router unit testing for send_llm_exception_alert

* test router_cooldown_event_callback

* test test_router_cooldown_event_callback_no_prometheus

* test_router_cooldown_event_callback_no_deployment

* test_router_cooldown_event_callback_no_deployment

* add testing for test_should_run_cooldown_logic

* test_increment_deployment_successes_for_current_minute_does_not_write_to_redis

* test test_should_cooldown_deployment_allowed_fails_set_on_router

* use helper for _is_allowed_fails_set_on_router

* add complete testing for cooldown utils

* move router unit tests

* move router handle error

* fix test_send_llm_exception_alert_no_logger
Ishaan Jaff 2024-10-16 23:19:51 +05:30 committed by GitHub
parent 8530000b44
commit 891e9001b5
5 changed files with 564 additions and 26 deletions

View file

@@ -12,8 +12,10 @@ if TYPE_CHECKING:
from litellm.router import Router as _Router
LitellmRouter = _Router
from litellm.integrations.prometheus import PrometheusLogger
else:
LitellmRouter = Any
PrometheusLogger = Any
async def router_cooldown_event_callback(
@@ -56,34 +58,38 @@ async def router_cooldown_event_callback(
except Exception:
pass
# Trigger cooldown on Prometheus
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.prometheus import PrometheusLogger
from litellm.litellm_core_utils.litellm_logging import (
get_custom_logger_compatible_class,
)
# get the prometheus logger from in memory loggers
prometheusLogger: Optional[CustomLogger] = get_custom_logger_compatible_class(
logging_integration="prometheus",
prometheusLogger: Optional[PrometheusLogger] = (
_get_prometheus_logger_from_callbacks()
)
if prometheusLogger is not None:
prometheusLogger.set_deployment_complete_outage(
litellm_model_name=_model_name,
model_id=model_id,
api_base=_api_base,
api_provider=llm_provider,
)
if isinstance(prometheusLogger, PrometheusLogger):
prometheusLogger.set_deployment_complete_outage(
litellm_model_name=_model_name,
model_id=model_id,
api_base=_api_base,
api_provider=llm_provider,
)
prometheusLogger.increment_deployment_cooled_down(
litellm_model_name=_model_name,
model_id=model_id,
api_base=_api_base,
api_provider=llm_provider,
exception_status=str(exception_status),
)
prometheusLogger.increment_deployment_cooled_down(
litellm_model_name=_model_name,
model_id=model_id,
api_base=_api_base,
api_provider=llm_provider,
exception_status=str(exception_status),
)
return
def _get_prometheus_logger_from_callbacks() -> Optional[PrometheusLogger]:
"""
Checks if prometheus is an initialized callback; if yes, returns it
"""
from litellm.integrations.prometheus import PrometheusLogger
for _callback in litellm.callbacks:
if isinstance(_callback, PrometheusLogger):
return _callback
return None
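
For orientation, here is a minimal sketch of how this new helper is meant to be exercised, mirroring the unit test added further down. It assumes prometheus_client is installed so PrometheusLogger() can be constructed; the registration via litellm.callbacks matches what the new test does.

import litellm
from litellm.integrations.prometheus import PrometheusLogger
from litellm.router_utils.cooldown_callbacks import (
    _get_prometheus_logger_from_callbacks,
)

# register an in-memory Prometheus logger, as the new test below does
litellm.callbacks = [PrometheusLogger()]
assert isinstance(_get_prometheus_logger_from_callbacks(), PrometheusLogger)

# with no Prometheus callback registered, the helper returns None
litellm.callbacks = []
assert _get_prometheus_logger_from_callbacks() is None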

View file

@@ -92,7 +92,13 @@ def _should_cooldown_deployment(
- v1 logic (Legacy): if allowed fails or allowed fails policy is set, cools down if num fails in this minute > allowed fails
"""
if litellm_router_instance.allowed_fails_policy is None:
if (
litellm_router_instance.allowed_fails_policy is None
and _is_allowed_fails_set_on_router(
litellm_router_instance=litellm_router_instance
)
is False
):
num_successes_this_minute = get_deployment_successes_for_current_minute(
litellm_router_instance=litellm_router_instance, deployment_id=deployment
)
@@ -303,6 +309,23 @@ def should_cooldown_based_on_allowed_fails_policy(
return False
def _is_allowed_fails_set_on_router(
litellm_router_instance: LitellmRouter,
) -> bool:
"""
Check if Router.allowed_fails is set or is Non-default Value
Returns:
- True if Router.allowed_fails is set or is Non-default Value
- False if Router.allowed_fails is None or is Default Value
"""
if litellm_router_instance.allowed_fails is None:
return False
if litellm_router_instance.allowed_fails != litellm.allowed_fails:
return True
return False
def cast_exception_status_to_int(exception_status: Union[str, int]) -> int:
if isinstance(exception_status, str):
try:
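
The hunk above is cut off mid-function by the diff view. Based on the new test_cast_exception_status_to_int added below, a plausible sketch of the full helper looks like this; the 500 fallback is inferred from the test, not shown in this diff.

from typing import Union

def cast_exception_status_to_int_sketch(exception_status: Union[str, int]) -> int:
    # ints pass through unchanged; numeric strings are parsed;
    # anything unparseable falls back to 500 (inferred from the test below)
    if isinstance(exception_status, str):
        try:
            exception_status = int(exception_status)
        except Exception:
            exception_status = 500
    return exception_status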

View file

@@ -19,7 +19,8 @@ async def send_llm_exception_alert(
original_exception,
):
"""
Sends a Slack / MS Teams alert for the LLM API call failure.
Only runs if router.slack_alerting_logger is set
Sends a Slack / MS Teams alert for the LLM API call failure. Only if router.slack_alerting_logger is set.
Parameters:
litellm_router_instance (_Router): The LitellmRouter instance.
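
For context, the behavior the new handle_error tests below pin down can be summarized with this rough sketch. It is not the actual implementation, only the contract the tests encode; the guard order, message wording, and send_alert keyword values are assumptions.

# Behavioral sketch of send_llm_exception_alert, inferred from the tests below:
# - do nothing if the request kwargs contain proxy_server_request (proxy alerts separately)
# - do nothing if no slack_alerting_logger is configured on the router
# - otherwise forward the exception details to slack_alerting_logger.send_alert(...)
async def send_llm_exception_alert_sketch(
    litellm_router_instance, request_kwargs, error_traceback_str, original_exception
):
    if "proxy_server_request" in request_kwargs:
        return  # no alert for proxy-originated requests (per the tests)
    if litellm_router_instance.slack_alerting_logger is None:
        return  # alerting not configured; exit quietly
    await litellm_router_instance.slack_alerting_logger.send_alert(
        message=f"LLM API call failed: {original_exception}",  # wording is an assumption
        level="High",  # assumed severity label
        alert_type="llm_exceptions",  # assumed alert type
    )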

View file

@@ -0,0 +1,396 @@
import sys, os, time
import traceback, asyncio
import pytest
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import litellm
from litellm import Router
from litellm.router import Deployment, LiteLLM_Params, ModelInfo
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from dotenv import load_dotenv
from unittest.mock import AsyncMock, MagicMock
from litellm.integrations.prometheus import PrometheusLogger
from litellm.router_utils.cooldown_callbacks import router_cooldown_event_callback
from litellm.router_utils.cooldown_handlers import (
_should_run_cooldown_logic,
_should_cooldown_deployment,
cast_exception_status_to_int,
)
from litellm.router_utils.router_callbacks.track_deployment_metrics import (
increment_deployment_failures_for_current_minute,
increment_deployment_successes_for_current_minute,
)
load_dotenv()
class CustomPrometheusLogger(PrometheusLogger):
def __init__(self):
super().__init__()
self.deployment_complete_outages = []
self.deployment_cooled_downs = []
def set_deployment_complete_outage(
self,
litellm_model_name: str,
model_id: str,
api_base: str,
api_provider: str,
):
self.deployment_complete_outages.append(
[litellm_model_name, model_id, api_base, api_provider]
)
def increment_deployment_cooled_down(
self,
litellm_model_name: str,
model_id: str,
api_base: str,
api_provider: str,
exception_status: str,
):
self.deployment_cooled_downs.append(
[litellm_model_name, model_id, api_base, api_provider, exception_status]
)
@pytest.mark.asyncio
async def test_router_cooldown_event_callback():
"""
Test the router_cooldown_event_callback function
Ensures that the router_cooldown_event_callback function correctly logs the cooldown event to the PrometheusLogger
"""
# Mock Router instance
mock_router = MagicMock()
mock_deployment = {
"litellm_params": {"model": "gpt-3.5-turbo"},
"model_name": "gpt-3.5-turbo",
"model_info": ModelInfo(id="test-model-id"),
}
mock_router.get_deployment.return_value = mock_deployment
# Create a real PrometheusLogger instance
prometheus_logger = CustomPrometheusLogger()
litellm.callbacks = [prometheus_logger]
await router_cooldown_event_callback(
litellm_router_instance=mock_router,
deployment_id="test-deployment",
exception_status="429",
cooldown_time=60.0,
)
await asyncio.sleep(0.5)
# Assert that the router's get_deployment method was called
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
print(
"prometheus_logger.deployment_complete_outages",
prometheus_logger.deployment_complete_outages,
)
print(
"prometheus_logger.deployment_cooled_downs",
prometheus_logger.deployment_cooled_downs,
)
# Assert that PrometheusLogger methods were called
assert len(prometheus_logger.deployment_complete_outages) == 1
assert len(prometheus_logger.deployment_cooled_downs) == 1
assert prometheus_logger.deployment_complete_outages[0] == [
"gpt-3.5-turbo",
"test-model-id",
"https://api.openai.com",
"openai",
]
assert prometheus_logger.deployment_cooled_downs[0] == [
"gpt-3.5-turbo",
"test-model-id",
"https://api.openai.com",
"openai",
"429",
]
@pytest.mark.asyncio
async def test_router_cooldown_event_callback_no_prometheus():
"""
Test the router_cooldown_event_callback function
Ensures that the router_cooldown_event_callback function does not raise an error when no PrometheusLogger is found
"""
# Mock Router instance
mock_router = MagicMock()
mock_deployment = {
"litellm_params": {"model": "gpt-3.5-turbo"},
"model_name": "gpt-3.5-turbo",
"model_info": ModelInfo(id="test-model-id"),
}
mock_router.get_deployment.return_value = mock_deployment
await router_cooldown_event_callback(
litellm_router_instance=mock_router,
deployment_id="test-deployment",
exception_status="429",
cooldown_time=60.0,
)
# Assert that the router's get_deployment method was called
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
@pytest.mark.asyncio
async def test_router_cooldown_event_callback_no_deployment():
"""
Test the router_cooldown_event_callback function
Ensures that the router_cooldown_event_callback function does not raise an error when no deployment is found
In this scenario it should do nothing
"""
# Mock Router instance
mock_router = MagicMock()
mock_router.get_deployment.return_value = None
await router_cooldown_event_callback(
litellm_router_instance=mock_router,
deployment_id="test-deployment",
exception_status="429",
cooldown_time=60.0,
)
# Assert that the router's get_deployment method was called
mock_router.get_deployment.assert_called_once_with(model_id="test-deployment")
@pytest.fixture
def testing_litellm_router():
return Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-3.5-turbo"},
"model_id": "test_deployment",
},
{
"model_name": "test_deployment",
"litellm_params": {"model": "openai/test_deployment"},
"model_id": "test_deployment_2",
},
]
)
def test_should_run_cooldown_logic(testing_litellm_router):
testing_litellm_router.disable_cooldowns = True
# don't run cooldown logic if disable_cooldowns is True
assert (
_should_run_cooldown_logic(
testing_litellm_router, "test_deployment", 500, Exception("Test")
)
is False
)
# don't cooldown if deployment is None
testing_litellm_router.disable_cooldowns = False
assert (
_should_run_cooldown_logic(testing_litellm_router, None, 500, Exception("Test"))
is False
)
# don't cooldown if it's a provider default deployment
testing_litellm_router.provider_default_deployment_ids = ["test_deployment"]
assert (
_should_run_cooldown_logic(
testing_litellm_router, "test_deployment", 500, Exception("Test")
)
is False
)
def test_should_cooldown_deployment_rate_limit_error(testing_litellm_router):
"""
Test the _should_cooldown_deployment function when a rate limit error occurs
"""
# Test 429 error (rate limit) -> always cooldown a deployment returning 429s
_exception = litellm.exceptions.RateLimitError(
"Rate limit", "openai", "gpt-3.5-turbo"
)
assert (
_should_cooldown_deployment(
testing_litellm_router, "test_deployment", 429, _exception
)
is True
)
def test_should_cooldown_deployment_auth_limit_error(testing_litellm_router):
"""
Test the _should_cooldown_deployment function when an auth limit error occurs
"""
# Test 401 error (auth limit) -> always cooldown a deployment returning 401s
_exception = litellm.exceptions.AuthenticationError(
"Unauthorized", "openai", "gpt-3.5-turbo"
)
assert (
_should_cooldown_deployment(
testing_litellm_router, "test_deployment", 401, _exception
)
is True
)
@pytest.mark.asyncio
async def test_should_cooldown_deployment(testing_litellm_router):
"""
Cooldown a deployment if it fails more than 50% of requests in 1 minute - DEFAULT threshold is 50%
"""
from litellm._logging import verbose_router_logger
import logging
verbose_router_logger.setLevel(logging.DEBUG)
# Test 429 error (rate limit) -> always cooldown a deployment returning 429s
_exception = litellm.exceptions.RateLimitError(
"Rate limit", "openai", "gpt-3.5-turbo"
)
assert (
_should_cooldown_deployment(
testing_litellm_router, "test_deployment", 429, _exception
)
is True
)
available_deployment = testing_litellm_router.get_available_deployment(
model="test_deployment"
)
print("available_deployment", available_deployment)
assert available_deployment is not None
deployment_id = available_deployment["model_info"]["id"]
print("deployment_id", deployment_id)
# set current success for deployment to 40
for _ in range(40):
increment_deployment_successes_for_current_minute(
litellm_router_instance=testing_litellm_router, deployment_id=deployment_id
)
# now fail 41 requests in a row
tasks = []
for _ in range(41):
tasks.append(
testing_litellm_router.acompletion(
model=deployment_id,
messages=[{"role": "user", "content": "Hello, world!"}],
max_tokens=100,
mock_response="litellm.InternalServerError",
)
)
try:
await asyncio.gather(*tasks)
except Exception:
pass
await asyncio.sleep(1)
# expect cooldown, since ~51% of requests are now failing
assert (
_should_cooldown_deployment(
testing_litellm_router, deployment_id, 500, Exception("Test")
)
is True
)
@pytest.mark.asyncio
async def test_should_cooldown_deployment_allowed_fails_set_on_router():
"""
Test the _should_cooldown_deployment function when Router.allowed_fails is set
"""
# Create a Router instance with a test deployment
router = Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-3.5-turbo"},
"model_id": "test_deployment",
},
]
)
# Set up allowed_fails for the test deployment
router.allowed_fails = 100
# should not cooldown when fails are below the allowed limit
for _ in range(100):
assert (
_should_cooldown_deployment(
router, "test_deployment", 500, Exception("Test")
)
is False
)
assert (
_should_cooldown_deployment(router, "test_deployment", 500, Exception("Test"))
is True
)
def test_increment_deployment_successes_for_current_minute_does_not_write_to_redis(
testing_litellm_router,
):
"""
Ensure tracking deployment metrics does not write to redis
Important - If it writes to redis on every request it will seriously impact performance / latency
"""
from litellm.caching.dual_cache import DualCache
from litellm.caching.redis_cache import RedisCache
from litellm.caching.in_memory_cache import InMemoryCache
from litellm.router_utils.router_callbacks.track_deployment_metrics import (
increment_deployment_successes_for_current_minute,
)
# Mock RedisCache
mock_redis_cache = MagicMock(spec=RedisCache)
testing_litellm_router.cache = DualCache(
redis_cache=mock_redis_cache, in_memory_cache=InMemoryCache()
)
# Call the function we're testing
increment_deployment_successes_for_current_minute(
litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
)
increment_deployment_failures_for_current_minute(
litellm_router_instance=testing_litellm_router, deployment_id="test_deployment"
)
time.sleep(1)
# Assert that no methods were called on the mock_redis_cache
assert not mock_redis_cache.method_calls, "RedisCache methods should not be called"
print(
"in memory cache values=",
testing_litellm_router.cache.in_memory_cache.cache_dict,
)
assert (
testing_litellm_router.cache.in_memory_cache.get_cache(
"test_deployment:successes"
)
is not None
)
def test_cast_exception_status_to_int():
assert cast_exception_status_to_int(200) == 200
assert cast_exception_status_to_int("404") == 404
assert cast_exception_status_to_int("invalid") == 500

View file

@@ -0,0 +1,112 @@
import sys, os, time
import traceback, asyncio
import pytest
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
import litellm
from litellm import Router
from litellm.router import Deployment, LiteLLM_Params, ModelInfo
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
from dotenv import load_dotenv
from unittest.mock import AsyncMock, MagicMock
load_dotenv()
@pytest.mark.asyncio
async def test_send_llm_exception_alert_success():
"""
Test that the function sends an alert when the router.slack_alerting_logger is set.
"""
# Create a mock LitellmRouter instance
mock_router = MagicMock()
mock_router.slack_alerting_logger = AsyncMock()
# Create a mock exception
mock_exception = Exception("Test exception")
# Create mock request kwargs
request_kwargs = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}],
}
# Create a mock error traceback
error_traceback = 'Traceback (most recent call last):\n File "test.py", line 10, in <module>\n raise Exception("Test exception")\nException: Test exception'
# Call the function
from litellm.router_utils.handle_error import send_llm_exception_alert
await send_llm_exception_alert(
mock_router, request_kwargs, error_traceback, mock_exception
)
# Assert that the slack_alerting_logger's send_alert method was called
mock_router.slack_alerting_logger.send_alert.assert_called_once()
@pytest.mark.asyncio
async def test_send_llm_exception_alert_no_logger():
"""
Test that the function does not error out when no slack_alerting_logger is set
"""
# Create a mock LitellmRouter instance without a slack_alerting_logger
mock_router = MagicMock()
mock_router.slack_alerting_logger = None
# Create a mock exception
mock_exception = Exception("Test exception")
# Create mock request kwargs
request_kwargs = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}],
}
# Create a mock error traceback
error_traceback = 'Traceback (most recent call last):\n File "test.py", line 10, in <module>\n raise Exception("Test exception")\nException: Test exception'
# Call the function
from litellm.router_utils.handle_error import send_llm_exception_alert
await send_llm_exception_alert(
mock_router, request_kwargs, error_traceback, mock_exception
)
@pytest.mark.asyncio
async def test_send_llm_exception_alert_when_proxy_server_request_in_kwargs():
"""
Test that the function does not send an alert when the request kwargs contains a proxy_server_request key.
"""
# Create a mock LitellmRouter instance with a slack_alerting_logger
mock_router = MagicMock()
mock_router.slack_alerting_logger = AsyncMock()
# Create a mock exception
mock_exception = Exception("Test exception")
# Create mock request kwargs
request_kwargs = {
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hello"}],
"proxy_server_request": {},
}
# Create a mock error traceback
error_traceback = 'Traceback (most recent call last):\n File "test.py", line 10, in <module>\n raise Exception("Test exception")\nException: Test exception'
# Call the function
from litellm.router_utils.handle_error import send_llm_exception_alert
await send_llm_exception_alert(
mock_router, request_kwargs, error_traceback, mock_exception
)
# Assert that no alert was sent since the request kwargs contain proxy_server_request
mock_router.slack_alerting_logger.send_alert.assert_not_called()