diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py index af328666f7..171b8a8347 100644 --- a/litellm/integrations/slack_alerting.py +++ b/litellm/integrations/slack_alerting.py @@ -17,6 +17,7 @@ from pydantic import BaseModel from enum import Enum from datetime import datetime as dt, timedelta from litellm.integrations.custom_logger import CustomLogger +import random class LiteLLMBase(BaseModel): @@ -32,8 +33,9 @@ class LiteLLMBase(BaseModel): return self.dict() -class SlackArgs(LiteLLMBase): +class SlackAlertingArgs(LiteLLMBase): daily_report_frequency: int = 12 * 60 * 60 # 12 hours + report_check_interval: int = 5 * 60 # 5 minutes class DeploymentMetrics(LiteLLMBase): @@ -63,6 +65,7 @@ class SlackAlertingCacheKeys(Enum): failed_requests_key = "failed_requests_daily_metrics" latency_key = "latency_daily_metrics" + report_sent_key = "daily_metrics_report_sent" class SlackAlerting(CustomLogger): @@ -94,6 +97,7 @@ class SlackAlerting(CustomLogger): alert_to_webhook_url: Optional[ Dict ] = None, # if user wants to separate alerts to diff channels + alerting_args={}, ): self.alerting_threshold = alerting_threshold self.alerting = alerting @@ -102,6 +106,7 @@ class SlackAlerting(CustomLogger): self.async_http_handler = AsyncHTTPHandler() self.alert_to_webhook_url = alert_to_webhook_url self.is_running = False + self.alerting_args = SlackAlertingArgs(**alerting_args) def update_values( self, @@ -109,6 +114,7 @@ class SlackAlerting(CustomLogger): alerting_threshold: Optional[float] = None, alert_types: Optional[List] = None, alert_to_webhook_url: Optional[Dict] = None, + alerting_args: Optional[Dict] = None, ): if alerting is not None: self.alerting = alerting @@ -116,7 +122,8 @@ class SlackAlerting(CustomLogger): self.alerting_threshold = alerting_threshold if alert_types is not None: self.alert_types = alert_types - + if alerting_args is not None: + self.alerting_args = SlackAlertingArgs(**alerting_args) if alert_to_webhook_url is not None: # update the dict if self.alert_to_webhook_url is None: @@ -356,7 +363,7 @@ class SlackAlerting(CustomLogger): # find top 5 slowest # Replace None values with a placeholder value (-1 in this case) - placeholder_value = -1 + placeholder_value = 0 replaced_slowest_values = [ value if value is not None else placeholder_value for value in latency_values @@ -406,8 +413,8 @@ class SlackAlerting(CustomLogger): _deployment["litellm_params"] if _deployment is not None else {} ), ) - value = replaced_slowest_values[top_5_slowest[i]] - message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency: `{value}`, API Base: `{api_base}`\n\n" + value = round(replaced_slowest_values[top_5_slowest[i]], 3) + message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n" # cache cleanup -> reset values to 0 latency_cache_keys = [(key, 0) for key in latency_keys] @@ -698,33 +705,82 @@ class SlackAlerting(CustomLogger): async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): """Log deployment latency""" - model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") - response_ms: timedelta = end_time - start_time - - final_value = response_ms - total_tokens = 0 - - if isinstance(response_obj, litellm.ModelResponse): - completion_tokens = response_obj.usage.completion_tokens - final_value = float(response_ms.total_seconds() / completion_tokens) - - await self.async_update_daily_reports( - DeploymentMetrics( - id=model_id, - failed_request=False, - latency_per_output_token=final_value, - updated_at=litellm.utils.get_utc_datetime(), + if "daily_reports" in self.alert_types: + model_id = ( + kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") + ) + response_s: timedelta = end_time - start_time + + final_value = response_s + total_tokens = 0 + + if isinstance(response_obj, litellm.ModelResponse): + completion_tokens = response_obj.usage.completion_tokens + final_value = float(response_s.total_seconds() / completion_tokens) + + await self.async_update_daily_reports( + DeploymentMetrics( + id=model_id, + failed_request=False, + latency_per_output_token=final_value, + updated_at=litellm.utils.get_utc_datetime(), + ) ) - ) async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): """Log failure + deployment latency""" - model_id = kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") - await self.async_update_daily_reports( - DeploymentMetrics( - id=model_id, - failed_request=True, - latency_per_output_token=None, - updated_at=litellm.utils.get_utc_datetime(), + if "daily_reports" in self.alert_types: + model_id = ( + kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") ) - ) + await self.async_update_daily_reports( + DeploymentMetrics( + id=model_id, + failed_request=True, + latency_per_output_token=None, + updated_at=litellm.utils.get_utc_datetime(), + ) + ) + + async def _run_scheduled_daily_report(self, llm_router: Optional[litellm.Router]): + """ + If 'daily_reports' enabled + + Ping redis cache every 5 minutes to check if we should send the report + + If yes -> call send_daily_report() + """ + if llm_router is None or self.alert_types is None: + return + + if "daily_reports" in self.alert_types: + while True: + report_sent = await self.internal_usage_cache.async_get_cache( + key=SlackAlertingCacheKeys.report_sent_key.value + ) # None | datetime + + if report_sent is None: + await self.internal_usage_cache.async_set_cache( + key=SlackAlertingCacheKeys.report_sent_key.value, + value=litellm.utils.get_utc_datetime(), + ) + else: + # check if current time - interval >= time last sent + current_time = litellm.utils.get_utc_datetime() + delta = current_time - timedelta( + seconds=self.alerting_args.daily_report_frequency + ) + if delta >= report_sent: + # Sneak in the reporting logic here + await self.send_daily_reports(router=llm_router) + # Also, don't forget to update the report_sent time after sending the report! + await self.internal_usage_cache.async_set_cache( + key=SlackAlertingCacheKeys.report_sent_key.value, + value=litellm.utils.get_utc_datetime(), + ) + interval = random.randint( + self.alerting_args.report_check_interval - 3, + self.alerting_args.report_check_interval + 3, + ) # shuffle to prevent collisions + await asyncio.sleep(interval) + return diff --git a/litellm/proxy/_super_secret_config.yaml b/litellm/proxy/_super_secret_config.yaml index ec8d097b14..0475508e31 100644 --- a/litellm/proxy/_super_secret_config.yaml +++ b/litellm/proxy/_super_secret_config.yaml @@ -4,6 +4,16 @@ model_list: api_key: my-fake-key model: openai/my-fake-model model_name: fake-openai-endpoint +- litellm_params: + api_base: https://openai-function-calling-workers.tasslexyz.workers.dev/ + api_key: my-fake-key-2 + model: openai/my-fake-model-2 + model_name: fake-openai-endpoint +- litellm_params: + api_base: https://openai-function-calling-workers.tasslexyz.workers.dev/ + api_key: my-fake-key-3 + model: openai/my-fake-model-3 + model_name: fake-openai-endpoint router_settings: num_retries: 0 enable_pre_call_checks: true @@ -19,4 +29,7 @@ litellm_settings: general_settings: alerting: ["slack"] - alert_types: ["llm_exceptions", "daily_reports"] \ No newline at end of file + alert_types: ["llm_exceptions", "daily_reports"] + alerting_args: + daily_report_frequency: 60 # every minute + report_check_interval: 5 # every 5s \ No newline at end of file diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index 4bb8dee7f9..dbea1c57f2 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -1900,9 +1900,6 @@ async def _run_background_health_check(): await asyncio.sleep(health_check_interval) -semaphore = asyncio.Semaphore(1) - - class ProxyConfig: """ Abstraction class on top of config loading/updating logic. Gives us one place to control all config updating logic. @@ -2377,6 +2374,7 @@ class ProxyConfig: alerting=general_settings.get("alerting", None), alerting_threshold=general_settings.get("alerting_threshold", 600), alert_types=general_settings.get("alert_types", None), + alerting_args=general_settings.get("alerting_args", None), redis_cache=redis_usage_cache, ) ### CONNECT TO DATABASE ### @@ -2501,7 +2499,7 @@ class ProxyConfig: for k, v in router_settings.items(): if k in available_args: router_params[k] = v - router = litellm.Router(**router_params, semaphore=semaphore) # type:ignore + router = litellm.Router(**router_params) # type:ignore return router, model_list, general_settings def get_model_info_with_id(self, model) -> RouterModelInfo: @@ -3273,6 +3271,13 @@ async def startup_event(): proxy_logging_obj._init_litellm_callbacks() # INITIALIZE LITELLM CALLBACKS ON SERVER STARTUP <- do this to catch any logging errors on startup, not when calls are being made + if "daily_reports" in proxy_logging_obj.slack_alerting_instance.alert_types: + asyncio.create_task( + proxy_logging_obj.slack_alerting_instance._run_scheduled_daily_report( + llm_router=llm_router + ) + ) # RUN DAILY REPORT (if scheduled) + ## JWT AUTH ## if general_settings.get("litellm_jwtauth", None) is not None: for k, v in general_settings["litellm_jwtauth"].items(): diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 8e66ff76ce..9734806dfb 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -107,6 +107,7 @@ class ProxyLogging: ] ] ] = None, + alerting_args: Optional[dict] = None, ): self.alerting = alerting if alerting_threshold is not None: @@ -118,8 +119,12 @@ class ProxyLogging: alerting=self.alerting, alerting_threshold=self.alerting_threshold, alert_types=self.alert_types, + alerting_args=alerting_args, ) + if "daily_reports" in self.alert_types: + litellm.callbacks.append(self.slack_alerting_instance) # type: ignore + if redis_cache is not None: self.internal_usage_cache.redis_cache = redis_cache