From 718f423d7daa8d55b38b66234d3a3e72c78b654b Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Mon, 6 May 2024 17:18:42 -0700 Subject: [PATCH] feat(slack_alerting.py): support sending daily reports on deployments allow admin to easily know slow + failing deployments Closes https://github.com/BerriAI/litellm/issues/3483 --- litellm/caching.py | 39 +++- litellm/integrations/slack_alerting.py | 282 +++++++++++++++++++++++- litellm/proxy/_super_secret_config.yaml | 2 +- litellm/proxy/utils.py | 4 + litellm/router.py | 7 +- litellm/tests/test_alerting.py | 89 +++++++- litellm/utils.py | 2 +- 7 files changed, 400 insertions(+), 25 deletions(-) diff --git a/litellm/caching.py b/litellm/caching.py index d7cf03370..83cfe060b 100644 --- a/litellm/caching.py +++ b/litellm/caching.py @@ -106,7 +106,7 @@ class InMemoryCache(BaseCache): return_val.append(val) return return_val - async def async_increment(self, key, value: int, **kwargs) -> int: + async def async_increment(self, key, value: float, **kwargs) -> float: # get the value init_value = await self.async_get_cache(key=key) or 0 value = init_value + value @@ -423,12 +423,12 @@ class RedisCache(BaseCache): if len(self.redis_batch_writing_buffer) >= self.redis_flush_size: await self.flush_cache_buffer() # logging done in here - async def async_increment(self, key, value: int, **kwargs) -> int: + async def async_increment(self, key, value: float, **kwargs) -> float: _redis_client = self.init_async_client() start_time = time.time() try: async with _redis_client as redis_client: - result = await redis_client.incr(name=key, amount=value) + result = await redis_client.incrbyfloat(name=key, amount=value) ## LOGGING ## end_time = time.time() _duration = end_time - start_time @@ -1382,18 +1382,41 @@ class DualCache(BaseCache): print_verbose(f"LiteLLM Cache: Excepton async add_cache: {str(e)}") traceback.print_exc() + async def async_batch_set_cache( + self, cache_list: list, local_only: bool = False, **kwargs + ): + """ + Batch write values to the cache + """ + print_verbose( + f"async batch set cache: cache keys: {cache_list}; local_only: {local_only}" + ) + try: + if self.in_memory_cache is not None: + await self.in_memory_cache.async_set_cache_pipeline( + cache_list=cache_list, **kwargs + ) + + if self.redis_cache is not None and local_only == False: + await self.redis_cache.async_set_cache_pipeline( + cache_list=cache_list, ttl=kwargs.get("ttl", None) + ) + except Exception as e: + print_verbose(f"LiteLLM Cache: Excepton async add_cache: {str(e)}") + traceback.print_exc() + async def async_increment_cache( - self, key, value: int, local_only: bool = False, **kwargs - ) -> int: + self, key, value: float, local_only: bool = False, **kwargs + ) -> float: """ Key - the key in cache - Value - int - the value you want to increment by + Value - float - the value you want to increment by - Returns - int - the incremented value + Returns - float - the incremented value """ try: - result: int = value + result: float = value if self.in_memory_cache is not None: result = await self.in_memory_cache.async_increment( key, value, **kwargs diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py index a9aba2f1c..af328666f 100644 --- a/litellm/integrations/slack_alerting.py +++ b/litellm/integrations/slack_alerting.py @@ -2,23 +2,74 @@ # Class for sending Slack Alerts # import dotenv, os +from litellm.proxy._types import UserAPIKeyAuth + dotenv.load_dotenv() # Loading env variables using dotenv -import copy -import traceback from 
litellm._logging import verbose_logger, verbose_proxy_logger -import litellm +import litellm, threading from typing import List, Literal, Any, Union, Optional, Dict from litellm.caching import DualCache import asyncio import aiohttp from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler import datetime +from pydantic import BaseModel +from enum import Enum +from datetime import datetime as dt, timedelta +from litellm.integrations.custom_logger import CustomLogger -class SlackAlerting: +class LiteLLMBase(BaseModel): + """ + Implements default functions, all pydantic objects should have. + """ + + def json(self, **kwargs): + try: + return self.model_dump() # noqa + except: + # if using pydantic v1 + return self.dict() + + +class SlackArgs(LiteLLMBase): + daily_report_frequency: int = 12 * 60 * 60 # 12 hours + + +class DeploymentMetrics(LiteLLMBase): + """ + Metrics per deployment, stored in cache + + Used for daily reporting + """ + + id: str + """id of deployment in router model list""" + + failed_request: bool + """did it fail the request?""" + + latency_per_output_token: Optional[float] + """latency/output token of deployment""" + + updated_at: dt + """Current time of deployment being updated""" + + +class SlackAlertingCacheKeys(Enum): + """ + Enum for deployment daily metrics keys - {deployment_id}:{enum} + """ + + failed_requests_key = "failed_requests_daily_metrics" + latency_key = "latency_daily_metrics" + + +class SlackAlerting(CustomLogger): # Class variables or attributes def __init__( self, + internal_usage_cache: DualCache, alerting_threshold: float = 300, alerting: Optional[List] = [], alert_types: Optional[ @@ -29,6 +80,7 @@ class SlackAlerting: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ] ] ] = [ @@ -37,6 +89,7 @@ class SlackAlerting: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ], alert_to_webhook_url: Optional[ Dict @@ -45,10 +98,10 @@ class SlackAlerting: self.alerting_threshold = alerting_threshold self.alerting = alerting self.alert_types = alert_types - self.internal_usage_cache = DualCache() + self.internal_usage_cache = internal_usage_cache self.async_http_handler = AsyncHTTPHandler() self.alert_to_webhook_url = alert_to_webhook_url - pass + self.is_running = False def update_values( self, @@ -196,8 +249,178 @@ class SlackAlerting: alert_type="llm_too_slow", ) - async def log_failure_event(self, original_exception: Exception): - pass + async def async_update_daily_reports( + self, deployment_metrics: DeploymentMetrics + ) -> int: + """ + Store the perf by deployment in cache + - Number of failed requests per deployment + - Latency / output tokens per deployment + + 'deployment_id:daily_metrics:failed_requests' + 'deployment_id:daily_metrics:latency_per_output_token' + + Returns + int - count of metrics set (1 - if just latency, 2 - if failed + latency) + """ + + return_val = 0 + try: + ## FAILED REQUESTS ## + if deployment_metrics.failed_request: + await self.internal_usage_cache.async_increment_cache( + key="{}:{}".format( + deployment_metrics.id, + SlackAlertingCacheKeys.failed_requests_key.value, + ), + value=1, + ) + + return_val += 1 + + ## LATENCY ## + if deployment_metrics.latency_per_output_token is not None: + await self.internal_usage_cache.async_increment_cache( + key="{}:{}".format( + deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value + ), + value=deployment_metrics.latency_per_output_token, + ) + + return_val += 1 + + return return_val + except Exception as e: + 
return 0 + + async def send_daily_reports(self, router: litellm.Router) -> bool: + """ + Send a daily report on: + - Top 5 deployments with most failed requests + - Top 5 slowest deployments (normalized by latency/output tokens) + + Get the value from redis cache (if available) or in-memory and send it + + Cleanup: + - reset values in cache -> prevent memory leak + + Returns: + True -> if successfuly sent + False -> if not sent + """ + + ids = router.get_model_ids() + + # get keys + failed_request_keys = [ + "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value) + for id in ids + ] + latency_keys = [ + "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids + ] + + combined_metrics_keys = failed_request_keys + latency_keys # reduce cache calls + + combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache( + keys=combined_metrics_keys + ) # [1, 2, None, ..] + + all_none = True + for val in combined_metrics_values: + if val is not None: + all_none = False + + if all_none: + return False + + failed_request_values = combined_metrics_values[ + : len(failed_request_keys) + ] # # [1, 2, None, ..] + latency_values = combined_metrics_values[len(failed_request_keys) :] + + # find top 5 failed + ## Replace None values with a placeholder value (-1 in this case) + placeholder_value = 0 + replaced_failed_values = [ + value if value is not None else placeholder_value + for value in failed_request_values + ] + + ## Get the indices of top 5 keys with the highest numerical values (ignoring None values) + top_5_failed = sorted( + range(len(replaced_failed_values)), + key=lambda i: replaced_failed_values[i], + reverse=True, + )[:5] + + # find top 5 slowest + # Replace None values with a placeholder value (-1 in this case) + placeholder_value = -1 + replaced_slowest_values = [ + value if value is not None else placeholder_value + for value in latency_values + ] + + # Get the indices of top 5 values with the highest numerical values (ignoring None values) + top_5_slowest = sorted( + range(len(replaced_slowest_values)), + key=lambda i: replaced_slowest_values[i], + reverse=True, + )[:5] + + # format alert -> return the litellm model name + api base + message = f"\n\nHere are today's key metrics 📈: \n\n" + + message += "\n\n*❗️ Top 5 Deployments with Most Failed Requests:*\n\n" + for i in range(len(top_5_failed)): + key = failed_request_keys[top_5_failed[i]].split(":")[0] + _deployment = router.get_model_info(key) + if isinstance(_deployment, dict): + deployment_name = _deployment["litellm_params"].get("model", "") + else: + return False + + api_base = litellm.get_api_base( + model=deployment_name, + optional_params=( + _deployment["litellm_params"] if _deployment is not None else {} + ), + ) + if api_base is None: + api_base = "" + value = replaced_failed_values[top_5_failed[i]] + message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n" + + message += "\n\n*😅 Top 5 Slowest Deployments:*\n\n" + for i in range(len(top_5_slowest)): + key = latency_keys[top_5_slowest[i]].split(":")[0] + _deployment = router.get_model_info(key) + if _deployment is not None: + deployment_name = _deployment["litellm_params"].get("model", "") + else: + deployment_name = "" + api_base = litellm.get_api_base( + model=deployment_name, + optional_params=( + _deployment["litellm_params"] if _deployment is not None else {} + ), + ) + value = replaced_slowest_values[top_5_slowest[i]] + message += f"\t{i+1}. 
Deployment: `{deployment_name}`, Latency: `{value}`, API Base: `{api_base}`\n\n" + + # cache cleanup -> reset values to 0 + latency_cache_keys = [(key, 0) for key in latency_keys] + failed_request_cache_keys = [(key, 0) for key in failed_request_keys] + combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys + await self.internal_usage_cache.async_batch_set_cache( + cache_list=combined_metrics_cache_keys + ) + + # send alert + await self.send_alert(message=message, level="Low", alert_type="daily_reports") + + return True async def response_taking_too_long( self, @@ -414,6 +637,7 @@ class SlackAlerting: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ], ): """ @@ -439,9 +663,12 @@ class SlackAlerting: # Get the current timestamp current_time = datetime.now().strftime("%H:%M:%S") _proxy_base_url = os.getenv("PROXY_BASE_URL", None) - formatted_message = ( - f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}" - ) + if alert_type == "daily_reports": + formatted_message = message + else: + formatted_message = ( + f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}" + ) if _proxy_base_url is not None: formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`" @@ -468,3 +695,36 @@ class SlackAlerting: pass else: print("Error sending slack alert. Error=", response.text) # noqa + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + """Log deployment latency""" + model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") + response_ms: timedelta = end_time - start_time + + final_value = response_ms + total_tokens = 0 + + if isinstance(response_obj, litellm.ModelResponse): + completion_tokens = response_obj.usage.completion_tokens + final_value = float(response_ms.total_seconds() / completion_tokens) + + await self.async_update_daily_reports( + DeploymentMetrics( + id=model_id, + failed_request=False, + latency_per_output_token=final_value, + updated_at=litellm.utils.get_utc_datetime(), + ) + ) + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + """Log failure + deployment latency""" + model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") + await self.async_update_daily_reports( + DeploymentMetrics( + id=model_id, + failed_request=True, + latency_per_output_token=None, + updated_at=litellm.utils.get_utc_datetime(), + ) + ) diff --git a/litellm/proxy/_super_secret_config.yaml b/litellm/proxy/_super_secret_config.yaml index d90fb13fd..ec8d097b1 100644 --- a/litellm/proxy/_super_secret_config.yaml +++ b/litellm/proxy/_super_secret_config.yaml @@ -19,4 +19,4 @@ litellm_settings: general_settings: alerting: ["slack"] - alert_types: ["llm_exceptions"] \ No newline at end of file + alert_types: ["llm_exceptions", "daily_reports"] \ No newline at end of file diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 1048c6727..8e66ff76c 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -73,6 +73,7 @@ class ProxyLogging: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ] ] = [ "llm_exceptions", @@ -80,11 +81,13 @@ class ProxyLogging: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ] self.slack_alerting_instance = SlackAlerting( alerting_threshold=self.alerting_threshold, alerting=self.alerting, alert_types=self.alert_types, + internal_usage_cache=self.internal_usage_cache, ) def update_values( @@ -100,6 +103,7 @@ class 
ProxyLogging: "llm_requests_hanging", "budget_alerts", "db_exceptions", + "daily_reports", ] ] ] = None, diff --git a/litellm/router.py b/litellm/router.py index fbb245a3d..52f778f0b 100644 --- a/litellm/router.py +++ b/litellm/router.py @@ -2597,7 +2597,10 @@ class Router: return model return None - def get_model_ids(self): + def get_model_ids(self) -> List[str]: + """ + Returns list of model id's. + """ ids = [] for model in self.model_list: if "model_info" in model and "id" in model["model_info"]: @@ -2605,7 +2608,7 @@ class Router: ids.append(id) return ids - def get_model_names(self): + def get_model_names(self) -> List[str]: return self.model_names def get_model_list(self): diff --git a/litellm/tests/test_alerting.py b/litellm/tests/test_alerting.py index 06c4e84b3..dd77a0d31 100644 --- a/litellm/tests/test_alerting.py +++ b/litellm/tests/test_alerting.py @@ -17,7 +17,7 @@ import asyncio from unittest.mock import patch, MagicMock from litellm.utils import get_api_base from litellm.caching import DualCache -from litellm.integrations.slack_alerting import SlackAlerting +from litellm.integrations.slack_alerting import SlackAlerting, DeploymentMetrics @pytest.mark.parametrize( @@ -116,7 +116,7 @@ from datetime import datetime, timedelta @pytest.fixture def slack_alerting(): - return SlackAlerting(alerting_threshold=1) + return SlackAlerting(alerting_threshold=1, internal_usage_cache=DualCache()) # Test for hanging LLM responses @@ -185,3 +185,88 @@ async def test_send_alert(slack_alerting): mock_post.return_value.status_code = 200 await slack_alerting.send_alert("Test message", "Low", "budget_alerts") mock_post.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_daily_reports_unit_test(slack_alerting): + with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert: + router = litellm.Router( + model_list=[ + { + "model_name": "test-gpt", + "litellm_params": {"model": "gpt-3.5-turbo"}, + "model_info": {"id": "1234"}, + } + ] + ) + deployment_metrics = DeploymentMetrics( + id="1234", + failed_request=False, + latency_per_output_token=20.3, + updated_at=litellm.utils.get_utc_datetime(), + ) + + updated_val = await slack_alerting.async_update_daily_reports( + deployment_metrics=deployment_metrics + ) + + assert updated_val == 1 + + await slack_alerting.send_daily_reports(router=router) + + mock_send_alert.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_daily_reports_completion(slack_alerting): + with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert: + litellm.callbacks = [slack_alerting] + + # on async success + router = litellm.Router( + model_list=[ + { + "model_name": "gpt-5", + "litellm_params": { + "model": "gpt-3.5-turbo", + }, + } + ] + ) + + await router.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hey, how's it going?"}], + ) + + await asyncio.sleep(3) + response_val = await slack_alerting.send_daily_reports(router=router) + + assert response_val == True + + mock_send_alert.assert_awaited_once() + + # on async failure + router = litellm.Router( + model_list=[ + { + "model_name": "gpt-5", + "litellm_params": {"model": "gpt-3.5-turbo", "api_key": "bad_key"}, + } + ] + ) + + try: + await router.acompletion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hey, how's it going?"}], + ) + except Exception as e: + pass + + await asyncio.sleep(3) + response_val = await slack_alerting.send_daily_reports(router=router) + + assert response_val == True + + 
mock_send_alert.assert_awaited() diff --git a/litellm/utils.py b/litellm/utils.py index 24ebcea91..563a551d2 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -5825,7 +5825,7 @@ def get_api_base(model: str, optional_params: dict) -> Optional[str]: Parameters: - model: str - the model passed to litellm.completion() - - optional_params - the additional params passed to litellm.completion - eg. api_base, api_key, etc. See `LiteLLM_Params` - https://github.com/BerriAI/litellm/blob/f09e6ba98d65e035a79f73bc069145002ceafd36/litellm/router.py#L67 + - optional_params - the 'litellm_params' in router.completion *OR* additional params passed to litellm.completion - eg. api_base, api_key, etc. See `LiteLLM_Params` - https://github.com/BerriAI/litellm/blob/f09e6ba98d65e035a79f73bc069145002ceafd36/litellm/router.py#L67 Returns: - string (api_base) or None
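
Usage sketch (illustrative, not part of the patch): a minimal example of how the pieces added above could be wired together. Per-request metric collection happens once the SlackAlerting instance is registered as a litellm callback (via async_log_success_event / async_log_failure_event); the asyncio scheduling loop below is an assumption for illustration only, since this commit adds the metric hooks and send_daily_reports() but not a scheduler. It also assumes a SLACK_WEBHOOK_URL environment variable is configured for the underlying Slack delivery.

import asyncio

import litellm
from litellm.caching import DualCache
from litellm.integrations.slack_alerting import SlackAlerting, SlackArgs


async def main():
    # Register the alerter as a callback so every router call records
    # per-deployment failure counts and latency/output-token into the cache.
    slack_alerting = SlackAlerting(
        internal_usage_cache=DualCache(),
        alerting=["slack"],
        alert_types=["daily_reports"],
    )
    litellm.callbacks = [slack_alerting]

    router = litellm.Router(
        model_list=[
            {
                "model_name": "gpt-3.5-turbo",
                "litellm_params": {"model": "gpt-3.5-turbo"},
                "model_info": {"id": "deployment-1"},  # reports are keyed by this id
            }
        ]
    )

    # ... serve traffic through router.acompletion(...) as usual ...

    # Hypothetical scheduler (not added by this commit): flush the cached
    # metrics as a Slack report every `daily_report_frequency` seconds
    # (defaults to 12 hours in SlackArgs).
    frequency = SlackArgs().daily_report_frequency
    while True:
        await asyncio.sleep(frequency)
        sent = await slack_alerting.send_daily_reports(router=router)
        if not sent:
            # False means no metrics were recorded in this window
            pass


if __name__ == "__main__":
    asyncio.run(main())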