#### What this does ####
#    Class for sending Slack Alerts #
import dotenv, os
from litellm.proxy._types import UserAPIKeyAuth, CallInfo
from litellm._logging import verbose_logger, verbose_proxy_logger
import litellm, threading
from typing import List, Literal, Any, Union, Optional, Dict
from litellm.caching import DualCache
import asyncio, time
import aiohttp
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
import datetime
from pydantic import BaseModel
from enum import Enum
from datetime import datetime as dt, timedelta, timezone
from litellm.integrations.custom_logger import CustomLogger
import random


class LiteLLMBase(BaseModel):
    """
    Implements default functions, all pydantic objects should have.
    """

    def json(self, **kwargs):
        try:
            return self.model_dump()  # noqa
        except Exception:
            # if using pydantic v1
            return self.dict()


class SlackAlertingArgs(LiteLLMBase):
    default_daily_report_frequency: int = 12 * 60 * 60  # 12 hours
    daily_report_frequency: int = int(
        os.getenv("SLACK_DAILY_REPORT_FREQUENCY", default_daily_report_frequency)
    )
    report_check_interval: int = 5 * 60  # 5 minutes
    budget_alert_ttl: int = 24 * 60 * 60  # 24 hours


class WebhookEvent(CallInfo):
    event: Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"]
    event_group: Literal["user", "key", "team", "proxy"]
    event_message: str  # human-readable description of event


class DeploymentMetrics(LiteLLMBase):
    """
    Metrics per deployment, stored in cache

    Used for daily reporting
    """

    id: str
    """id of deployment in router model list"""

    failed_request: bool
    """did it fail the request?"""

    latency_per_output_token: Optional[float]
    """latency/output token of deployment"""

    updated_at: dt
    """Current time of deployment being updated"""


class SlackAlertingCacheKeys(Enum):
    """
    Enum for deployment daily metrics keys - {deployment_id}:{enum}
    """

    failed_requests_key = "failed_requests_daily_metrics"
    latency_key = "latency_daily_metrics"
    report_sent_key = "daily_metrics_report_sent"


class SlackAlerting(CustomLogger):
    """
    Class for sending Slack Alerts
    """

    # Class variables or attributes
    def __init__(
        self,
        internal_usage_cache: Optional[DualCache] = None,
        alerting_threshold: float = 300,  # threshold for slow / hanging llm responses (in seconds)
        alerting: Optional[List] = [],
        alert_types: List[
            Literal[
                "llm_exceptions",
                "llm_too_slow",
                "llm_requests_hanging",
                "budget_alerts",
                "db_exceptions",
                "daily_reports",
                "spend_reports",
                "cooldown_deployment",
                "new_model_added",
            ]
        ] = [
            "llm_exceptions",
            "llm_too_slow",
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
            "daily_reports",
            "spend_reports",
            "cooldown_deployment",
            "new_model_added",
        ],
        alert_to_webhook_url: Optional[
            Dict
        ] = None,  # if user wants to separate alerts to diff channels
        alerting_args={},
        default_webhook_url: Optional[str] = None,
    ):
        self.alerting_threshold = alerting_threshold
        self.alerting = alerting
        self.alert_types = alert_types
        self.internal_usage_cache = internal_usage_cache or DualCache()
        self.async_http_handler = AsyncHTTPHandler()
        self.alert_to_webhook_url = alert_to_webhook_url
        self.is_running = False
        self.alerting_args = SlackAlertingArgs(**alerting_args)
        self.default_webhook_url = default_webhook_url

    def update_values(
        self,
        alerting: Optional[List] = None,
        alerting_threshold: Optional[float] = None,
        alert_types: Optional[List] = None,
        alert_to_webhook_url: Optional[Dict] = None,
        alerting_args: Optional[Dict] = None,
    ):
        if alerting is not None:
            self.alerting = alerting
        if alerting_threshold is not None:
            self.alerting_threshold = alerting_threshold
        if alert_types is not None:
            self.alert_types = alert_types
        if alerting_args is not None:
            self.alerting_args = SlackAlertingArgs(**alerting_args)
        if alert_to_webhook_url is not None:
            # update the dict
            if self.alert_to_webhook_url is None:
                self.alert_to_webhook_url = alert_to_webhook_url
            else:
                self.alert_to_webhook_url.update(alert_to_webhook_url)

    async def deployment_in_cooldown(self):
        pass

    async def deployment_removed_from_cooldown(self):
        pass

    def _all_possible_alert_types(self):
        # used by the UI to show all supported alert types
        # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select
        return [
            "llm_exceptions",
            "llm_too_slow",
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
        ]

    def _add_langfuse_trace_id_to_alert(
        self,
        request_data: Optional[dict] = None,
    ) -> Optional[str]:
        """
        Returns the Langfuse trace url.

        Checks, in order:
        -> existing_trace_id
        -> trace_id
        -> litellm_call_id
        """
        if request_data is not None:
            trace_id = None
            if (
                request_data.get("metadata", {}).get("existing_trace_id", None)
                is not None
            ):
                trace_id = request_data["metadata"]["existing_trace_id"]
            elif request_data.get("metadata", {}).get("trace_id", None) is not None:
                trace_id = request_data["metadata"]["trace_id"]
            elif request_data.get("litellm_logging_obj", None) is not None and hasattr(
                request_data["litellm_logging_obj"], "model_call_details"
            ):
                trace_id = request_data["litellm_logging_obj"].model_call_details[
                    "litellm_call_id"
                ]
            if litellm.utils.langFuseLogger is not None:
                base_url = litellm.utils.langFuseLogger.Langfuse.base_url
                return f"{base_url}/trace/{trace_id}"
        return None

    def _response_taking_too_long_callback_helper(
        self,
        kwargs,  # kwargs to completion
        start_time,
        end_time,  # start/end time
    ):
        try:
            time_difference = end_time - start_time
            # Convert the timedelta to float (in seconds)
            time_difference_float = time_difference.total_seconds()
            litellm_params = kwargs.get("litellm_params", {})
            model = kwargs.get("model", "")
            api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
            messages = kwargs.get("messages", None)
            # if messages does not exist fallback to "input"
            if messages is None:
                messages = kwargs.get("input", None)

            # only use first 100 chars for alerting
            _messages = str(messages)[:100]

            return time_difference_float, model, api_base, _messages
        except Exception as e:
            raise e

    def _get_deployment_latencies_to_alert(self, metadata=None):
        if metadata is None:
            return None

        if "_latency_per_deployment" in metadata:
            # Translate model_id to -> api_base
            # _latency_per_deployment is a dictionary that looks like this:
            """
            _latency_per_deployment: {
                api_base: 0.01336697916666667
            }
            """
            _message_to_send = ""
            _deployment_latencies = metadata["_latency_per_deployment"]
            if len(_deployment_latencies) == 0:
                return None
            try:
                # try sorting deployments by latency
                _deployment_latencies = sorted(
                    _deployment_latencies.items(), key=lambda x: x[1]
                )
                _deployment_latencies = dict(_deployment_latencies)
            except Exception:
                pass
            for api_base, latency in _deployment_latencies.items():
                _message_to_send += f"\n{api_base}: {round(latency,2)}s"
            _message_to_send = "```" + _message_to_send + "```"
            return _message_to_send

    async def response_taking_too_long_callback(
        self,
        kwargs,  # kwargs to completion
        completion_response,  # response from completion
        start_time,
        end_time,  # start/end time
    ):
        if self.alerting is None or self.alert_types is None:
            return

        time_difference_float, model, api_base, messages = (
            self._response_taking_too_long_callback_helper(
                kwargs=kwargs,
                start_time=start_time,
                end_time=end_time,
            )
        )
        if litellm.turn_off_message_logging:
            messages = "Message not logged. `litellm.turn_off_message_logging=True`."
        request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
        slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
        if time_difference_float > self.alerting_threshold:
            # add deployment latencies to alert
            if (
                kwargs is not None
                and "litellm_params" in kwargs
                and "metadata" in kwargs["litellm_params"]
            ):
                _metadata = kwargs["litellm_params"]["metadata"]
                request_info = litellm.utils._add_key_name_and_team_to_alert(
                    request_info=request_info, metadata=_metadata
                )

                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=_metadata
                )
                if _deployment_latency_map is not None:
                    request_info += (
                        f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
                    )
            await self.send_alert(
                message=slow_message + request_info,
                level="Low",
                alert_type="llm_too_slow",
            )

    async def async_update_daily_reports(
        self, deployment_metrics: DeploymentMetrics
    ) -> int:
        """
        Store the perf by deployment in cache
        - Number of failed requests per deployment
        - Latency / output tokens per deployment

        'deployment_id:daily_metrics:failed_requests'
        'deployment_id:daily_metrics:latency_per_output_token'

        Returns
            int - count of metrics set (1 - if just latency, 2 - if failed + latency)
        """

        return_val = 0
        try:
            ## FAILED REQUESTS ##
            if deployment_metrics.failed_request:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id,
                        SlackAlertingCacheKeys.failed_requests_key.value,
                    ),
                    value=1,
                )

                return_val += 1

            ## LATENCY ##
            if deployment_metrics.latency_per_output_token is not None:
                await self.internal_usage_cache.async_increment_cache(
                    key="{}:{}".format(
                        deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
                    ),
                    value=deployment_metrics.latency_per_output_token,
                )

                return_val += 1

            return return_val
        except Exception:
            return 0

    async def send_daily_reports(self, router) -> bool:
        """
        Send a daily report on:
        - Top 5 deployments with most failed requests
        - Top 5 slowest deployments (normalized by latency/output tokens)

        Get the value from redis cache (if available) or in-memory and send it

        Cleanup:
        - reset values in cache -> prevent memory leak

        Returns:
            True -> if successfully sent
            False -> if not sent
        """

        ids = router.get_model_ids()

        # get keys
        failed_request_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
            for id in ids
        ]
        latency_keys = [
            "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
        ]

        combined_metrics_keys = failed_request_keys + latency_keys  # reduce cache calls

        combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
            keys=combined_metrics_keys
        )  # [1, 2, None, ..]

        if combined_metrics_values is None:
            return False

        all_none = True
        for val in combined_metrics_values:
            if val is not None and val > 0:
                all_none = False
                break

        if all_none:
            return False

        failed_request_values = combined_metrics_values[
            : len(failed_request_keys)
        ]  # [1, 2, None, ..]
        latency_values = combined_metrics_values[len(failed_request_keys) :]

        # find top 5 failed
        ## Replace None values with a placeholder value (0 in this case)
        placeholder_value = 0
        replaced_failed_values = [
            value if value is not None else placeholder_value
            for value in failed_request_values
        ]

        ## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values)
        top_5_failed = sorted(
            range(len(replaced_failed_values)),
            key=lambda i: replaced_failed_values[i],
            reverse=True,
        )[:5]
        top_5_failed = [
            index for index in top_5_failed if replaced_failed_values[index] > 0
        ]

        # find top 5 slowest
        # Replace None values with a placeholder value (0 in this case)
        placeholder_value = 0
        replaced_slowest_values = [
            value if value is not None else placeholder_value
            for value in latency_values
        ]

        # Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values)
        top_5_slowest = sorted(
            range(len(replaced_slowest_values)),
            key=lambda i: replaced_slowest_values[i],
            reverse=True,
        )[:5]
        top_5_slowest = [
            index for index in top_5_slowest if replaced_slowest_values[index] > 0
        ]

        # format alert -> return the litellm model name + api base
        message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics šŸ“ˆ: \n\n"

        message += "\n\n*ā—ļø Top Deployments with Most Failed Requests:*\n\n"
        if not top_5_failed:
            message += "\tNone\n"
        for i in range(len(top_5_failed)):
            key = failed_request_keys[top_5_failed[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if isinstance(_deployment, dict):
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                return False

            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            if api_base is None:
                api_base = ""
            value = replaced_failed_values[top_5_failed[i]]
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"

        message += "\n\n*šŸ˜… Top Slowest Deployments:*\n\n"
        if not top_5_slowest:
            message += "\tNone\n"
        for i in range(len(top_5_slowest)):
            key = latency_keys[top_5_slowest[i]].split(":")[0]
            _deployment = router.get_model_info(key)
            if _deployment is not None:
                deployment_name = _deployment["litellm_params"].get("model", "")
            else:
                deployment_name = ""
            api_base = litellm.get_api_base(
                model=deployment_name,
                optional_params=(
                    _deployment["litellm_params"] if _deployment is not None else {}
                ),
            )
            value = round(replaced_slowest_values[top_5_slowest[i]], 3)
            message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"
        # cache cleanup -> reset values to 0
        latency_cache_keys = [(key, 0) for key in latency_keys]
        failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
        combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
        await self.internal_usage_cache.async_batch_set_cache(
            cache_list=combined_metrics_cache_keys
        )

        message += f"\n\nNext Run is in: `{time.time() + self.alerting_args.daily_report_frequency}`s"

        # send alert
        await self.send_alert(message=message, level="Low", alert_type="daily_reports")

        return True

    async def response_taking_too_long(
        self,
        start_time: Optional[datetime.datetime] = None,
        end_time: Optional[datetime.datetime] = None,
        type: Literal["hanging_request", "slow_response"] = "hanging_request",
        request_data: Optional[dict] = None,
    ):
        if self.alerting is None or self.alert_types is None:
            return

        if request_data is not None:
            model = request_data.get("model", "")
            messages = request_data.get("messages", None)
            if messages is None:
                # if messages does not exist fallback to "input"
                messages = request_data.get("input", None)

            # try casting messages to str and get the first 100 characters, else mark as empty
            try:
                messages = str(messages)
                messages = messages[:100]
            except Exception:
                messages = ""

            if litellm.turn_off_message_logging:
                messages = (
                    "Message not logged. `litellm.turn_off_message_logging=True`."
                )
            request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
        else:
            request_info = ""

        if type == "hanging_request":
            await asyncio.sleep(
                self.alerting_threshold
            )  # sleep for the alerting threshold; this may need to differ for streaming, non-streaming, and non-completion (embedding + image) requests
            if (
                request_data is not None
                and request_data.get("litellm_status", "") != "success"
                and request_data.get("litellm_status", "") != "fail"
            ):
                if request_data.get("deployment", None) is not None and isinstance(
                    request_data["deployment"], dict
                ):
                    _api_base = litellm.get_api_base(
                        model=model,
                        optional_params=request_data["deployment"].get(
                            "litellm_params", {}
                        ),
                    )

                    if _api_base is None:
                        _api_base = ""

                    request_info += f"\nAPI Base: {_api_base}"
                elif request_data.get("metadata", None) is not None and isinstance(
                    request_data["metadata"], dict
                ):
                    # In hanging requests, the request sometimes has not reached the point where the deployment is added to `request_data`;
                    # in that case we fall back to the api base set in the request metadata
                    _metadata = request_data["metadata"]
                    _api_base = _metadata.get("api_base", "")
                    request_info = litellm.utils._add_key_name_and_team_to_alert(
                        request_info=request_info, metadata=_metadata
                    )

                    if _api_base is None:
                        _api_base = ""

                    request_info += f"\nAPI Base: `{_api_base}`"
                # only alert hanging responses if they have not been marked as success
                alerting_message = (
                    f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
                )

                if "langfuse" in litellm.success_callback:
                    langfuse_url = self._add_langfuse_trace_id_to_alert(
                        request_data=request_data,
                    )

                    if langfuse_url is not None:
                        request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

                # add deployment latencies to alert
                _deployment_latency_map = self._get_deployment_latencies_to_alert(
                    metadata=request_data.get("metadata", {})
                )
                if _deployment_latency_map is not None:
                    request_info += (
                        f"\nDeployment Latencies\n{_deployment_latency_map}"
                    )

                await self.send_alert(
                    message=alerting_message + request_info,
                    level="Medium",
                    alert_type="llm_requests_hanging",
                )
    async def failed_tracking_alert(self, error_message: str):
        """Raise alert when tracking failed for specific model"""
        _cache: DualCache = self.internal_usage_cache
        message = "Failed Tracking Cost for " + error_message
        _cache_key = "budget_alerts:failed_tracking:{}".format(message)
        result = await _cache.async_get_cache(key=_cache_key)
        if result is None:
            await self.send_alert(
                message=message, level="High", alert_type="budget_alerts"
            )
            await _cache.async_set_cache(
                key=_cache_key,
                value="SENT",
                ttl=self.alerting_args.budget_alert_ttl,
            )

    async def budget_alerts(
        self,
        type: Literal[
            "token_budget",
            "user_budget",
            "team_budget",
            "proxy_budget",
            "projected_limit_exceeded",
        ],
        user_info: CallInfo,
    ):
        ## PREVENTIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
        # - Alert once within 24hr period
        # - Cache this information
        # - Don't re-alert, if alert already sent
        _cache: DualCache = self.internal_usage_cache

        if self.alerting is None or self.alert_types is None:
            # do nothing if alerting is not switched on
            return
        if "budget_alerts" not in self.alert_types:
            return
        _id: str = "default_id"  # used for caching
        user_info_json = user_info.model_dump(exclude_none=True)
        user_info_str = ""
        for k, v in user_info_json.items():
            user_info_str += "\n{}: {}\n".format(k, v)

        event: Optional[
            Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"]
        ] = None
        event_group: Optional[Literal["user", "team", "key", "proxy"]] = None
        event_message: str = ""
        webhook_event: Optional[WebhookEvent] = None
        if type == "proxy_budget":
            event_group = "proxy"
            event_message += "Proxy Budget: "
        elif type == "user_budget":
            event_group = "user"
            event_message += "User Budget: "
            _id = user_info.user_id or _id
        elif type == "team_budget":
            event_group = "team"
            event_message += "Team Budget: "
            _id = user_info.team_id or _id
        elif type == "token_budget":
            event_group = "key"
            event_message += "Key Budget: "
            _id = user_info.token
        elif type == "projected_limit_exceeded":
            event_group = "key"
            event_message += "Key Budget: Projected Limit Exceeded"
            event = "projected_limit_exceeded"
            _id = user_info.token

        # percent of max_budget left to spend
        if user_info.max_budget > 0:
            percent_left = (
                user_info.max_budget - user_info.spend
            ) / user_info.max_budget
        else:
            percent_left = 0

        # check if crossed budget
        if user_info.spend >= user_info.max_budget:
            event = "budget_crossed"
            event_message += "Budget Crossed"
        elif percent_left <= 0.05:
            event = "threshold_crossed"
            event_message += "5% Threshold Crossed"
        elif percent_left <= 0.15:
            event = "threshold_crossed"
            event_message += "15% Threshold Crossed"

        if event is not None and event_group is not None:
            _cache_key = "budget_alerts:{}:{}".format(event, _id)
            result = await _cache.async_get_cache(key=_cache_key)
            if result is None:
                webhook_event = WebhookEvent(
                    event=event,
                    event_group=event_group,
                    event_message=event_message,
                    **user_info_json,
                )
                await self.send_alert(
                    message=event_message + "\n\n" + user_info_str,
                    level="High",
                    alert_type="budget_alerts",
                    user_info=webhook_event,
                )
                await _cache.async_set_cache(
                    key=_cache_key,
                    value="SENT",
                    ttl=self.alerting_args.budget_alert_ttl,
                )

            return
        return

    async def model_added_alert(
        self, model_name: str, litellm_model_name: str, passed_model_info: Any
    ):
        base_model_from_user = getattr(passed_model_info, "base_model", None)
        model_info = {}
        base_model = ""
        if base_model_from_user is not None:
            model_info = litellm.model_cost.get(base_model_from_user, {})
            base_model = f"Base Model: `{base_model_from_user}`\n"
        else:
            model_info = litellm.model_cost.get(litellm_model_name, {})
        model_info_str = ""
        for k, v in model_info.items():
            if k == "input_cost_per_token" or k == "output_cost_per_token":
                # when converting to string it should not be 1.63e-06
                v = "{:.8f}".format(v)

            model_info_str += f"{k}: {v}\n"

        message = f"""
*šŸš… New Model Added*
Model Name: `{model_name}`
{base_model}

Usage OpenAI Python SDK:
```
import openai
client = openai.OpenAI(
    api_key="your_api_key",
    base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
)

response = client.chat.completions.create(
    model="{model_name}", # model to send to the proxy
    messages = [
        {{
            "role": "user",
            "content": "this is a test request, write a short poem"
        }}
    ]
)
```

Model Info:
```
{model_info_str}
```
"""

        await self.send_alert(
            message=message, level="Low", alert_type="new_model_added"
        )

    async def model_removed_alert(self, model_name: str):
        pass

    async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
        """
        Sends structured alert to webhook, if set.

        Currently only implemented for budget alerts

        Returns -> True if sent, False if not.
        """

        webhook_url = os.getenv("WEBHOOK_URL", None)
        if webhook_url is None:
            raise Exception("Missing webhook_url from environment")

        payload = webhook_event.model_dump_json()
        headers = {"Content-type": "application/json"}

        response = await self.async_http_handler.post(
            url=webhook_url,
            headers=headers,
            data=payload,
        )
        if response.status_code == 200:
            return True
        else:
            print("Error sending webhook alert. Error=", response.text)  # noqa

        return False
    async def send_alert(
        self,
        message: str,
        level: Literal["Low", "Medium", "High"],
        alert_type: Literal[
            "llm_exceptions",
            "llm_too_slow",
            "llm_requests_hanging",
            "budget_alerts",
            "db_exceptions",
            "daily_reports",
            "spend_reports",
            "new_model_added",
            "cooldown_deployment",
        ],
        user_info: Optional[WebhookEvent] = None,
        **kwargs,
    ):
        """
        Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298

        - Responses taking too long
        - Requests are hanging
        - Calls are failing
        - DB Read/Writes are failing
        - Proxy Close to max budget
        - Key Close to max budget

        Parameters:
            level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'.
            message: str - what is the alert about
        """
        if self.alerting is None:
            return

        if (
            "webhook" in self.alerting
            and alert_type == "budget_alerts"
            and user_info is not None
        ):
            await self.send_webhook_alert(webhook_event=user_info)

        if "slack" not in self.alerting:
            return

        if alert_type not in self.alert_types:
            return

        from datetime import datetime
        import json

        # Get the current timestamp
        current_time = datetime.now().strftime("%H:%M:%S")
        _proxy_base_url = os.getenv("PROXY_BASE_URL", None)
        if alert_type == "daily_reports" or alert_type == "new_model_added":
            formatted_message = message
        else:
            formatted_message = (
                f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
            )

        if kwargs:
            for key, value in kwargs.items():
                formatted_message += f"\n\n{key}: `{value}`\n\n"

        if _proxy_base_url is not None:
            formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"

        # check if we find the slack webhook url in self.alert_to_webhook_url
        if (
            self.alert_to_webhook_url is not None
            and alert_type in self.alert_to_webhook_url
        ):
            slack_webhook_url = self.alert_to_webhook_url[alert_type]
        elif self.default_webhook_url is not None:
            slack_webhook_url = self.default_webhook_url
        else:
            slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)

        if slack_webhook_url is None:
            raise Exception("Missing SLACK_WEBHOOK_URL from environment")

        payload = {"text": formatted_message}
        headers = {"Content-type": "application/json"}

        response = await self.async_http_handler.post(
            url=slack_webhook_url,
            headers=headers,
            data=json.dumps(payload),
        )
        if response.status_code != 200:
            print("Error sending slack alert. Error=", response.text)  # noqa

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Log deployment latency"""
        try:
            if "daily_reports" in self.alert_types:
                model_id = (
                    kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")
                )
                response_s: timedelta = end_time - start_time

                final_value = response_s
                total_tokens = 0

                if isinstance(response_obj, litellm.ModelResponse):
                    completion_tokens = response_obj.usage.completion_tokens
                    if completion_tokens is not None and completion_tokens > 0:
                        final_value = float(
                            response_s.total_seconds() / completion_tokens
                        )

                await self.async_update_daily_reports(
                    DeploymentMetrics(
                        id=model_id,
                        failed_request=False,
                        latency_per_output_token=final_value,
                        updated_at=litellm.utils.get_utc_datetime(),
                    )
                )
        except Exception as e:
            verbose_proxy_logger.error(
                "[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: %s",
                str(e),
            )

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """Log failure + deployment latency"""
        if "daily_reports" in self.alert_types:
            model_id = (
                kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")
            )
            await self.async_update_daily_reports(
                DeploymentMetrics(
                    id=model_id,
                    failed_request=True,
                    latency_per_output_token=None,
                    updated_at=litellm.utils.get_utc_datetime(),
                )
            )

    async def _run_scheduler_helper(self, llm_router) -> bool:
        """
        Returns:
        - True -> report sent
        - False -> report not sent
        """
        report_sent_bool = False

        report_sent = await self.internal_usage_cache.async_get_cache(
            key=SlackAlertingCacheKeys.report_sent_key.value
        )  # None | float

        current_time = time.time()

        if report_sent is None:
            await self.internal_usage_cache.async_set_cache(
                key=SlackAlertingCacheKeys.report_sent_key.value,
                value=current_time,
            )
        elif isinstance(report_sent, float):
            # Check if current time - interval >= time last sent
            interval_seconds = self.alerting_args.daily_report_frequency
            if current_time - report_sent >= interval_seconds:
                # time to send the daily report
                await self.send_daily_reports(router=llm_router)

                # update the report_sent time after sending the report
                await self.internal_usage_cache.async_set_cache(
                    key=SlackAlertingCacheKeys.report_sent_key.value,
                    value=current_time,
                )
                report_sent_bool = True

        return report_sent_bool

    async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
        """
        If 'daily_reports' is enabled:

        Ping the cache every `report_check_interval` seconds (default: 5 minutes) to check if the report should be sent
        If yes -> call send_daily_reports()
        """
        if llm_router is None or self.alert_types is None:
            return

        if "daily_reports" in self.alert_types:
            while True:
                await self._run_scheduler_helper(llm_router=llm_router)
                interval = random.randint(
                    self.alerting_args.report_check_interval - 3,
                    self.alerting_args.report_check_interval + 3,
                )  # jitter the interval to prevent collisions across instances
                await asyncio.sleep(interval)
        return

    async def send_weekly_spend_report(self):
        """Send a weekly spend report per team and per tag."""
        try:
            from litellm.proxy.proxy_server import _get_spend_report_for_time_range

            todays_date = datetime.datetime.now().date()
            week_before = todays_date - datetime.timedelta(days=7)

            weekly_spend_per_team, weekly_spend_per_tag = (
                await _get_spend_report_for_time_range(
                    start_date=week_before.strftime("%Y-%m-%d"),
                    end_date=todays_date.strftime("%Y-%m-%d"),
                )
            )

            _weekly_spend_message = f"*šŸ’ø Weekly Spend Report for `{week_before.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` *\n"

            if weekly_spend_per_team is not None:
                _weekly_spend_message += "\n*Team Spend Report:*\n"
                for spend in weekly_spend_per_team:
                    _team_spend = spend["total_spend"]
                    _team_spend = float(_team_spend)
                    # round to 4 decimal places
                    _team_spend = round(_team_spend, 4)
                    _weekly_spend_message += (
                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                    )

            if weekly_spend_per_tag is not None:
                _weekly_spend_message += "\n*Tag Spend Report:*\n"
                for spend in weekly_spend_per_tag:
                    _tag_spend = spend["total_spend"]
                    _tag_spend = float(_tag_spend)
                    # round to 4 decimal places
                    _tag_spend = round(_tag_spend, 4)
                    _weekly_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

            await self.send_alert(
                message=_weekly_spend_message,
                level="Low",
                alert_type="spend_reports",
            )
        except Exception as e:
            verbose_proxy_logger.error("Error sending weekly spend report: %s", str(e))

    async def send_monthly_spend_report(self):
        """Send a monthly spend report per team and per tag."""
        try:
            from calendar import monthrange
            from litellm.proxy.proxy_server import _get_spend_report_for_time_range

            todays_date = datetime.datetime.now().date()
            first_day_of_month = todays_date.replace(day=1)
            _, last_day_of_month = monthrange(todays_date.year, todays_date.month)
            last_day_of_month = first_day_of_month + datetime.timedelta(
                days=last_day_of_month - 1
            )

            monthly_spend_per_team, monthly_spend_per_tag = (
                await _get_spend_report_for_time_range(
                    start_date=first_day_of_month.strftime("%Y-%m-%d"),
                    end_date=last_day_of_month.strftime("%Y-%m-%d"),
                )
            )

            _spend_message = f"*šŸ’ø Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"

            if monthly_spend_per_team is not None:
                _spend_message += "\n*Team Spend Report:*\n"
                for spend in monthly_spend_per_team:
                    _team_spend = spend["total_spend"]
                    _team_spend = float(_team_spend)
                    # round to 4 decimal places
                    _team_spend = round(_team_spend, 4)
                    _spend_message += (
                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                    )
            if monthly_spend_per_tag is not None:
                _spend_message += "\n*Tag Spend Report:*\n"
                for spend in monthly_spend_per_tag:
                    _tag_spend = spend["total_spend"]
                    _tag_spend = float(_tag_spend)
                    # round to 4 decimal places
                    _tag_spend = round(_tag_spend, 4)
                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

            await self.send_alert(
                message=_spend_message,
                level="Low",
                alert_type="spend_reports",
            )
        except Exception as e:
            verbose_proxy_logger.error("Error sending monthly spend report: %s", str(e))
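

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the class above): one way the
# SlackAlerting class might be exercised directly. Assumes SLACK_WEBHOOK_URL
# is set in the environment; the alert values below are placeholders.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo() -> None:
        alerting = SlackAlerting(
            alerting=["slack"],
            alert_types=["llm_too_slow", "budget_alerts"],
            alerting_threshold=60,  # seconds
        )
        # Sends a simple low-priority alert to the configured Slack webhook;
        # send_alert() raises if no webhook URL is configured.
        await alerting.send_alert(
            message="Test alert from slack_alerting.py",
            level="Low",
            alert_type="llm_too_slow",
        )

    asyncio.run(_demo())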