# litellm-mirror/litellm/integrations/slack_alerting.py
#### What this does ####
# Class for sending Slack Alerts #
import dotenv, os
from litellm.proxy._types import UserAPIKeyAuth, CallInfo
from litellm._logging import verbose_logger, verbose_proxy_logger
import litellm, threading
from typing import List, Literal, Any, Union, Optional, Dict
from litellm.caching import DualCache
import asyncio, time
import aiohttp
from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler
import datetime
from pydantic import BaseModel
from enum import Enum
from datetime import datetime as dt, timedelta, timezone
from litellm.integrations.custom_logger import CustomLogger
from litellm.proxy._types import WebhookEvent
import random
# We use this for the email header. If you change it, send a test email and verify it renders correctly.
LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
class LiteLLMBase(BaseModel):
"""
    Implements default functions that all pydantic objects should have.
"""
def json(self, **kwargs):
try:
return self.model_dump() # noqa
        except Exception:
# if using pydantic v1
return self.dict()
class SlackAlertingArgs(LiteLLMBase):
default_daily_report_frequency: int = 12 * 60 * 60 # 12 hours
daily_report_frequency: int = int(
os.getenv("SLACK_DAILY_REPORT_FREQUENCY", default_daily_report_frequency)
)
report_check_interval: int = 5 * 60 # 5 minutes
budget_alert_ttl: int = 24 * 60 * 60 # 24 hours
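# Example (illustrative sketch, hypothetical values): the daily report frequency can be
# overridden via the SLACK_DAILY_REPORT_FREQUENCY environment variable (set before this
# module is imported, since the default is read at class-definition time) or by passing
# `alerting_args` when constructing SlackAlerting:
#
#   args = SlackAlertingArgs(
#       daily_report_frequency=6 * 60 * 60,  # report every 6 hours instead of 12
#       budget_alert_ttl=12 * 60 * 60,       # re-alert on budgets after 12 hours
#   )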
class DeploymentMetrics(LiteLLMBase):
"""
Metrics per deployment, stored in cache
Used for daily reporting
"""
id: str
"""id of deployment in router model list"""
failed_request: bool
"""did it fail the request?"""
latency_per_output_token: Optional[float]
"""latency/output token of deployment"""
updated_at: dt
"""Current time of deployment being updated"""
class SlackAlertingCacheKeys(Enum):
"""
Enum for deployment daily metrics keys - {deployment_id}:{enum}
"""
failed_requests_key = "failed_requests_daily_metrics"
latency_key = "latency_daily_metrics"
report_sent_key = "daily_metrics_report_sent"
class SlackAlerting(CustomLogger):
"""
Class for sending Slack Alerts
"""
# Class variables or attributes
def __init__(
self,
internal_usage_cache: Optional[DualCache] = None,
alerting_threshold: float = 300, # threshold for slow / hanging llm responses (in seconds)
alerting: Optional[List] = [],
alert_types: List[
Literal[
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
"daily_reports",
"spend_reports",
"cooldown_deployment",
"new_model_added",
]
] = [
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
"daily_reports",
"spend_reports",
"cooldown_deployment",
"new_model_added",
],
alert_to_webhook_url: Optional[
Dict
] = None, # if user wants to separate alerts to diff channels
alerting_args={},
default_webhook_url: Optional[str] = None,
):
self.alerting_threshold = alerting_threshold
self.alerting = alerting
self.alert_types = alert_types
self.internal_usage_cache = internal_usage_cache or DualCache()
self.async_http_handler = AsyncHTTPHandler()
self.alert_to_webhook_url = alert_to_webhook_url
self.is_running = False
self.alerting_args = SlackAlertingArgs(**alerting_args)
self.default_webhook_url = default_webhook_url
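    # Example (illustrative sketch, hypothetical webhook URLs): route specific alert types to
    # their own Slack channels, with a default channel for everything else:
    #
    #   slack_alerting = SlackAlerting(
    #       alerting=["slack"],
    #       alerting_threshold=120,  # alert on responses slower than 2 minutes
    #       alert_to_webhook_url={
    #           "budget_alerts": "https://hooks.slack.com/services/T000/B000/budget",
    #           "llm_too_slow": "https://hooks.slack.com/services/T000/B000/latency",
    #       },
    #       default_webhook_url="https://hooks.slack.com/services/T000/B000/general",
    #   )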
def update_values(
self,
alerting: Optional[List] = None,
alerting_threshold: Optional[float] = None,
alert_types: Optional[List] = None,
alert_to_webhook_url: Optional[Dict] = None,
alerting_args: Optional[Dict] = None,
):
if alerting is not None:
self.alerting = alerting
if alerting_threshold is not None:
self.alerting_threshold = alerting_threshold
if alert_types is not None:
self.alert_types = alert_types
if alerting_args is not None:
self.alerting_args = SlackAlertingArgs(**alerting_args)
if alert_to_webhook_url is not None:
# update the dict
if self.alert_to_webhook_url is None:
self.alert_to_webhook_url = alert_to_webhook_url
else:
self.alert_to_webhook_url.update(alert_to_webhook_url)
async def deployment_in_cooldown(self):
pass
async def deployment_removed_from_cooldown(self):
pass
def _all_possible_alert_types(self):
# used by the UI to show all supported alert types
# Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select
return [
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
]
def _add_langfuse_trace_id_to_alert(
self,
request_data: Optional[dict] = None,
) -> Optional[str]:
"""
Returns langfuse trace url
- check:
-> existing_trace_id
-> trace_id
-> litellm_call_id
"""
if request_data is not None:
trace_id = None
if (
request_data.get("metadata", {}).get("existing_trace_id", None)
is not None
):
trace_id = request_data["metadata"]["existing_trace_id"]
elif request_data.get("metadata", {}).get("trace_id", None) is not None:
trace_id = request_data["metadata"]["trace_id"]
elif request_data.get("litellm_logging_obj", None) is not None and hasattr(
request_data["litellm_logging_obj"], "model_call_details"
):
trace_id = request_data["litellm_logging_obj"].model_call_details[
"litellm_call_id"
]
if litellm.utils.langFuseLogger is not None:
base_url = litellm.utils.langFuseLogger.Langfuse.base_url
return f"{base_url}/trace/{trace_id}"
return None
def _response_taking_too_long_callback_helper(
self,
kwargs, # kwargs to completion
start_time,
end_time, # start/end time
):
try:
time_difference = end_time - start_time
# Convert the timedelta to float (in seconds)
time_difference_float = time_difference.total_seconds()
litellm_params = kwargs.get("litellm_params", {})
model = kwargs.get("model", "")
api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
messages = kwargs.get("messages", None)
# if messages does not exist fallback to "input"
if messages is None:
messages = kwargs.get("input", None)
# only use first 100 chars for alerting
_messages = str(messages)[:100]
return time_difference_float, model, api_base, _messages
except Exception as e:
raise e
def _get_deployment_latencies_to_alert(self, metadata=None):
if metadata is None:
return None
if "_latency_per_deployment" in metadata:
# Translate model_id to -> api_base
# _latency_per_deployment is a dictionary that looks like this:
"""
_latency_per_deployment: {
api_base: 0.01336697916666667
}
"""
_message_to_send = ""
_deployment_latencies = metadata["_latency_per_deployment"]
if len(_deployment_latencies) == 0:
return None
try:
# try sorting deployments by latency
_deployment_latencies = sorted(
_deployment_latencies.items(), key=lambda x: x[1]
)
_deployment_latencies = dict(_deployment_latencies)
            except Exception:
pass
for api_base, latency in _deployment_latencies.items():
_message_to_send += f"\n{api_base}: {round(latency,2)}s"
_message_to_send = "```" + _message_to_send + "```"
return _message_to_send
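    # Example (hypothetical values): given
    #   metadata = {"_latency_per_deployment": {"https://api.openai.com/v1": 0.0134, "https://my-azure-endpoint": 0.0221}}
    # this returns a Slack code block with deployments sorted fastest-first:
    #   ```
    #   https://api.openai.com/v1: 0.01s
    #   https://my-azure-endpoint: 0.02s
    #   ```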
async def response_taking_too_long_callback(
self,
kwargs, # kwargs to completion
completion_response, # response from completion
start_time,
end_time, # start/end time
):
if self.alerting is None or self.alert_types is None:
return
time_difference_float, model, api_base, messages = (
self._response_taking_too_long_callback_helper(
kwargs=kwargs,
start_time=start_time,
end_time=end_time,
)
)
if litellm.turn_off_message_logging:
messages = "Message not logged. `litellm.turn_off_message_logging=True`."
request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
if time_difference_float > self.alerting_threshold:
# add deployment latencies to alert
if (
kwargs is not None
and "litellm_params" in kwargs
and "metadata" in kwargs["litellm_params"]
):
_metadata = kwargs["litellm_params"]["metadata"]
request_info = litellm.utils._add_key_name_and_team_to_alert(
request_info=request_info, metadata=_metadata
)
_deployment_latency_map = self._get_deployment_latencies_to_alert(
metadata=_metadata
)
if _deployment_latency_map is not None:
request_info += (
f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
)
await self.send_alert(
message=slow_message + request_info,
level="Low",
alert_type="llm_too_slow",
)
async def async_update_daily_reports(
self, deployment_metrics: DeploymentMetrics
) -> int:
"""
Store the perf by deployment in cache
- Number of failed requests per deployment
- Latency / output tokens per deployment
'deployment_id:daily_metrics:failed_requests'
'deployment_id:daily_metrics:latency_per_output_token'
Returns
int - count of metrics set (1 - if just latency, 2 - if failed + latency)
"""
return_val = 0
try:
## FAILED REQUESTS ##
if deployment_metrics.failed_request:
await self.internal_usage_cache.async_increment_cache(
key="{}:{}".format(
deployment_metrics.id,
SlackAlertingCacheKeys.failed_requests_key.value,
),
value=1,
)
return_val += 1
## LATENCY ##
if deployment_metrics.latency_per_output_token is not None:
await self.internal_usage_cache.async_increment_cache(
key="{}:{}".format(
deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
),
value=deployment_metrics.latency_per_output_token,
)
return_val += 1
return return_val
except Exception as e:
return 0
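    # Example (illustrative sketch, hypothetical id/values): a successful request with 0.02s latency
    # per output token increments the latency sum and returns 1; a metric with failed_request=True
    # and a latency value would increment both counters and return 2.
    #
    #   await slack_alerting.async_update_daily_reports(
    #       DeploymentMetrics(
    #           id="gpt-4-deployment-1",
    #           failed_request=False,
    #           latency_per_output_token=0.02,
    #           updated_at=litellm.utils.get_utc_datetime(),
    #       )
    #   )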
async def send_daily_reports(self, router) -> bool:
"""
Send a daily report on:
- Top 5 deployments with most failed requests
- Top 5 slowest deployments (normalized by latency/output tokens)
Get the value from redis cache (if available) or in-memory and send it
Cleanup:
- reset values in cache -> prevent memory leak
Returns:
        True -> if successfully sent
False -> if not sent
"""
ids = router.get_model_ids()
# get keys
failed_request_keys = [
"{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
for id in ids
]
latency_keys = [
"{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
]
combined_metrics_keys = failed_request_keys + latency_keys # reduce cache calls
combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
keys=combined_metrics_keys
) # [1, 2, None, ..]
if combined_metrics_values is None:
return False
all_none = True
for val in combined_metrics_values:
if val is not None and val > 0:
all_none = False
break
if all_none:
return False
failed_request_values = combined_metrics_values[
: len(failed_request_keys)
        ]  # [1, 2, None, ..]
latency_values = combined_metrics_values[len(failed_request_keys) :]
# find top 5 failed
        ## Replace None values with a placeholder value (0 in this case)
placeholder_value = 0
replaced_failed_values = [
value if value is not None else placeholder_value
for value in failed_request_values
]
## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values)
top_5_failed = sorted(
range(len(replaced_failed_values)),
key=lambda i: replaced_failed_values[i],
reverse=True,
)[:5]
top_5_failed = [
index for index in top_5_failed if replaced_failed_values[index] > 0
]
# find top 5 slowest
        # Replace None values with a placeholder value (0 in this case)
placeholder_value = 0
replaced_slowest_values = [
value if value is not None else placeholder_value
for value in latency_values
]
# Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values)
top_5_slowest = sorted(
range(len(replaced_slowest_values)),
key=lambda i: replaced_slowest_values[i],
reverse=True,
)[:5]
top_5_slowest = [
index for index in top_5_slowest if replaced_slowest_values[index] > 0
]
# format alert -> return the litellm model name + api base
message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics 📈: \n\n"
message += "\n\n*❗️ Top Deployments with Most Failed Requests:*\n\n"
if not top_5_failed:
message += "\tNone\n"
for i in range(len(top_5_failed)):
key = failed_request_keys[top_5_failed[i]].split(":")[0]
_deployment = router.get_model_info(key)
if isinstance(_deployment, dict):
deployment_name = _deployment["litellm_params"].get("model", "")
else:
return False
api_base = litellm.get_api_base(
model=deployment_name,
optional_params=(
_deployment["litellm_params"] if _deployment is not None else {}
),
)
if api_base is None:
api_base = ""
value = replaced_failed_values[top_5_failed[i]]
message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"
message += "\n\n*😅 Top Slowest Deployments:*\n\n"
if not top_5_slowest:
message += "\tNone\n"
for i in range(len(top_5_slowest)):
key = latency_keys[top_5_slowest[i]].split(":")[0]
_deployment = router.get_model_info(key)
if _deployment is not None:
deployment_name = _deployment["litellm_params"].get("model", "")
else:
deployment_name = ""
api_base = litellm.get_api_base(
model=deployment_name,
optional_params=(
_deployment["litellm_params"] if _deployment is not None else {}
),
)
value = round(replaced_slowest_values[top_5_slowest[i]], 3)
message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"
# cache cleanup -> reset values to 0
latency_cache_keys = [(key, 0) for key in latency_keys]
failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
await self.internal_usage_cache.async_batch_set_cache(
cache_list=combined_metrics_cache_keys
)
message += f"\n\nNext Run is in: `{time.time() + self.alerting_args.daily_report_frequency}`s"
# send alert
await self.send_alert(message=message, level="Low", alert_type="daily_reports")
return True
async def response_taking_too_long(
self,
start_time: Optional[datetime.datetime] = None,
end_time: Optional[datetime.datetime] = None,
type: Literal["hanging_request", "slow_response"] = "hanging_request",
request_data: Optional[dict] = None,
):
if self.alerting is None or self.alert_types is None:
return
if request_data is not None:
model = request_data.get("model", "")
messages = request_data.get("messages", None)
if messages is None:
# if messages does not exist fallback to "input"
messages = request_data.get("input", None)
            # try casting messages to str and truncating to the first 100 characters; fall back to an empty string
            try:
                messages = str(messages)
                messages = messages[:100]
            except Exception:
                messages = ""
if litellm.turn_off_message_logging:
messages = (
"Message not logged. `litellm.turn_off_message_logging=True`."
)
request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
else:
request_info = ""
if type == "hanging_request":
await asyncio.sleep(
self.alerting_threshold
            )  # sleep for the alerting threshold (default 5 minutes); the right wait may differ for streaming, non-streaming, and non-completion (embedding + image) requests
if (
request_data is not None
and request_data.get("litellm_status", "") != "success"
and request_data.get("litellm_status", "") != "fail"
):
if request_data.get("deployment", None) is not None and isinstance(
request_data["deployment"], dict
):
_api_base = litellm.get_api_base(
model=model,
optional_params=request_data["deployment"].get(
"litellm_params", {}
),
)
if _api_base is None:
_api_base = ""
request_info += f"\nAPI Base: {_api_base}"
elif request_data.get("metadata", None) is not None and isinstance(
request_data["metadata"], dict
):
                    # For hanging requests, sometimes the deployment has not yet been added to `request_data`.
                    # In that case, fall back to the api base set in the request metadata.
_metadata = request_data["metadata"]
_api_base = _metadata.get("api_base", "")
request_info = litellm.utils._add_key_name_and_team_to_alert(
request_info=request_info, metadata=_metadata
)
if _api_base is None:
_api_base = ""
request_info += f"\nAPI Base: `{_api_base}`"
# only alert hanging responses if they have not been marked as success
alerting_message = (
f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
)
if "langfuse" in litellm.success_callback:
langfuse_url = self._add_langfuse_trace_id_to_alert(
request_data=request_data,
)
if langfuse_url is not None:
request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)
# add deployment latencies to alert
_deployment_latency_map = self._get_deployment_latencies_to_alert(
metadata=request_data.get("metadata", {})
)
if _deployment_latency_map is not None:
request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"
await self.send_alert(
message=alerting_message + request_info,
level="Medium",
alert_type="llm_requests_hanging",
)
async def failed_tracking_alert(self, error_message: str):
"""Raise alert when tracking failed for specific model"""
_cache: DualCache = self.internal_usage_cache
message = "Failed Tracking Cost for" + error_message
_cache_key = "budget_alerts:failed_tracking:{}".format(message)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message, level="High", alert_type="budget_alerts"
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
async def budget_alerts(
self,
type: Literal[
"token_budget",
"user_budget",
"team_budget",
"proxy_budget",
"projected_limit_exceeded",
],
user_info: CallInfo,
):
        ## PREVENTIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
# - Alert once within 24hr period
# - Cache this information
# - Don't re-alert, if alert already sent
_cache: DualCache = self.internal_usage_cache
if self.alerting is None or self.alert_types is None:
# do nothing if alerting is not switched on
return
if "budget_alerts" not in self.alert_types:
return
_id: str = "default_id" # used for caching
user_info_json = user_info.model_dump(exclude_none=True)
        user_info_str = ""
        for k, v in user_info_json.items():
            user_info_str += "\n{}: {}\n".format(k, v)
event: Optional[
Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"]
] = None
event_group: Optional[Literal["user", "team", "key", "proxy"]] = None
event_message: str = ""
webhook_event: Optional[WebhookEvent] = None
if type == "proxy_budget":
event_group = "proxy"
event_message += "Proxy Budget: "
elif type == "user_budget":
event_group = "user"
event_message += "User Budget: "
_id = user_info.user_id or _id
elif type == "team_budget":
event_group = "team"
event_message += "Team Budget: "
_id = user_info.team_id or _id
elif type == "token_budget":
event_group = "key"
event_message += "Key Budget: "
_id = user_info.token
elif type == "projected_limit_exceeded":
event_group = "key"
event_message += "Key Budget: Projected Limit Exceeded"
event = "projected_limit_exceeded"
_id = user_info.token
# percent of max_budget left to spend
if user_info.max_budget > 0:
percent_left = (
user_info.max_budget - user_info.spend
) / user_info.max_budget
else:
percent_left = 0
# check if crossed budget
if user_info.spend >= user_info.max_budget:
event = "budget_crossed"
event_message += f"Budget Crossed\n Total Budget:`{user_info.max_budget}`"
elif percent_left <= 0.05:
event = "threshold_crossed"
event_message += "5% Threshold Crossed "
elif percent_left <= 0.15:
event = "threshold_crossed"
event_message += "15% Threshold Crossed"
if event is not None and event_group is not None:
_cache_key = "budget_alerts:{}:{}".format(event, _id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
webhook_event = WebhookEvent(
event=event,
event_group=event_group,
event_message=event_message,
**user_info_json,
)
await self.send_alert(
message=event_message + "\n\n" + user_info_str,
level="High",
alert_type="budget_alerts",
user_info=webhook_event,
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
return
return
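    # Example (hypothetical user id): for a user budget threshold alert, the dedup cache key is
    #   "budget_alerts:threshold_crossed:user-1234"
    # A repeat 'threshold_crossed' alert for the same user within `budget_alert_ttl` (24h by
    # default) is suppressed because the key is still cached with the value "SENT".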
async def model_added_alert(
self, model_name: str, litellm_model_name: str, passed_model_info: Any
):
base_model_from_user = getattr(passed_model_info, "base_model", None)
model_info = {}
base_model = ""
if base_model_from_user is not None:
model_info = litellm.model_cost.get(base_model_from_user, {})
base_model = f"Base Model: `{base_model_from_user}`\n"
else:
model_info = litellm.model_cost.get(litellm_model_name, {})
model_info_str = ""
for k, v in model_info.items():
if k == "input_cost_per_token" or k == "output_cost_per_token":
# when converting to string it should not be 1.63e-06
v = "{:.8f}".format(v)
model_info_str += f"{k}: {v}\n"
message = f"""
*🚅 New Model Added*
Model Name: `{model_name}`
{base_model}
Usage OpenAI Python SDK:
```
import openai
client = openai.OpenAI(
api_key="your_api_key",
    base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
)
response = client.chat.completions.create(
model="{model_name}", # model to send to the proxy
messages = [
{{
"role": "user",
"content": "this is a test request, write a short poem"
}}
]
)
```
Model Info:
```
{model_info_str}
```
"""
await self.send_alert(
message=message, level="Low", alert_type="new_model_added"
)
pass
async def model_removed_alert(self, model_name: str):
pass
async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
"""
Sends structured alert to webhook, if set.
Currently only implemented for budget alerts
Returns -> True if sent, False if not.
"""
webhook_url = os.getenv("WEBHOOK_URL", None)
if webhook_url is None:
raise Exception("Missing webhook_url from environment")
payload = webhook_event.model_dump_json()
headers = {"Content-type": "application/json"}
response = await self.async_http_handler.post(
url=webhook_url,
headers=headers,
data=payload,
)
if response.status_code == 200:
return True
else:
print("Error sending webhook alert. Error=", response.text) # noqa
return False
async def send_key_created_email(self, webhook_event: WebhookEvent) -> bool:
from litellm.proxy.utils import send_email
if self.alerting is None or "email" not in self.alerting:
# do nothing if user does not want email alerts
return False
# make sure this is a premium user
from litellm.proxy.proxy_server import premium_user
from litellm.proxy.proxy_server import CommonProxyErrors, prisma_client
        if premium_user is not True:
raise Exception(
f"Trying to use Email Alerting on key creation\n {CommonProxyErrors.not_premium_user.value}"
)
event_name = webhook_event.event_message
recipient_email = webhook_event.user_email
recipient_user_id = webhook_event.user_id
if (
recipient_email is None
and recipient_user_id is not None
and prisma_client is not None
):
user_row = await prisma_client.db.litellm_usertable.find_unique(
where={"user_id": recipient_user_id}
)
if user_row is not None:
recipient_email = user_row.user_email
key_name = webhook_event.key_alias
key_token = webhook_event.token
key_budget = webhook_event.max_budget
email_html_content = "Alert from LiteLLM Server"
        if recipient_email is None:
            verbose_proxy_logger.error(
                "Trying to send email alert but no recipient email was found",
                extra=webhook_event.dict(),
            )
            return False
email_html_content = f"""
<img src="{LITELLM_LOGO_URL}" alt="LiteLLM Logo" width="150" height="50" />
<p> Hi {recipient_email}, <br/>
I'm happy to provide you with an OpenAI Proxy API Key, loaded with ${key_budget} per month. <br /> <br />
<b>
Key: <pre>{key_token}</pre> <br>
</b>
<h2>Usage Example</h2>
Detailed Documentation on <a href="https://docs.litellm.ai/docs/proxy/user_keys">Usage with OpenAI Python SDK, Langchain, LlamaIndex, Curl</a>
<pre>
import openai
client = openai.OpenAI(
api_key="{key_token}",
            base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
)
response = client.chat.completions.create(
model="gpt-3.5-turbo", # model to send to the proxy
messages = [
{{
"role": "user",
"content": "this is a test request, write a short poem"
}}
]
)
</pre>
If you have any questions, please send an email to support@berri.ai <br /> <br />
Best, <br />
The LiteLLM team <br />
"""
payload = webhook_event.model_dump_json()
email_event = {
"to": recipient_email,
"subject": f"LiteLLM: {event_name}",
"html": email_html_content,
}
response = await send_email(
receiver_email=email_event["to"],
subject=email_event["subject"],
html=email_event["html"],
)
        return True
async def send_email_alert_using_smtp(self, webhook_event: WebhookEvent) -> bool:
"""
Sends structured Email alert to an SMTP server
Currently only implemented for budget alerts
Returns -> True if sent, False if not.
"""
from litellm.proxy.utils import send_email
event_name = webhook_event.event_message
recipient_email = webhook_event.user_email
user_name = webhook_event.user_id
max_budget = webhook_event.max_budget
email_html_content = "Alert from LiteLLM Server"
        if recipient_email is None:
            verbose_proxy_logger.error(
                "Trying to send email alert but no recipient email was found",
                extra=webhook_event.dict(),
            )
            return False
if webhook_event.event == "budget_crossed":
email_html_content = f"""
<img src="{LITELLM_LOGO_URL}" alt="LiteLLM Logo" width="150" height="50" />
<p> Hi {user_name}, <br/>
Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />
API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />
If you have any questions, please send an email to support@berri.ai <br /> <br />
Best, <br />
The LiteLLM team <br />
"""
payload = webhook_event.model_dump_json()
email_event = {
"to": recipient_email,
"subject": f"LiteLLM: {event_name}",
"html": email_html_content,
}
response = await send_email(
receiver_email=email_event["to"],
subject=email_event["subject"],
html=email_event["html"],
)
        return True
async def send_alert(
self,
message: str,
level: Literal["Low", "Medium", "High"],
alert_type: Literal[
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
"daily_reports",
"spend_reports",
"new_model_added",
"cooldown_deployment",
],
user_info: Optional[WebhookEvent] = None,
**kwargs,
):
"""
Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
- Responses taking too long
- Requests are hanging
- Calls are failing
- DB Read/Writes are failing
- Proxy Close to max budget
- Key Close to max budget
Parameters:
        level: str - Low|Medium|High - severity of the alert ('Low' for informational reports, 'Medium' if calls might fail, 'High' if calls or budgets are failing)
message: str - what is the alert about
"""
if self.alerting is None:
return
if (
"webhook" in self.alerting
and alert_type == "budget_alerts"
and user_info is not None
):
await self.send_webhook_alert(webhook_event=user_info)
if (
"email" in self.alerting
and alert_type == "budget_alerts"
and user_info is not None
):
# only send budget alerts over Email
await self.send_email_alert_using_smtp(webhook_event=user_info)
if "slack" not in self.alerting:
return
if alert_type not in self.alert_types:
return
from datetime import datetime
import json
# Get the current timestamp
current_time = datetime.now().strftime("%H:%M:%S")
_proxy_base_url = os.getenv("PROXY_BASE_URL", None)
if alert_type == "daily_reports" or alert_type == "new_model_added":
formatted_message = message
else:
formatted_message = (
f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
)
if kwargs:
for key, value in kwargs.items():
formatted_message += f"\n\n{key}: `{value}`\n\n"
if _proxy_base_url is not None:
formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"
# check if we find the slack webhook url in self.alert_to_webhook_url
if (
self.alert_to_webhook_url is not None
and alert_type in self.alert_to_webhook_url
):
slack_webhook_url = self.alert_to_webhook_url[alert_type]
elif self.default_webhook_url is not None:
slack_webhook_url = self.default_webhook_url
else:
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)
if slack_webhook_url is None:
raise Exception("Missing SLACK_WEBHOOK_URL from environment")
payload = {"text": formatted_message}
headers = {"Content-type": "application/json"}
response = await self.async_http_handler.post(
url=slack_webhook_url,
headers=headers,
data=json.dumps(payload),
)
        if response.status_code != 200:
            print("Error sending slack alert. Error=", response.text)  # noqa
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
"""Log deployment latency"""
try:
if "daily_reports" in self.alert_types:
model_id = (
kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")
)
                response_s: timedelta = end_time - start_time
                final_value = response_s.total_seconds()  # fall back to total latency (seconds) if token count is unavailable
total_tokens = 0
if isinstance(response_obj, litellm.ModelResponse):
completion_tokens = response_obj.usage.completion_tokens
if completion_tokens is not None and completion_tokens > 0:
final_value = float(
response_s.total_seconds() / completion_tokens
)
await self.async_update_daily_reports(
DeploymentMetrics(
id=model_id,
failed_request=False,
latency_per_output_token=final_value,
updated_at=litellm.utils.get_utc_datetime(),
)
)
except Exception as e:
verbose_proxy_logger.error(
"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: ",
e,
)
pass
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
"""Log failure + deployment latency"""
if "daily_reports" in self.alert_types:
model_id = (
kwargs.get("litellm_params", {}).get("model_info", {}).get("id", "")
)
await self.async_update_daily_reports(
DeploymentMetrics(
id=model_id,
failed_request=True,
latency_per_output_token=None,
updated_at=litellm.utils.get_utc_datetime(),
)
)
async def _run_scheduler_helper(self, llm_router) -> bool:
"""
Returns:
- True -> report sent
- False -> report not sent
"""
report_sent_bool = False
report_sent = await self.internal_usage_cache.async_get_cache(
key=SlackAlertingCacheKeys.report_sent_key.value
) # None | float
current_time = time.time()
if report_sent is None:
await self.internal_usage_cache.async_set_cache(
key=SlackAlertingCacheKeys.report_sent_key.value,
value=current_time,
)
elif isinstance(report_sent, float):
# Check if current time - interval >= time last sent
interval_seconds = self.alerting_args.daily_report_frequency
if current_time - report_sent >= interval_seconds:
                # send the daily report
                await self.send_daily_reports(router=llm_router)
                # update the last-sent timestamp after sending the report
await self.internal_usage_cache.async_set_cache(
key=SlackAlertingCacheKeys.report_sent_key.value,
value=current_time,
)
report_sent_bool = True
return report_sent_bool
async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
"""
        If 'daily_reports' is enabled:
        Check the cache every ~5 minutes to see if a report is due
        If yes -> call send_daily_reports()
"""
if llm_router is None or self.alert_types is None:
return
if "daily_reports" in self.alert_types:
while True:
await self._run_scheduler_helper(llm_router=llm_router)
interval = random.randint(
self.alerting_args.report_check_interval - 3,
self.alerting_args.report_check_interval + 3,
                )  # add jitter to the check interval to prevent collisions across proxy instances
await asyncio.sleep(interval)
return
async def send_weekly_spend_report(self):
""" """
try:
from litellm.proxy.proxy_server import _get_spend_report_for_time_range
todays_date = datetime.datetime.now().date()
week_before = todays_date - datetime.timedelta(days=7)
weekly_spend_per_team, weekly_spend_per_tag = (
await _get_spend_report_for_time_range(
start_date=week_before.strftime("%Y-%m-%d"),
end_date=todays_date.strftime("%Y-%m-%d"),
)
)
_weekly_spend_message = f"*💸 Weekly Spend Report for `{week_before.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` *\n"
if weekly_spend_per_team is not None:
_weekly_spend_message += "\n*Team Spend Report:*\n"
for spend in weekly_spend_per_team:
_team_spend = spend["total_spend"]
_team_spend = float(_team_spend)
# round to 4 decimal places
_team_spend = round(_team_spend, 4)
_weekly_spend_message += (
f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
)
if weekly_spend_per_tag is not None:
_weekly_spend_message += "\n*Tag Spend Report:*\n"
for spend in weekly_spend_per_tag:
_tag_spend = spend["total_spend"]
_tag_spend = float(_tag_spend)
# round to 4 decimal places
_tag_spend = round(_tag_spend, 4)
_weekly_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
await self.send_alert(
message=_weekly_spend_message,
level="Low",
alert_type="spend_reports",
)
except Exception as e:
verbose_proxy_logger.error("Error sending weekly spend report", e)
async def send_monthly_spend_report(self):
""" """
try:
from calendar import monthrange
from litellm.proxy.proxy_server import _get_spend_report_for_time_range
todays_date = datetime.datetime.now().date()
first_day_of_month = todays_date.replace(day=1)
_, last_day_of_month = monthrange(todays_date.year, todays_date.month)
last_day_of_month = first_day_of_month + datetime.timedelta(
days=last_day_of_month - 1
)
monthly_spend_per_team, monthly_spend_per_tag = (
await _get_spend_report_for_time_range(
start_date=first_day_of_month.strftime("%Y-%m-%d"),
end_date=last_day_of_month.strftime("%Y-%m-%d"),
)
)
_spend_message = f"*💸 Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"
if monthly_spend_per_team is not None:
_spend_message += "\n*Team Spend Report:*\n"
for spend in monthly_spend_per_team:
_team_spend = spend["total_spend"]
_team_spend = float(_team_spend)
# round to 4 decimal places
_team_spend = round(_team_spend, 4)
_spend_message += (
f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
)
if monthly_spend_per_tag is not None:
_spend_message += "\n*Tag Spend Report:*\n"
for spend in monthly_spend_per_tag:
_tag_spend = spend["total_spend"]
_tag_spend = float(_tag_spend)
# round to 4 decimal places
_tag_spend = round(_tag_spend, 4)
_spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
await self.send_alert(
message=_spend_message,
level="Low",
alert_type="spend_reports",
)
except Exception as e:
verbose_proxy_logger.error("Error sending weekly spend report", e)