feat(slack_alerting.py): support sending daily reports on deployments

allow admins to easily identify slow + failing deployments

Closes https://github.com/BerriAI/litellm/issues/3483
Krrish Dholakia 2024-05-06 17:18:42 -07:00
parent 863f9c60a2
commit 718f423d7d
7 changed files with 400 additions and 25 deletions


@@ -106,7 +106,7 @@ class InMemoryCache(BaseCache):
            return_val.append(val)
        return return_val

-    async def async_increment(self, key, value: int, **kwargs) -> int:
    async def async_increment(self, key, value: float, **kwargs) -> float:
        # get the value
        init_value = await self.async_get_cache(key=key) or 0
        value = init_value + value
@@ -423,12 +423,12 @@ class RedisCache(BaseCache):
        if len(self.redis_batch_writing_buffer) >= self.redis_flush_size:
            await self.flush_cache_buffer()  # logging done in here

-    async def async_increment(self, key, value: int, **kwargs) -> int:
    async def async_increment(self, key, value: float, **kwargs) -> float:
        _redis_client = self.init_async_client()
        start_time = time.time()
        try:
            async with _redis_client as redis_client:
-                result = await redis_client.incr(name=key, amount=value)
                result = await redis_client.incrbyfloat(name=key, amount=value)
                ## LOGGING ##
                end_time = time.time()
                _duration = end_time - start_time
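
Context for the change above: the counters feeding the new daily report accumulate latency per output token, which is fractional, and Redis' INCR command only accepts integer amounts — hence the int → float signatures and the switch to INCRBYFLOAT. A minimal sketch, not from this commit, assuming an async redis-py client and a local Redis instance; key names are illustrative:

# Minimal sketch, not from this commit: why INCRBYFLOAT replaces INCR here.
import asyncio
import redis.asyncio as redis


async def main():
    client = redis.Redis()  # assumes a local Redis instance
    # integer counters (e.g. failed request counts) work with INCR:
    await client.incr("model-1234:failed_requests_daily_metrics", amount=1)
    # fractional metrics (e.g. latency per output token) need INCRBYFLOAT;
    # INCR rejects non-integer amounts:
    await client.incrbyfloat("model-1234:latency_daily_metrics", amount=0.021)


asyncio.run(main())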
@@ -1382,18 +1382,41 @@ class DualCache(BaseCache):
            print_verbose(f"LiteLLM Cache: Excepton async add_cache: {str(e)}")
            traceback.print_exc()

    async def async_batch_set_cache(
        self, cache_list: list, local_only: bool = False, **kwargs
    ):
        """
        Batch write values to the cache
        """
        print_verbose(
            f"async batch set cache: cache keys: {cache_list}; local_only: {local_only}"
        )
        try:
            if self.in_memory_cache is not None:
                await self.in_memory_cache.async_set_cache_pipeline(
                    cache_list=cache_list, **kwargs
                )

            if self.redis_cache is not None and local_only == False:
                await self.redis_cache.async_set_cache_pipeline(
                    cache_list=cache_list, ttl=kwargs.get("ttl", None)
                )
        except Exception as e:
            print_verbose(f"LiteLLM Cache: Excepton async add_cache: {str(e)}")
            traceback.print_exc()
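
The new async_batch_set_cache is what send_daily_reports (further down in slack_alerting.py) uses to zero out every per-deployment metric key in a single call after a report goes out. A rough usage sketch, not from this commit — the key names are illustrative and the cache here is in-memory only:

# Usage sketch, not from this commit: reset per-deployment metric keys after a report.
import asyncio
from litellm.caching import DualCache


async def reset_daily_metrics():
    dual_cache = DualCache()  # in-memory only; pass a RedisCache for cross-worker state
    reset_pairs = [
        ("model-1234:failed_requests_daily_metrics", 0),
        ("model-1234:latency_daily_metrics", 0),
    ]
    await dual_cache.async_batch_set_cache(cache_list=reset_pairs)


asyncio.run(reset_daily_metrics())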
    async def async_increment_cache(
-        self, key, value: int, local_only: bool = False, **kwargs
-    ) -> int:
        self, key, value: float, local_only: bool = False, **kwargs
    ) -> float:
        """
        Key - the key in cache
-        Value - int - the value you want to increment by
-        Returns - int - the incremented value
        Value - float - the value you want to increment by
        Returns - float - the incremented value
        """
        try:
-            result: int = value
            result: float = value
            if self.in_memory_cache is not None:
                result = await self.in_memory_cache.async_increment(
                    key, value, **kwargs
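
A small usage sketch, not from this commit, of the float-based DualCache increment the alerting code relies on; the key and values are made up:

# Usage sketch, not from this commit: accumulate a fractional metric and read back the total.
import asyncio
from litellm.caching import DualCache


async def demo_increment():
    cache = DualCache()  # in-memory only here
    key = "model-1234:latency_daily_metrics"  # illustrative key
    await cache.async_increment_cache(key=key, value=0.5)
    total = await cache.async_increment_cache(key=key, value=0.25)
    print(total)  # 0.75 -> the running sum is returned on each increment


asyncio.run(demo_increment())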


@@ -2,23 +2,74 @@
# Class for sending Slack Alerts #
import dotenv, os
from litellm.proxy._types import UserAPIKeyAuth

dotenv.load_dotenv()  # Loading env variables using dotenv
import copy
import traceback
from litellm._logging import verbose_logger, verbose_proxy_logger
-import litellm
import litellm, threading
from typing import List, Literal, Any, Union, Optional, Dict
from litellm.caching import DualCache
import asyncio
import aiohttp
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
import datetime
from pydantic import BaseModel
from enum import Enum
from datetime import datetime as dt, timedelta
from litellm.integrations.custom_logger import CustomLogger


-class SlackAlerting:
class LiteLLMBase(BaseModel):
    """
    Implements default functions, all pydantic objects should have.
    """

    def json(self, **kwargs):
        try:
            return self.model_dump()  # noqa
        except:
            # if using pydantic v1
            return self.dict()


class SlackArgs(LiteLLMBase):
    daily_report_frequency: int = 12 * 60 * 60  # 12 hours


class DeploymentMetrics(LiteLLMBase):
    """
    Metrics per deployment, stored in cache

    Used for daily reporting
    """

    id: str
    """id of deployment in router model list"""

    failed_request: bool
    """did it fail the request?"""

    latency_per_output_token: Optional[float]
    """latency/output token of deployment"""

    updated_at: dt
    """Current time of deployment being updated"""


class SlackAlertingCacheKeys(Enum):
    """
    Enum for deployment daily metrics keys - {deployment_id}:{enum}
    """

    failed_requests_key = "failed_requests_daily_metrics"
    latency_key = "latency_daily_metrics"
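
The per-deployment cache keys are composed as {deployment_id}:{enum value}, exactly as the format() calls further down do. A quick illustration, assuming this commit is installed; the deployment id is made up:

# Sketch: how the per-deployment metric keys are composed.
from litellm.integrations.slack_alerting import SlackAlertingCacheKeys

deployment_id = "1234"  # illustrative router model id
failed_key = "{}:{}".format(deployment_id, SlackAlertingCacheKeys.failed_requests_key.value)
latency_key = "{}:{}".format(deployment_id, SlackAlertingCacheKeys.latency_key.value)
print(failed_key)   # 1234:failed_requests_daily_metrics
print(latency_key)  # 1234:latency_daily_metrics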
class SlackAlerting(CustomLogger):
    # Class variables or attributes
    def __init__(
        self,
        internal_usage_cache: DualCache,
        alerting_threshold: float = 300,
        alerting: Optional[List] = [],
        alert_types: Optional[
@@ -29,6 +80,7 @@ class SlackAlerting:
                    "llm_requests_hanging",
                    "budget_alerts",
                    "db_exceptions",
                    "daily_reports",
                ]
            ]
        ] = [
@@ -37,6 +89,7 @@ class SlackAlerting:
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
            "daily_reports",
        ],
        alert_to_webhook_url: Optional[
            Dict
@@ -45,10 +98,10 @@
        self.alerting_threshold = alerting_threshold
        self.alerting = alerting
        self.alert_types = alert_types
-        self.internal_usage_cache = DualCache()
        self.internal_usage_cache = internal_usage_cache
        self.async_http_handler = AsyncHTTPHandler()
        self.alert_to_webhook_url = alert_to_webhook_url
-        pass
        self.is_running = False

    def update_values(
        self,
@@ -196,8 +249,178 @@ class SlackAlerting:
                    alert_type="llm_too_slow",
                )

-    async def log_failure_event(self, original_exception: Exception):
-        pass
    async def async_update_daily_reports(
        self, deployment_metrics: DeploymentMetrics
    ) -> int:
        """
        Store the perf by deployment in cache
        - Number of failed requests per deployment
        - Latency / output tokens per deployment

        'deployment_id:daily_metrics:failed_requests'
        'deployment_id:daily_metrics:latency_per_output_token'

        Returns
            int - count of metrics set (1 - if just latency, 2 - if failed + latency)
        """
        return_val = 0
        try:
            ## FAILED REQUESTS ##
            if deployment_metrics.failed_request:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id,
                        SlackAlertingCacheKeys.failed_requests_key.value,
                    ),
                    value=1,
                )

                return_val += 1

            ## LATENCY ##
            if deployment_metrics.latency_per_output_token is not None:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
                    ),
                    value=deployment_metrics.latency_per_output_token,
                )

                return_val += 1

            return return_val
        except Exception as e:
            return 0

    async def send_daily_reports(self, router: litellm.Router) -> bool:
        """
        Send a daily report on:
        - Top 5 deployments with most failed requests
        - Top 5 slowest deployments (normalized by latency/output tokens)

        Get the value from redis cache (if available) or in-memory and send it

        Cleanup:
        - reset values in cache -> prevent memory leak

        Returns:
            True -> if successfuly sent
            False -> if not sent
        """
        ids = router.get_model_ids()

        # get keys
        failed_request_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
            for id in ids
        ]
        latency_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
        ]

        combined_metrics_keys = failed_request_keys + latency_keys  # reduce cache calls

        combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
            keys=combined_metrics_keys
        )  # [1, 2, None, ..]

        all_none = True
        for val in combined_metrics_values:
            if val is not None:
                all_none = False

        if all_none:
            return False

        failed_request_values = combined_metrics_values[
            : len(failed_request_keys)
        ]  # # [1, 2, None, ..]
        latency_values = combined_metrics_values[len(failed_request_keys) :]

        # find top 5 failed
        ## Replace None values with a placeholder value (-1 in this case)
        placeholder_value = 0
        replaced_failed_values = [
            value if value is not None else placeholder_value
            for value in failed_request_values
        ]
        ## Get the indices of top 5 keys with the highest numerical values (ignoring None values)
        top_5_failed = sorted(
            range(len(replaced_failed_values)),
            key=lambda i: replaced_failed_values[i],
            reverse=True,
        )[:5]

        # find top 5 slowest
        # Replace None values with a placeholder value (-1 in this case)
        placeholder_value = -1
        replaced_slowest_values = [
            value if value is not None else placeholder_value
            for value in latency_values
        ]
        # Get the indices of top 5 values with the highest numerical values (ignoring None values)
        top_5_slowest = sorted(
            range(len(replaced_slowest_values)),
            key=lambda i: replaced_slowest_values[i],
            reverse=True,
        )[:5]

        # format alert -> return the litellm model name + api base
        message = f"\n\nHere are today's key metrics 📈: \n\n"

        message += "\n\n*❗️ Top 5 Deployments with Most Failed Requests:*\n\n"
        for i in range(len(top_5_failed)):
            key = failed_request_keys[top_5_failed[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if isinstance(_deployment, dict):
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                return False

            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            if api_base is None:
                api_base = ""
            value = replaced_failed_values[top_5_failed[i]]
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"

        message += "\n\n*😅 Top 5 Slowest Deployments:*\n\n"
        for i in range(len(top_5_slowest)):
            key = latency_keys[top_5_slowest[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if _deployment is not None:
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                deployment_name = ""
            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            value = replaced_slowest_values[top_5_slowest[i]]
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency: `{value}`, API Base: `{api_base}`\n\n"

        # cache cleanup -> reset values to 0
        latency_cache_keys = [(key, 0) for key in latency_keys]
        failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
        combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
        await self.internal_usage_cache.async_batch_set_cache(
            cache_list=combined_metrics_cache_keys
        )

        # send alert
        await self.send_alert(message=message, level="Low", alert_type="daily_reports")

        return True
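
The top-5 selection above sorts indices rather than values, so the result can be mapped back to the corresponding key list. A standalone sketch of the same pattern with made-up numbers:

# Standalone sketch of the index-sorting pattern used above; values are made up.
latency_values = [0.021, None, 0.450, 0.012, None, 0.300]

placeholder_value = -1  # ranks missing metrics below any real latency
replaced = [v if v is not None else placeholder_value for v in latency_values]

# indices of the largest values, preserving the link back to the key list
top_5_slowest = sorted(range(len(replaced)), key=lambda i: replaced[i], reverse=True)[:5]
print(top_5_slowest)  # [2, 5, 0, 3, 1]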
    async def response_taking_too_long(
        self,
@@ -414,6 +637,7 @@ class SlackAlerting:
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
            "daily_reports",
        ],
    ):
        """
@@ -439,9 +663,12 @@ class SlackAlerting:
        # Get the current timestamp
        current_time = datetime.now().strftime("%H:%M:%S")
        _proxy_base_url = os.getenv("PROXY_BASE_URL", None)
-        formatted_message = (
-            f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
-        )
        if alert_type == "daily_reports":
            formatted_message = message
        else:
            formatted_message = (
                f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
            )

        if _proxy_base_url is not None:
            formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"
@@ -468,3 +695,36 @@ class SlackAlerting:
                pass
            else:
                print("Error sending slack alert. Error=", response.text)  # noqa

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Log deployment latency"""
        model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")
        response_ms: timedelta = end_time - start_time

        final_value = response_ms
        total_tokens = 0

        if isinstance(response_obj, litellm.ModelResponse):
            completion_tokens = response_obj.usage.completion_tokens
            final_value = float(response_ms.total_seconds() / completion_tokens)

        await self.async_update_daily_reports(
            DeploymentMetrics(
                id=model_id,
                failed_request=False,
                latency_per_output_token=final_value,
                updated_at=litellm.utils.get_utc_datetime(),
            )
        )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Log failure + deployment latency"""
        model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")

        await self.async_update_daily_reports(
            DeploymentMetrics(
                id=model_id,
                failed_request=True,
                latency_per_output_token=None,
                updated_at=litellm.utils.get_utc_datetime(),
            )
        )
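
As a worked example of the success hook above: a call that takes 2.4 s end-to-end and returns 120 completion tokens is recorded as 2.4 / 120 = 0.02 s per output token, while a failed call stores no latency value and only increments that deployment's failed-request counter.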


@@ -19,4 +19,4 @@ litellm_settings:
general_settings:
  alerting: ["slack"]
-  alert_types: ["llm_exceptions"]
  alert_types: ["llm_exceptions", "daily_reports"]


@@ -73,6 +73,7 @@ class ProxyLogging:
                "llm_requests_hanging",
                "budget_alerts",
                "db_exceptions",
                "daily_reports",
            ]
        ] = [
            "llm_exceptions",
@@ -80,11 +81,13 @@ class ProxyLogging:
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
            "daily_reports",
        ]
        self.slack_alerting_instance = SlackAlerting(
            alerting_threshold=self.alerting_threshold,
            alerting=self.alerting,
            alert_types=self.alert_types,
            internal_usage_cache=self.internal_usage_cache,
        )

    def update_values(
@@ -100,6 +103,7 @@ class ProxyLogging:
                    "llm_requests_hanging",
                    "budget_alerts",
                    "db_exceptions",
                    "daily_reports",
                ]
            ]
        ] = None,


@@ -2597,7 +2597,10 @@ class Router:
                return model
        return None

-    def get_model_ids(self):
    def get_model_ids(self) -> List[str]:
        """
        Returns list of model id's.
        """
        ids = []
        for model in self.model_list:
            if "model_info" in model and "id" in model["model_info"]:
@@ -2605,7 +2608,7 @@ class Router:
                ids.append(id)
        return ids

-    def get_model_names(self):
    def get_model_names(self) -> List[str]:
        return self.model_names

    def get_model_list(self):
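
send_daily_reports depends on get_model_ids() to enumerate deployments. A short sketch of what it returns, mirroring the router used in the tests further down; the model name and id are illustrative:

# Sketch: what Router.get_model_ids() returns for a small model list.
import litellm

router = litellm.Router(
    model_list=[
        {
            "model_name": "test-gpt",
            "litellm_params": {"model": "gpt-3.5-turbo"},
            "model_info": {"id": "1234"},
        }
    ]
)
print(router.get_model_ids())  # ["1234"]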


@@ -17,7 +17,7 @@ import asyncio
from unittest.mock import patch, MagicMock
from litellm.utils import get_api_base
from litellm.caching import DualCache
-from litellm.integrations.slack_alerting import SlackAlerting
from litellm.integrations.slack_alerting import SlackAlerting, DeploymentMetrics


@pytest.mark.parametrize(
@@ -116,7 +116,7 @@ from datetime import datetime, timedelta
@pytest.fixture
def slack_alerting():
-    return SlackAlerting(alerting_threshold=1)
    return SlackAlerting(alerting_threshold=1, internal_usage_cache=DualCache())


# Test for hanging LLM responses
@@ -185,3 +185,88 @@ async def test_send_alert(slack_alerting):
        mock_post.return_value.status_code = 200
        await slack_alerting.send_alert("Test message", "Low", "budget_alerts")
        mock_post.assert_awaited_once()


@pytest.mark.asyncio
async def test_daily_reports_unit_test(slack_alerting):
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "test-gpt",
                    "litellm_params": {"model": "gpt-3.5-turbo"},
                    "model_info": {"id": "1234"},
                }
            ]
        )

        deployment_metrics = DeploymentMetrics(
            id="1234",
            failed_request=False,
            latency_per_output_token=20.3,
            updated_at=litellm.utils.get_utc_datetime(),
        )

        updated_val = await slack_alerting.async_update_daily_reports(
            deployment_metrics=deployment_metrics
        )

        assert updated_val == 1

        await slack_alerting.send_daily_reports(router=router)

        mock_send_alert.assert_awaited_once()


@pytest.mark.asyncio
async def test_daily_reports_completion(slack_alerting):
    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
        litellm.callbacks = [slack_alerting]

        # on async success
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "gpt-5",
                    "litellm_params": {
                        "model": "gpt-3.5-turbo",
                    },
                }
            ]
        )

        await router.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Hey, how's it going?"}],
        )

        await asyncio.sleep(3)
        response_val = await slack_alerting.send_daily_reports(router=router)

        assert response_val == True

        mock_send_alert.assert_awaited_once()

        # on async failure
        router = litellm.Router(
            model_list=[
                {
                    "model_name": "gpt-5",
                    "litellm_params": {"model": "gpt-3.5-turbo", "api_key": "bad_key"},
                }
            ]
        )

        try:
            await router.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "Hey, how's it going?"}],
            )
        except Exception as e:
            pass

        await asyncio.sleep(3)
        response_val = await slack_alerting.send_daily_reports(router=router)

        assert response_val == True

        mock_send_alert.assert_awaited()


@@ -5825,7 +5825,7 @@ def get_api_base(model: str, optional_params: dict) -> Optional[str]:
    Parameters:
    - model: str - the model passed to litellm.completion()
-    - optional_params - the additional params passed to litellm.completion - eg. api_base, api_key, etc. See `LiteLLM_Params` - https://github.com/BerriAI/litellm/blob/f09e6ba98d65e035a79f73bc069145002ceafd36/litellm/router.py#L67
    - optional_params - the 'litellm_params' in router.completion *OR* additional params passed to litellm.completion - eg. api_base, api_key, etc. See `LiteLLM_Params` - https://github.com/BerriAI/litellm/blob/f09e6ba98d65e035a79f73bc069145002ceafd36/litellm/router.py#L67

    Returns:
    - string (api_base) or None
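
A brief usage sketch of get_api_base with a router-style litellm_params dict, not from this commit; the model name and endpoint are illustrative:

# Usage sketch, not from this commit; model name and endpoint are illustrative.
from litellm.utils import get_api_base

api_base = get_api_base(
    model="openai/my-fake-model",
    optional_params={"api_base": "https://my-endpoint.example.com"},
)
print(api_base)  # https://my-endpoint.example.com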