#### What this does #### # Class for sending Slack Alerts # import dotenv, os, traceback from litellm.proxy._types import UserAPIKeyAuth, CallInfo, AlertType from litellm._logging import verbose_logger, verbose_proxy_logger import litellm, threading from typing import List, Literal, Any, Union, Optional, Dict, Set from litellm.caching import DualCache import asyncio, time import aiohttp from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler import datetime from pydantic import BaseModel, Field from enum import Enum from datetime import datetime as dt, timedelta, timezone from litellm.integrations.custom_logger import CustomLogger from litellm.proxy._types import WebhookEvent import random from typing import TypedDict from openai import APIError import litellm.types from litellm.types.router import LiteLLM_Params class BaseOutageModel(TypedDict): alerts: List[int] minor_alert_sent: bool major_alert_sent: bool last_updated_at: float class OutageModel(BaseOutageModel): model_id: str class ProviderRegionOutageModel(BaseOutageModel): provider_region_id: str deployment_ids: Set[str] # we use this for the email header, please send a test email if you change this. verify it looks good on email LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png" class LiteLLMBase(BaseModel): """ Implements default functions, all pydantic objects should have. """ def json(self, **kwargs): try: return self.model_dump() # noqa except: # if using pydantic v1 return self.dict() class SlackAlertingArgsEnum(Enum): daily_report_frequency: int = 12 * 60 * 60 report_check_interval: int = 5 * 60 budget_alert_ttl: int = 24 * 60 * 60 outage_alert_ttl: int = 1 * 60 region_outage_alert_ttl: int = 1 * 60 minor_outage_alert_threshold: int = 1 * 5 major_outage_alert_threshold: int = 1 * 10 max_outage_alert_list_size: int = 1 * 10 class SlackAlertingArgs(LiteLLMBase): daily_report_frequency: int = Field( default=int( os.getenv( "SLACK_DAILY_REPORT_FREQUENCY", SlackAlertingArgsEnum.daily_report_frequency.value, ) ), description="Frequency of receiving deployment latency/failure reports. Default is 12hours. Value is in seconds.", ) report_check_interval: int = Field( default=SlackAlertingArgsEnum.report_check_interval.value, description="Frequency of checking cache if report should be sent. Background process. Default is once per hour. Value is in seconds.", ) # 5 minutes budget_alert_ttl: int = Field( default=SlackAlertingArgsEnum.budget_alert_ttl.value, description="Cache ttl for budgets alerts. Prevents spamming same alert, each time budget is crossed. Value is in seconds.", ) # 24 hours outage_alert_ttl: int = Field( default=SlackAlertingArgsEnum.outage_alert_ttl.value, description="Cache ttl for model outage alerts. Sets time-window for errors. Default is 1 minute. Value is in seconds.", ) # 1 minute ttl region_outage_alert_ttl: int = Field( default=SlackAlertingArgsEnum.region_outage_alert_ttl.value, description="Cache ttl for provider-region based outage alerts. Alert sent if 2+ models in same region report errors. Sets time-window for errors. Default is 1 minute. Value is in seconds.", ) # 1 minute ttl minor_outage_alert_threshold: int = Field( default=SlackAlertingArgsEnum.minor_outage_alert_threshold.value, description="The number of errors that count as a model/region minor outage. 
('400' error code is not counted).", ) major_outage_alert_threshold: int = Field( default=SlackAlertingArgsEnum.major_outage_alert_threshold.value, description="The number of errors that countas a model/region major outage. ('400' error code is not counted).", ) max_outage_alert_list_size: int = Field( default=SlackAlertingArgsEnum.max_outage_alert_list_size.value, description="Maximum number of errors to store in cache. For a given model/region. Prevents memory leaks.", ) # prevent memory leak class DeploymentMetrics(LiteLLMBase): """ Metrics per deployment, stored in cache Used for daily reporting """ id: str """id of deployment in router model list""" failed_request: bool """did it fail the request?""" latency_per_output_token: Optional[float] """latency/output token of deployment""" updated_at: dt """Current time of deployment being updated""" class SlackAlertingCacheKeys(Enum): """ Enum for deployment daily metrics keys - {deployment_id}:{enum} """ failed_requests_key = "failed_requests_daily_metrics" latency_key = "latency_daily_metrics" report_sent_key = "daily_metrics_report_sent" class SlackAlerting(CustomLogger): """ Class for sending Slack Alerts """ # Class variables or attributes def __init__( self, internal_usage_cache: Optional[DualCache] = None, alerting_threshold: float = 300, # threshold for slow / hanging llm responses (in seconds) alerting: Optional[List] = [], alert_types: List[AlertType] = [ "llm_exceptions", "llm_too_slow", "llm_requests_hanging", "budget_alerts", "db_exceptions", "daily_reports", "spend_reports", "cooldown_deployment", "new_model_added", "outage_alerts", ], alert_to_webhook_url: Optional[ Dict ] = None, # if user wants to separate alerts to diff channels alerting_args={}, default_webhook_url: Optional[str] = None, ): self.alerting_threshold = alerting_threshold self.alerting = alerting self.alert_types = alert_types self.internal_usage_cache = internal_usage_cache or DualCache() self.async_http_handler = AsyncHTTPHandler() self.alert_to_webhook_url = alert_to_webhook_url self.is_running = False self.alerting_args = SlackAlertingArgs(**alerting_args) self.default_webhook_url = default_webhook_url self.llm_router: Optional[litellm.Router] = None def update_values( self, alerting: Optional[List] = None, alerting_threshold: Optional[float] = None, alert_types: Optional[List] = None, alert_to_webhook_url: Optional[Dict] = None, alerting_args: Optional[Dict] = None, llm_router: Optional[litellm.Router] = None, ): if alerting is not None: self.alerting = alerting if alerting_threshold is not None: self.alerting_threshold = alerting_threshold if alert_types is not None: self.alert_types = alert_types if alerting_args is not None: self.alerting_args = SlackAlertingArgs(**alerting_args) if alert_to_webhook_url is not None: # update the dict if self.alert_to_webhook_url is None: self.alert_to_webhook_url = alert_to_webhook_url else: self.alert_to_webhook_url.update(alert_to_webhook_url) if llm_router is not None: self.llm_router = llm_router async def deployment_in_cooldown(self): pass async def deployment_removed_from_cooldown(self): pass def _all_possible_alert_types(self): # used by the UI to show all supported alert types # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select return [ "llm_exceptions", "llm_too_slow", "llm_requests_hanging", "budget_alerts", "db_exceptions", ] def _add_langfuse_trace_id_to_alert( self, request_data: Optional[dict] = None, ) -> Optional[str]: """ Returns langfuse 
trace url - check: -> existing_trace_id -> trace_id -> litellm_call_id """ # do nothing for now if request_data is not None: trace_id = None if ( request_data.get("metadata", {}).get("existing_trace_id", None) is not None ): trace_id = request_data["metadata"]["existing_trace_id"] elif request_data.get("metadata", {}).get("trace_id", None) is not None: trace_id = request_data["metadata"]["trace_id"] elif request_data.get("litellm_logging_obj", None) is not None and hasattr( request_data["litellm_logging_obj"], "model_call_details" ): trace_id = request_data["litellm_logging_obj"].model_call_details[ "litellm_call_id" ] if litellm.utils.langFuseLogger is not None: base_url = litellm.utils.langFuseLogger.Langfuse.base_url return f"{base_url}/trace/{trace_id}" return None def _response_taking_too_long_callback_helper( self, kwargs, # kwargs to completion start_time, end_time, # start/end time ): try: time_difference = end_time - start_time # Convert the timedelta to float (in seconds) time_difference_float = time_difference.total_seconds() litellm_params = kwargs.get("litellm_params", {}) model = kwargs.get("model", "") api_base = litellm.get_api_base(model=model, optional_params=litellm_params) messages = kwargs.get("messages", None) # if messages does not exist fallback to "input" if messages is None: messages = kwargs.get("input", None) # only use first 100 chars for alerting _messages = str(messages)[:100] return time_difference_float, model, api_base, _messages except Exception as e: raise e def _get_deployment_latencies_to_alert(self, metadata=None): if metadata is None: return None if "_latency_per_deployment" in metadata: # Translate model_id to -> api_base # _latency_per_deployment is a dictionary that looks like this: """ _latency_per_deployment: { api_base: 0.01336697916666667 } """ _message_to_send = "" _deployment_latencies = metadata["_latency_per_deployment"] if len(_deployment_latencies) == 0: return None try: # try sorting deployments by latency _deployment_latencies = sorted( _deployment_latencies.items(), key=lambda x: x[1] ) _deployment_latencies = dict(_deployment_latencies) except: pass for api_base, latency in _deployment_latencies.items(): _message_to_send += f"\n{api_base}: {round(latency,2)}s" _message_to_send = "```" + _message_to_send + "```" return _message_to_send async def response_taking_too_long_callback( self, kwargs, # kwargs to completion completion_response, # response from completion start_time, end_time, # start/end time ): if self.alerting is None or self.alert_types is None: return time_difference_float, model, api_base, messages = ( self._response_taking_too_long_callback_helper( kwargs=kwargs, start_time=start_time, end_time=end_time, ) ) if litellm.turn_off_message_logging: messages = "Message not logged. `litellm.turn_off_message_logging=True`." 
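# Build the Slack message body from the (truncated) request details gathered above;
# key/team info and the per-deployment latency map are appended below only when the
# response time actually crosses the alerting threshold.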
request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`" slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`" if time_difference_float > self.alerting_threshold: # add deployment latencies to alert if ( kwargs is not None and "litellm_params" in kwargs and "metadata" in kwargs["litellm_params"] ): _metadata = kwargs["litellm_params"]["metadata"] request_info = litellm.utils._add_key_name_and_team_to_alert( request_info=request_info, metadata=_metadata ) _deployment_latency_map = self._get_deployment_latencies_to_alert( metadata=_metadata ) if _deployment_latency_map is not None: request_info += ( f"\nAvailable Deployment Latencies\n{_deployment_latency_map}" ) await self.send_alert( message=slow_message + request_info, level="Low", alert_type="llm_too_slow", ) async def async_update_daily_reports( self, deployment_metrics: DeploymentMetrics ) -> int: """ Store the perf by deployment in cache - Number of failed requests per deployment - Latency / output tokens per deployment 'deployment_id:daily_metrics:failed_requests' 'deployment_id:daily_metrics:latency_per_output_token' Returns int - count of metrics set (1 - if just latency, 2 - if failed + latency) """ return_val = 0 try: ## FAILED REQUESTS ## if deployment_metrics.failed_request: await self.internal_usage_cache.async_increment_cache( key="{}:{}".format( deployment_metrics.id, SlackAlertingCacheKeys.failed_requests_key.value, ), value=1, ) return_val += 1 ## LATENCY ## if deployment_metrics.latency_per_output_token is not None: await self.internal_usage_cache.async_increment_cache( key="{}:{}".format( deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value ), value=deployment_metrics.latency_per_output_token, ) return_val += 1 return return_val except Exception as e: return 0 async def send_daily_reports(self, router) -> bool: """ Send a daily report on: - Top 5 deployments with most failed requests - Top 5 slowest deployments (normalized by latency/output tokens) Get the value from redis cache (if available) or in-memory and send it Cleanup: - reset values in cache -> prevent memory leak Returns: True -> if successfuly sent False -> if not sent """ ids = router.get_model_ids() # get keys failed_request_keys = [ "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value) for id in ids ] latency_keys = [ "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids ] combined_metrics_keys = failed_request_keys + latency_keys # reduce cache calls combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache( keys=combined_metrics_keys ) # [1, 2, None, ..] if combined_metrics_values is None: return False all_none = True for val in combined_metrics_values: if val is not None and val > 0: all_none = False break if all_none: return False failed_request_values = combined_metrics_values[ : len(failed_request_keys) ] # # [1, 2, None, ..] 
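# The batch get returns values in the same order as combined_metrics_keys
# (failed-request counts first, then latency sums), so slice the list back apart here.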
latency_values = combined_metrics_values[len(failed_request_keys) :] # find top 5 failed ## Replace None values with a placeholder value (-1 in this case) placeholder_value = 0 replaced_failed_values = [ value if value is not None else placeholder_value for value in failed_request_values ] ## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values) top_5_failed = sorted( range(len(replaced_failed_values)), key=lambda i: replaced_failed_values[i], reverse=True, )[:5] top_5_failed = [ index for index in top_5_failed if replaced_failed_values[index] > 0 ] # find top 5 slowest # Replace None values with a placeholder value (-1 in this case) placeholder_value = 0 replaced_slowest_values = [ value if value is not None else placeholder_value for value in latency_values ] # Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values) top_5_slowest = sorted( range(len(replaced_slowest_values)), key=lambda i: replaced_slowest_values[i], reverse=True, )[:5] top_5_slowest = [ index for index in top_5_slowest if replaced_slowest_values[index] > 0 ] # format alert -> return the litellm model name + api base message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics šŸ“ˆ: \n\n" message += "\n\n*ā—ļø Top Deployments with Most Failed Requests:*\n\n" if not top_5_failed: message += "\tNone\n" for i in range(len(top_5_failed)): key = failed_request_keys[top_5_failed[i]].split(":")[0] _deployment = router.get_model_info(key) if isinstance(_deployment, dict): deployment_name = _deployment["litellm_params"].get("model", "") else: return False api_base = litellm.get_api_base( model=deployment_name, optional_params=( _deployment["litellm_params"] if _deployment is not None else {} ), ) if api_base is None: api_base = "" value = replaced_failed_values[top_5_failed[i]] message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n" message += "\n\n*šŸ˜… Top Slowest Deployments:*\n\n" if not top_5_slowest: message += "\tNone\n" for i in range(len(top_5_slowest)): key = latency_keys[top_5_slowest[i]].split(":")[0] _deployment = router.get_model_info(key) if _deployment is not None: deployment_name = _deployment["litellm_params"].get("model", "") else: deployment_name = "" api_base = litellm.get_api_base( model=deployment_name, optional_params=( _deployment["litellm_params"] if _deployment is not None else {} ), ) value = round(replaced_slowest_values[top_5_slowest[i]], 3) message += f"\t{i+1}. 
Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n" # cache cleanup -> reset values to 0 latency_cache_keys = [(key, 0) for key in latency_keys] failed_request_cache_keys = [(key, 0) for key in failed_request_keys] combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys await self.internal_usage_cache.async_batch_set_cache( cache_list=combined_metrics_cache_keys ) message += f"\n\nNext Run is in: `{time.time() + self.alerting_args.daily_report_frequency}`s" # send alert await self.send_alert(message=message, level="Low", alert_type="daily_reports") return True async def response_taking_too_long( self, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, type: Literal["hanging_request", "slow_response"] = "hanging_request", request_data: Optional[dict] = None, ): if self.alerting is None or self.alert_types is None: return if request_data is not None: model = request_data.get("model", "") messages = request_data.get("messages", None) if messages is None: # if messages does not exist fallback to "input" messages = request_data.get("input", None) # try casting messages to str and get the first 100 characters, else mark as None try: messages = str(messages) messages = messages[:100] except: messages = "" if litellm.turn_off_message_logging: messages = ( "Message not logged. `litellm.turn_off_message_logging=True`." ) request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`" else: request_info = "" if type == "hanging_request": await asyncio.sleep( self.alerting_threshold ) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests if ( request_data is not None and request_data.get("litellm_status", "") != "success" and request_data.get("litellm_status", "") != "fail" ): if request_data.get("deployment", None) is not None and isinstance( request_data["deployment"], dict ): _api_base = litellm.get_api_base( model=model, optional_params=request_data["deployment"].get( "litellm_params", {} ), ) if _api_base is None: _api_base = "" request_info += f"\nAPI Base: {_api_base}" elif request_data.get("metadata", None) is not None and isinstance( request_data["metadata"], dict ): # In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data`` # in that case we fallback to the api base set in the request metadata _metadata = request_data["metadata"] _api_base = _metadata.get("api_base", "") request_info = litellm.utils._add_key_name_and_team_to_alert( request_info=request_info, metadata=_metadata ) if _api_base is None: _api_base = "" request_info += f"\nAPI Base: `{_api_base}`" # only alert hanging responses if they have not been marked as success alerting_message = ( f"`Requests are hanging - {self.alerting_threshold}s+ request time`" ) if "langfuse" in litellm.success_callback: langfuse_url = self._add_langfuse_trace_id_to_alert( request_data=request_data, ) if langfuse_url is not None: request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url) # add deployment latencies to alert _deployment_latency_map = self._get_deployment_latencies_to_alert( metadata=request_data.get("metadata", {}) ) if _deployment_latency_map is not None: request_info += f"\nDeployment Latencies\n{_deployment_latency_map}" await self.send_alert( message=alerting_message + request_info, level="Medium", alert_type="llm_requests_hanging", ) async def failed_tracking_alert(self, 
error_message: str): """Raise alert when tracking failed for specific model""" _cache: DualCache = self.internal_usage_cache message = "Failed Tracking Cost for" + error_message _cache_key = "budget_alerts:failed_tracking:{}".format(message) result = await _cache.async_get_cache(key=_cache_key) if result is None: await self.send_alert( message=message, level="High", alert_type="budget_alerts" ) await _cache.async_set_cache( key=_cache_key, value="SENT", ttl=self.alerting_args.budget_alert_ttl, ) async def budget_alerts( self, type: Literal[ "token_budget", "user_budget", "team_budget", "proxy_budget", "projected_limit_exceeded", ], user_info: CallInfo, ): ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727 # - Alert once within 24hr period # - Cache this information # - Don't re-alert, if alert already sent _cache: DualCache = self.internal_usage_cache if self.alerting is None or self.alert_types is None: # do nothing if alerting is not switched on return if "budget_alerts" not in self.alert_types: return _id: str = "default_id" # used for caching user_info_json = user_info.model_dump(exclude_none=True) for k, v in user_info_json.items(): user_info_str = "\n{}: {}\n".format(k, v) event: Optional[ Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"] ] = None event_group: Optional[Literal["user", "team", "key", "proxy"]] = None event_message: str = "" webhook_event: Optional[WebhookEvent] = None if type == "proxy_budget": event_group = "proxy" event_message += "Proxy Budget: " elif type == "user_budget": event_group = "user" event_message += "User Budget: " _id = user_info.user_id or _id elif type == "team_budget": event_group = "team" event_message += "Team Budget: " _id = user_info.team_id or _id elif type == "token_budget": event_group = "key" event_message += "Key Budget: " _id = user_info.token elif type == "projected_limit_exceeded": event_group = "key" event_message += "Key Budget: Projected Limit Exceeded" event = "projected_limit_exceeded" _id = user_info.token # percent of max_budget left to spend if user_info.max_budget is None: return if user_info.max_budget > 0: percent_left = ( user_info.max_budget - user_info.spend ) / user_info.max_budget else: percent_left = 0 # check if crossed budget if user_info.spend >= user_info.max_budget: event = "budget_crossed" event_message += f"Budget Crossed\n Total Budget:`{user_info.max_budget}`" elif percent_left <= 0.05: event = "threshold_crossed" event_message += "5% Threshold Crossed " elif percent_left <= 0.15: event = "threshold_crossed" event_message += "15% Threshold Crossed" if event is not None and event_group is not None: _cache_key = "budget_alerts:{}:{}".format(event, _id) result = await _cache.async_get_cache(key=_cache_key) if result is None: webhook_event = WebhookEvent( event=event, event_group=event_group, event_message=event_message, **user_info_json, ) await self.send_alert( message=event_message + "\n\n" + user_info_str, level="High", alert_type="budget_alerts", user_info=webhook_event, ) await _cache.async_set_cache( key=_cache_key, value="SENT", ttl=self.alerting_args.budget_alert_ttl, ) return return def _count_outage_alerts(self, alerts: List[int]) -> str: """ Parameters: - alerts: List[int] -> list of error codes (either 408 or 500+) Returns: - str -> formatted string. This is an alert message, giving a human-friendly description of the errors. 
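e.g. alerts=[408, 500, 500] produces a "Timeout Errors: 1" line and an "API Errors: 2" line.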
""" error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0} for alert in alerts: if alert == 408: error_breakdown["Timeout Errors"] += 1 elif alert >= 500: error_breakdown["API Errors"] += 1 else: error_breakdown["Unknown Errors"] += 1 error_msg = "" for key, value in error_breakdown.items(): if value > 0: error_msg += "\n{}: {}\n".format(key, value) return error_msg def _outage_alert_msg_factory( self, alert_type: Literal["Major", "Minor"], key: Literal["Model", "Region"], key_val: str, provider: str, api_base: Optional[str], outage_value: BaseOutageModel, ) -> str: """Format an alert message for slack""" headers = {f"{key} Name": key_val, "Provider": provider} if api_base is not None: headers["API Base"] = api_base # type: ignore headers_str = "\n" for k, v in headers.items(): headers_str += f"*{k}:* `{v}`\n" return f"""\n\n *āš ļø {alert_type} Service Outage* {headers_str} *Errors:* {self._count_outage_alerts(alerts=outage_value["alerts"])} *Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n """ async def region_outage_alerts( self, exception: APIError, deployment_id: str, ) -> None: """ Send slack alert if specific provider region is having an outage. Track for 408 (Timeout) and >=500 Error codes """ ## CREATE (PROVIDER+REGION) ID ## if self.llm_router is None: return deployment = self.llm_router.get_deployment(model_id=deployment_id) if deployment is None: return model = deployment.litellm_params.model ### GET PROVIDER ### provider = deployment.litellm_params.custom_llm_provider if provider is None: model, provider, _, _ = litellm.get_llm_provider(model=model) ### GET REGION ### region_name = deployment.litellm_params.region_name if region_name is None: region_name = litellm.utils._get_model_region( custom_llm_provider=provider, litellm_params=deployment.litellm_params ) if region_name is None: return ### UNIQUE CACHE KEY ### cache_key = provider + region_name outage_value: Optional[ProviderRegionOutageModel] = ( await self.internal_usage_cache.async_get_cache(key=cache_key) ) if ( getattr(exception, "status_code", None) is None or ( exception.status_code != 408 # type: ignore and exception.status_code < 500 # type: ignore ) or self.llm_router is None ): return if outage_value is None: _deployment_set = set() _deployment_set.add(deployment_id) outage_value = ProviderRegionOutageModel( provider_region_id=cache_key, alerts=[exception.status_code], # type: ignore minor_alert_sent=False, major_alert_sent=False, last_updated_at=time.time(), deployment_ids=_deployment_set, ) ## add to cache ## await self.internal_usage_cache.async_set_cache( key=cache_key, value=outage_value, ttl=self.alerting_args.region_outage_alert_ttl, ) return if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size: outage_value["alerts"].append(exception.status_code) # type: ignore else: # prevent memory leaks pass _deployment_set = outage_value["deployment_ids"] _deployment_set.add(deployment_id) outage_value["deployment_ids"] = _deployment_set outage_value["last_updated_at"] = time.time() ## MINOR OUTAGE ALERT SENT ## if ( outage_value["minor_alert_sent"] == False and len(outage_value["alerts"]) >= self.alerting_args.minor_outage_alert_threshold and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment ): msg = self._outage_alert_msg_factory( alert_type="Minor", key="Region", key_val=region_name, api_base=None, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, 
level="Medium", alert_type="outage_alerts" ) # set to true outage_value["minor_alert_sent"] = True ## MAJOR OUTAGE ALERT SENT ## elif ( outage_value["major_alert_sent"] == False and len(outage_value["alerts"]) >= self.alerting_args.major_outage_alert_threshold and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment ): msg = self._outage_alert_msg_factory( alert_type="Major", key="Region", key_val=region_name, api_base=None, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert(message=msg, level="High", alert_type="outage_alerts") # set to true outage_value["major_alert_sent"] = True ## update cache ## await self.internal_usage_cache.async_set_cache( key=cache_key, value=outage_value ) async def outage_alerts( self, exception: APIError, deployment_id: str, ) -> None: """ Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500). key = model_id value = { - model_id - threshold - alerts [] } ttl = 1hr max_alerts_size = 10 """ try: outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id) # type: ignore if ( getattr(exception, "status_code", None) is None or ( exception.status_code != 408 # type: ignore and exception.status_code < 500 # type: ignore ) or self.llm_router is None ): return ### EXTRACT MODEL DETAILS ### deployment = self.llm_router.get_deployment(model_id=deployment_id) if deployment is None: return model = deployment.litellm_params.model provider = deployment.litellm_params.custom_llm_provider if provider is None: try: model, provider, _, _ = litellm.get_llm_provider(model=model) except Exception as e: provider = "" api_base = litellm.get_api_base( model=model, optional_params=deployment.litellm_params ) if outage_value is None: outage_value = OutageModel( model_id=deployment_id, alerts=[exception.status_code], # type: ignore minor_alert_sent=False, major_alert_sent=False, last_updated_at=time.time(), ) ## add to cache ## await self.internal_usage_cache.async_set_cache( key=deployment_id, value=outage_value, ttl=self.alerting_args.outage_alert_ttl, ) return if ( len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size ): outage_value["alerts"].append(exception.status_code) # type: ignore else: # prevent memory leaks pass outage_value["last_updated_at"] = time.time() ## MINOR OUTAGE ALERT SENT ## if ( outage_value["minor_alert_sent"] == False and len(outage_value["alerts"]) >= self.alerting_args.minor_outage_alert_threshold ): msg = self._outage_alert_msg_factory( alert_type="Minor", key="Model", key_val=model, api_base=api_base, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="Medium", alert_type="outage_alerts" ) # set to true outage_value["minor_alert_sent"] = True elif ( outage_value["major_alert_sent"] == False and len(outage_value["alerts"]) >= self.alerting_args.major_outage_alert_threshold ): msg = self._outage_alert_msg_factory( alert_type="Major", key="Model", key_val=model, api_base=api_base, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="High", alert_type="outage_alerts" ) # set to true outage_value["major_alert_sent"] = True ## update cache ## await self.internal_usage_cache.async_set_cache( key=deployment_id, value=outage_value ) except Exception as e: pass async def model_added_alert( self, model_name: str, litellm_model_name: str, passed_model_info: Any ): base_model_from_user = 
getattr(passed_model_info, "base_model", None) model_info = {} base_model = "" if base_model_from_user is not None: model_info = litellm.model_cost.get(base_model_from_user, {}) base_model = f"Base Model: `{base_model_from_user}`\n" else: model_info = litellm.model_cost.get(litellm_model_name, {}) model_info_str = "" for k, v in model_info.items(): if k == "input_cost_per_token" or k == "output_cost_per_token": # when converting to string it should not be 1.63e-06 v = "{:.8f}".format(v) model_info_str += f"{k}: {v}\n" message = f""" *šŸš… New Model Added* Model Name: `{model_name}` {base_model} Usage OpenAI Python SDK: ``` import openai client = openai.OpenAI( api_key="your_api_key", base_url={os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")} ) response = client.chat.completions.create( model="{model_name}", # model to send to the proxy messages = [ {{ "role": "user", "content": "this is a test request, write a short poem" }} ] ) ``` Model Info: ``` {model_info_str} ``` """ alert_val = self.send_alert( message=message, level="Low", alert_type="new_model_added" ) if alert_val is not None and asyncio.iscoroutine(alert_val): await alert_val async def model_removed_alert(self, model_name: str): pass async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool: """ Sends structured alert to webhook, if set. Currently only implemented for budget alerts Returns -> True if sent, False if not. """ webhook_url = os.getenv("WEBHOOK_URL", None) if webhook_url is None: raise Exception("Missing webhook_url from environment") payload = webhook_event.model_dump_json() headers = {"Content-type": "application/json"} response = await self.async_http_handler.post( url=webhook_url, headers=headers, data=payload, ) if response.status_code == 200: return True else: print("Error sending webhook alert. 
Error=", response.text) # noqa return False async def _check_if_using_premium_email_feature( self, premium_user: bool, email_logo_url: Optional[str] = None, email_support_contact: Optional[str] = None, ): from litellm.proxy.proxy_server import premium_user from litellm.proxy.proxy_server import CommonProxyErrors if premium_user is not True: if email_logo_url is not None or email_support_contact is not None: raise ValueError( f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}" ) async def send_key_created_email(self, webhook_event: WebhookEvent) -> bool: from litellm.proxy.utils import send_email if self.alerting is None or "email" not in self.alerting: # do nothing if user does not want email alerts return False from litellm.proxy.proxy_server import premium_user, prisma_client email_logo_url = os.getenv("SMTP_SENDER_LOGO", None) email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) await self._check_if_using_premium_email_feature( premium_user, email_logo_url, email_support_contact ) event_name = webhook_event.event_message recipient_email = webhook_event.user_email recipient_user_id = webhook_event.user_id if ( recipient_email is None and recipient_user_id is not None and prisma_client is not None ): user_row = await prisma_client.db.litellm_usertable.find_unique( where={"user_id": recipient_user_id} ) if user_row is not None: recipient_email = user_row.user_email key_name = webhook_event.key_alias key_token = webhook_event.token key_budget = webhook_event.max_budget email_html_content = "Alert from LiteLLM Server" if recipient_email is None: verbose_proxy_logger.error( "Trying to send email alert to no recipient", extra=webhook_event.dict() ) email_html_content = f""" LiteLLM Logo

Hi {recipient_email},
I'm happy to provide you with an OpenAI Proxy API Key, loaded with ${key_budget} per month.

Key:

{key_token}

Usage Example

Detailed Documentation on Usage with OpenAI Python SDK, Langchain, LlamaIndex, Curl

            import openai
            client = openai.OpenAI(
                api_key="{key_token}",
                base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
            )

            response = client.chat.completions.create(
                model="gpt-3.5-turbo", # model to send to the proxy
                messages = [
                    {{
                        "role": "user",
                        "content": "this is a test request, write a short poem"
                    }}
                ]
            )

            
If you have any questions, please send an email to {email_support_contact}

Best,
The LiteLLM team
""" payload = webhook_event.model_dump_json() email_event = { "to": recipient_email, "subject": f"LiteLLM: {event_name}", "html": email_html_content, } response = await send_email( receiver_email=email_event["to"], subject=email_event["subject"], html=email_event["html"], ) return False async def send_email_alert_using_smtp(self, webhook_event: WebhookEvent) -> bool: """ Sends structured Email alert to an SMTP server Currently only implemented for budget alerts Returns -> True if sent, False if not. """ from litellm.proxy.utils import send_email from litellm.proxy.proxy_server import premium_user, prisma_client email_logo_url = os.getenv("SMTP_SENDER_LOGO", None) email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) await self._check_if_using_premium_email_feature( premium_user, email_logo_url, email_support_contact ) event_name = webhook_event.event_message recipient_email = webhook_event.user_email user_name = webhook_event.user_id max_budget = webhook_event.max_budget email_html_content = "Alert from LiteLLM Server" if recipient_email is None: verbose_proxy_logger.error( "Trying to send email alert to no recipient", extra=webhook_event.dict() ) if webhook_event.event == "budget_crossed": email_html_content = f""" LiteLLM Logo

Hi {user_name},
Your LLM API usage this month has reached your account's monthly budget of ${max_budget}

API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month.

If you have any questions, please send an email to {email_support_contact}

Best,
The LiteLLM team
""" payload = webhook_event.model_dump_json() email_event = { "to": recipient_email, "subject": f"LiteLLM: {event_name}", "html": email_html_content, } response = await send_email( receiver_email=email_event["to"], subject=email_event["subject"], html=email_event["html"], ) return False async def send_alert( self, message: str, level: Literal["Low", "Medium", "High"], alert_type: Literal[AlertType], user_info: Optional[WebhookEvent] = None, **kwargs, ): """ Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298 - Responses taking too long - Requests are hanging - Calls are failing - DB Read/Writes are failing - Proxy Close to max budget - Key Close to max budget Parameters: level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'. message: str - what is the alert about """ if self.alerting is None: return if ( "webhook" in self.alerting and alert_type == "budget_alerts" and user_info is not None ): await self.send_webhook_alert(webhook_event=user_info) if ( "email" in self.alerting and alert_type == "budget_alerts" and user_info is not None ): # only send budget alerts over Email await self.send_email_alert_using_smtp(webhook_event=user_info) if "slack" not in self.alerting: return if alert_type not in self.alert_types: return from datetime import datetime import json # Get the current timestamp current_time = datetime.now().strftime("%H:%M:%S") _proxy_base_url = os.getenv("PROXY_BASE_URL", None) if alert_type == "daily_reports" or alert_type == "new_model_added": formatted_message = message else: formatted_message = ( f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}" ) if kwargs: for key, value in kwargs.items(): formatted_message += f"\n\n{key}: `{value}`\n\n" if _proxy_base_url is not None: formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`" # check if we find the slack webhook url in self.alert_to_webhook_url if ( self.alert_to_webhook_url is not None and alert_type in self.alert_to_webhook_url ): slack_webhook_url = self.alert_to_webhook_url[alert_type] elif self.default_webhook_url is not None: slack_webhook_url = self.default_webhook_url else: slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None) if slack_webhook_url is None: raise Exception("Missing SLACK_WEBHOOK_URL from environment") payload = {"text": formatted_message} headers = {"Content-type": "application/json"} response = await self.async_http_handler.post( url=slack_webhook_url, headers=headers, data=json.dumps(payload), ) if response.status_code == 200: pass else: print("Error sending slack alert. 
Error=", response.text) # noqa async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): """Log deployment latency""" try: if "daily_reports" in self.alert_types: model_id = ( kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "") ) response_s: timedelta = end_time - start_time final_value = response_s total_tokens = 0 if isinstance(response_obj, litellm.ModelResponse): completion_tokens = response_obj.usage.completion_tokens if completion_tokens is not None and completion_tokens > 0: final_value = float( response_s.total_seconds() / completion_tokens ) await self.async_update_daily_reports( DeploymentMetrics( id=model_id, failed_request=False, latency_per_output_token=final_value, updated_at=litellm.utils.get_utc_datetime(), ) ) except Exception as e: verbose_proxy_logger.error( "[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: ", e, ) pass async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): """Log failure + deployment latency""" _litellm_params = kwargs.get("litellm_params", {}) _model_info = _litellm_params.get("model_info", {}) or {} model_id = _model_info.get("id", "") try: if "daily_reports" in self.alert_types: try: await self.async_update_daily_reports( DeploymentMetrics( id=model_id, failed_request=True, latency_per_output_token=None, updated_at=litellm.utils.get_utc_datetime(), ) ) except Exception as e: verbose_logger.debug(f"Exception raises -{str(e)}") if isinstance(kwargs.get("exception", ""), APIError): if "outage_alerts" in self.alert_types: await self.outage_alerts( exception=kwargs["exception"], deployment_id=model_id, ) if "region_outage_alerts" in self.alert_types: await self.region_outage_alerts( exception=kwargs["exception"], deployment_id=model_id ) except Exception as e: pass async def _run_scheduler_helper(self, llm_router) -> bool: """ Returns: - True -> report sent - False -> report not sent """ report_sent_bool = False report_sent = await self.internal_usage_cache.async_get_cache( key=SlackAlertingCacheKeys.report_sent_key.value ) # None | float current_time = time.time() if report_sent is None: await self.internal_usage_cache.async_set_cache( key=SlackAlertingCacheKeys.report_sent_key.value, value=current_time, ) elif isinstance(report_sent, float): # Check if current time - interval >= time last sent interval_seconds = self.alerting_args.daily_report_frequency if current_time - report_sent >= interval_seconds: # Sneak in the reporting logic here await self.send_daily_reports(router=llm_router) # Also, don't forget to update the report_sent time after sending the report! 
await self.internal_usage_cache.async_set_cache( key=SlackAlertingCacheKeys.report_sent_key.value, value=current_time, ) report_sent_bool = True return report_sent_bool async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None): """ If 'daily_reports' enabled Ping redis cache every 5 minutes to check if we should send the report If yes -> call send_daily_report() """ if llm_router is None or self.alert_types is None: return if "daily_reports" in self.alert_types: while True: await self._run_scheduler_helper(llm_router=llm_router) interval = random.randint( self.alerting_args.report_check_interval - 3, self.alerting_args.report_check_interval + 3, ) # shuffle to prevent collisions await asyncio.sleep(interval) return async def send_weekly_spend_report(self): """ """ try: from litellm.proxy.proxy_server import _get_spend_report_for_time_range todays_date = datetime.datetime.now().date() week_before = todays_date - datetime.timedelta(days=7) weekly_spend_per_team, weekly_spend_per_tag = ( await _get_spend_report_for_time_range( start_date=week_before.strftime("%Y-%m-%d"), end_date=todays_date.strftime("%Y-%m-%d"), ) ) _weekly_spend_message = f"*šŸ’ø Weekly Spend Report for `{week_before.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` *\n" if weekly_spend_per_team is not None: _weekly_spend_message += "\n*Team Spend Report:*\n" for spend in weekly_spend_per_team: _team_spend = spend["total_spend"] _team_spend = float(_team_spend) # round to 4 decimal places _team_spend = round(_team_spend, 4) _weekly_spend_message += ( f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" ) if weekly_spend_per_tag is not None: _weekly_spend_message += "\n*Tag Spend Report:*\n" for spend in weekly_spend_per_tag: _tag_spend = spend["total_spend"] _tag_spend = float(_tag_spend) # round to 4 decimal places _tag_spend = round(_tag_spend, 4) _weekly_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n" await self.send_alert( message=_weekly_spend_message, level="Low", alert_type="spend_reports", ) except Exception as e: verbose_proxy_logger.error("Error sending weekly spend report", e) async def send_monthly_spend_report(self): """ """ try: from calendar import monthrange from litellm.proxy.proxy_server import _get_spend_report_for_time_range todays_date = datetime.datetime.now().date() first_day_of_month = todays_date.replace(day=1) _, last_day_of_month = monthrange(todays_date.year, todays_date.month) last_day_of_month = first_day_of_month + datetime.timedelta( days=last_day_of_month - 1 ) monthly_spend_per_team, monthly_spend_per_tag = ( await _get_spend_report_for_time_range( start_date=first_day_of_month.strftime("%Y-%m-%d"), end_date=last_day_of_month.strftime("%Y-%m-%d"), ) ) _spend_message = f"*šŸ’ø Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n" if monthly_spend_per_team is not None: _spend_message += "\n*Team Spend Report:*\n" for spend in monthly_spend_per_team: _team_spend = spend["total_spend"] _team_spend = float(_team_spend) # round to 4 decimal places _team_spend = round(_team_spend, 4) _spend_message += ( f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" ) if monthly_spend_per_tag is not None: _spend_message += "\n*Tag Spend Report:*\n" for spend in monthly_spend_per_tag: _tag_spend = spend["total_spend"] _tag_spend = float(_tag_spend) # round to 4 decimal places _tag_spend = round(_tag_spend, 4) _spend_message += f"Tag: 
`{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n" await self.send_alert( message=_spend_message, level="Low", alert_type="spend_reports", ) except Exception as e: verbose_proxy_logger.error("Error sending monthly spend report", e)
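# ---------------------------------------------------------------------------
# Minimal usage sketch: construct SlackAlerting by hand and send a test alert.
# This assumes SLACK_WEBHOOK_URL is set in the environment; inside the proxy,
# this class is normally created and configured by the proxy server rather
# than instantiated directly like this.
if __name__ == "__main__":

    async def _slack_alerting_demo():
        alerting = SlackAlerting(
            alerting=["slack"],
            alerting_threshold=300,
            alert_types=["llm_too_slow", "budget_alerts"],
        )
        # send_alert() resolves the webhook from alert_to_webhook_url,
        # default_webhook_url, or the SLACK_WEBHOOK_URL env var (in that order).
        await alerting.send_alert(
            message="Test alert from the usage sketch",
            level="Low",
            alert_type="llm_too_slow",
        )

    asyncio.run(_slack_alerting_demo())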