mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 10:44:24 +00:00
commitb12a9892b7
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Wed Apr 2 08:09:56 2025 -0700 fix(utils.py): don't modify openai_token_counter commit294de31803
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 21:22:40 2025 -0700 fix: fix linting error commitcb6e9fbe40
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 19:52:45 2025 -0700 refactor: complete migration commitbfc159172d
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 19:09:59 2025 -0700 refactor: refactor more constants commit43ffb6a558
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 18:45:24 2025 -0700 fix: test commit04dbe4310c
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 18:28:58 2025 -0700 refactor: refactor: move more constants into constants.py commit3c26284aff
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 18:14:46 2025 -0700 refactor: migrate hardcoded constants out of __init__.py commitc11e0de69d
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 18:11:21 2025 -0700 build: migrate all constants into constants.py commit7882bdc787
Author: Krrish Dholakia <krrishdholakia@gmail.com> Date: Mon Mar 24 18:07:37 2025 -0700 build: initial test banning hardcoded numbers in repo
1825 lines
66 KiB
Python
1825 lines
66 KiB
Python
#### What this does ####
|
|
# Class for sending Slack Alerts #
|
|
import asyncio
|
|
import datetime
|
|
import os
|
|
import random
|
|
import time
|
|
from datetime import timedelta
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
|
|
|
|
from openai import APIError
|
|
|
|
import litellm
|
|
import litellm.litellm_core_utils
|
|
import litellm.litellm_core_utils.litellm_logging
|
|
import litellm.types
|
|
from litellm._logging import verbose_logger, verbose_proxy_logger
|
|
from litellm.caching.caching import DualCache
|
|
from litellm.constants import HOURS_IN_A_DAY
|
|
from litellm.integrations.custom_batch_logger import CustomBatchLogger
|
|
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
|
|
from litellm.litellm_core_utils.exception_mapping_utils import (
|
|
_add_key_name_and_team_to_alert,
|
|
)
|
|
from litellm.llms.custom_httpx.http_handler import (
|
|
get_async_httpx_client,
|
|
httpxSpecialProvider,
|
|
)
|
|
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
|
|
from litellm.types.integrations.slack_alerting import *
|
|
|
|
from ..email_templates.templates import *
|
|
from .batching_handler import send_to_webhook, squash_payloads
|
|
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables
|
|
|
|
if TYPE_CHECKING:
|
|
from litellm.router import Router as _Router
|
|
|
|
Router = _Router
|
|
else:
|
|
Router = Any
|
|
|
|
|
|
class SlackAlerting(CustomBatchLogger):
|
|
"""
|
|
Class for sending Slack Alerts
|
|
"""
|
|
|
|
# Class variables or attributes
|
|
def __init__(
    self,
    internal_usage_cache: Optional[DualCache] = None,
    alerting_threshold: Optional[
        float
    ] = None,  # threshold for slow / hanging llm responses (in seconds)
    alerting: Optional[List] = [],
    alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
    alert_to_webhook_url: Optional[
        Dict[AlertType, Union[List[str], str]]
    ] = None,  # if user wants to separate alerts to diff channels
    alerting_args={},
    default_webhook_url: Optional[str] = None,
    **kwargs,
):
    """
    Configure the alerter and initialise the parent batch logger.

    Args:
        internal_usage_cache: cache used by the alerting methods; a new
            `DualCache` is created when omitted.
        alerting_threshold: seconds after which a response counts as slow /
            hanging. Falls back to 300 when None.
        alerting: stored as-is on `self.alerting`.
        alert_types: stored as-is on `self.alert_types`.
        alert_to_webhook_url: per-alert-type webhook mapping, normalised by
            `process_slack_alerting_variables`.
        alerting_args: keyword arguments used to build `SlackAlertingArgs`.
        default_webhook_url: stored as-is on `self.default_webhook_url`.
        **kwargs: forwarded to the parent initialiser.
    """
    # NOTE(review): the `[]` default for `alerting` is one shared list that is
    # stored by reference below - do not mutate `self.alerting` in place.
    if alerting_threshold is None:
        alerting_threshold = 300
    self.alerting_threshold = alerting_threshold
    self.alerting = alerting
    self.alert_types = alert_types
    self.internal_usage_cache = internal_usage_cache or DualCache()
    self.async_http_handler = get_async_httpx_client(
        llm_provider=httpxSpecialProvider.LoggingCallback
    )
    self.alert_to_webhook_url = process_slack_alerting_variables(
        alert_to_webhook_url=alert_to_webhook_url
    )
    self.is_running = False
    self.alerting_args = SlackAlertingArgs(**alerting_args)
    self.default_webhook_url = default_webhook_url
    # The outage alert methods test `self.llm_router is None`; give the
    # attribute a defined value so they do not raise AttributeError when
    # `update_values(llm_router=...)` has not been called yet. Set before
    # the parent initialiser so the parent can still override it.
    self.llm_router: Optional[Router] = None
    self.flush_lock = asyncio.Lock()
    super().__init__(**kwargs, flush_lock=self.flush_lock)
|
|
|
|
def update_values(
    self,
    alerting: Optional[List] = None,
    alerting_threshold: Optional[float] = None,
    alert_types: Optional[List[AlertType]] = None,
    alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
    alerting_args: Optional[Dict] = None,
    llm_router: Optional[Router] = None,
):
    """
    Update the alerter's settings in place.

    Every argument is optional; only the ones that are not None are applied.
    Passing `alerting` also schedules `self.periodic_flush()` as a task, so
    that call requires a running event loop.
    """
    if alerting is not None:
        self.alerting = alerting
        # The event loop only holds weak references to tasks, so keep a
        # strong reference until the task finishes; otherwise the flush
        # task may be garbage-collected mid-execution.
        flush_task = asyncio.create_task(self.periodic_flush())
        background_tasks = getattr(self, "_background_tasks", None)
        if background_tasks is None:
            background_tasks = set()
            self._background_tasks = background_tasks
        background_tasks.add(flush_task)
        flush_task.add_done_callback(background_tasks.discard)
    if alerting_threshold is not None:
        self.alerting_threshold = alerting_threshold
    if alert_types is not None:
        self.alert_types = alert_types
    if alerting_args is not None:
        self.alerting_args = SlackAlertingArgs(**alerting_args)
    if alert_to_webhook_url is not None:
        # merge into the existing mapping, or create it if there is none yet
        if self.alert_to_webhook_url is None:
            self.alert_to_webhook_url = process_slack_alerting_variables(
                alert_to_webhook_url=alert_to_webhook_url
            )
        else:
            _new_values = (
                process_slack_alerting_variables(
                    alert_to_webhook_url=alert_to_webhook_url
                )
                or {}
            )
            self.alert_to_webhook_url.update(_new_values)
    if llm_router is not None:
        self.llm_router = llm_router
|
|
|
|
async def deployment_in_cooldown(self):
    """Placeholder hook; currently does nothing and returns None."""
    return None
|
|
|
|
async def deployment_removed_from_cooldown(self):
    """Placeholder hook; currently does nothing and returns None."""
    return None
|
|
|
|
def _all_possible_alert_types(self):
    """
    Return every member of the `AlertType` enum as a list.

    Used by the UI to show all supported alert types. This is the full set a
    user can select from, not the alert types configured on this instance.
    """
    return [alert_type for alert_type in AlertType]
|
|
|
|
def _response_taking_too_long_callback_helper(
    self,
    kwargs,  # kwargs to completion
    start_time,
    end_time,  # start/end time
):
    """
    Extract the fields needed for a slow-response alert.

    Args:
        kwargs: the completion call's kwargs; "litellm_params", "model" and
            "messages" (falling back to "input") are read from it.
        start_time: request start; `end_time - start_time` must yield an
            object with `total_seconds()`.
        end_time: request end.

    Returns:
        Tuple of (elapsed seconds as float, model, api_base, first 100
        characters of the stringified messages).

    Raises:
        Whatever the underlying operations raise; nothing is caught here.
    """
    # Convert the timedelta to float (in seconds)
    time_difference_float = (end_time - start_time).total_seconds()
    litellm_params = kwargs.get("litellm_params", {})
    model = kwargs.get("model", "")
    api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
    messages = kwargs.get("messages", None)
    if messages is None:
        # if messages does not exist fallback to "input"
        messages = kwargs.get("input", None)

    # only use first 100 chars for alerting
    _messages = str(messages)[:100]

    return time_difference_float, model, api_base, _messages
|
|
|
|
def _get_deployment_latencies_to_alert(self, metadata=None):
|
|
if metadata is None:
|
|
return None
|
|
|
|
if "_latency_per_deployment" in metadata:
|
|
# Translate model_id to -> api_base
|
|
# _latency_per_deployment is a dictionary that looks like this:
|
|
"""
|
|
_latency_per_deployment: {
|
|
api_base: 0.01336697916666667
|
|
}
|
|
"""
|
|
_message_to_send = ""
|
|
_deployment_latencies = metadata["_latency_per_deployment"]
|
|
if len(_deployment_latencies) == 0:
|
|
return None
|
|
_deployment_latency_map: Optional[dict] = None
|
|
try:
|
|
# try sorting deployments by latency
|
|
_deployment_latencies = sorted(
|
|
_deployment_latencies.items(), key=lambda x: x[1]
|
|
)
|
|
_deployment_latency_map = dict(_deployment_latencies)
|
|
except Exception:
|
|
pass
|
|
|
|
if _deployment_latency_map is None:
|
|
return
|
|
|
|
for api_base, latency in _deployment_latency_map.items():
|
|
_message_to_send += f"\n{api_base}: {round(latency,2)}s"
|
|
_message_to_send = "```" + _message_to_send + "```"
|
|
return _message_to_send
|
|
|
|
async def response_taking_too_long_callback(
    self,
    kwargs,  # kwargs to completion
    completion_response,  # response from completion
    start_time,
    end_time,  # start/end time
):
    """
    Send an `llm_too_slow` alert when a completed call exceeded the threshold.

    Does nothing when alerting is not configured or when the elapsed time is
    not greater than `self.alerting_threshold`. When request metadata is
    available, key/team info, deployment latencies and `alerting_metadata`
    are added to the alert.

    Args:
        kwargs: the completion call's kwargs.
        completion_response: unused here.
        start_time: request start.
        end_time: request end.
    """
    if self.alerting is None or self.alert_types is None:
        return

    (
        time_difference_float,
        model,
        api_base,
        messages,
    ) = self._response_taking_too_long_callback_helper(
        kwargs=kwargs,
        start_time=start_time,
        end_time=end_time,
    )

    # Nothing to report: skip building the alert text altogether.
    if not (time_difference_float > self.alerting_threshold):
        return

    if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
        messages = "Message not logged. litellm.redact_messages_in_exceptions=True"
    request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
    slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
    alerting_metadata: dict = {}

    # `litellm_params` and its "metadata" entry may each be present but None;
    # only enrich the alert when there is metadata to read from.
    litellm_params = kwargs.get("litellm_params") if kwargs is not None else None
    _metadata = None
    if litellm_params is not None and "metadata" in litellm_params:
        _metadata = litellm_params["metadata"]

    if _metadata is not None:
        request_info = _add_key_name_and_team_to_alert(
            request_info=request_info, metadata=_metadata
        )

        # add deployment latencies to alert
        _deployment_latency_map = self._get_deployment_latencies_to_alert(
            metadata=_metadata
        )
        if _deployment_latency_map is not None:
            request_info += (
                f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
            )

        if "alerting_metadata" in _metadata:
            alerting_metadata = _metadata["alerting_metadata"]

    await self.send_alert(
        message=slow_message + request_info,
        level="Low",
        alert_type=AlertType.llm_too_slow,
        alerting_metadata=alerting_metadata,
    )
|
|
|
|
async def async_update_daily_reports(
    self, deployment_metrics: DeploymentMetrics
) -> int:
    """
    Store the perf by deployment in cache
    - Number of failed requests per deployment
    - Latency / output tokens per deployment

    Cache keys are "<deployment id>:<metric key>", where the metric key comes
    from `SlackAlertingCacheKeys` (failed requests / latency).

    Returns
        int - count of metrics set (1 - if just latency, 2 - if failed + latency).
        0 if any cache update raised, even if an earlier one succeeded.
    """
    return_val = 0
    try:
        ## FAILED REQUESTS ##
        if deployment_metrics.failed_request:
            await self.internal_usage_cache.async_increment_cache(
                key="{}:{}".format(
                    deployment_metrics.id,
                    SlackAlertingCacheKeys.failed_requests_key.value,
                ),
                value=1,
                parent_otel_span=None,  # no attached request, this is a background operation
            )
            return_val += 1

        ## LATENCY ##
        if deployment_metrics.latency_per_output_token is not None:
            await self.internal_usage_cache.async_increment_cache(
                key="{}:{}".format(
                    deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
                ),
                value=deployment_metrics.latency_per_output_token,
                parent_otel_span=None,  # no attached request, this is a background operation
            )
            return_val += 1

        return return_val
    except Exception as e:
        # Metrics are best-effort, but leave a trace instead of failing silently.
        verbose_proxy_logger.debug(
            "SlackAlerting: failed to update daily report metrics - %s", str(e)
        )
        return 0
|
|
|
|
async def send_daily_reports(self, router) -> bool:  # noqa: PLR0915
    """
    Send a daily report on:
    - Top 5 deployments with most failed requests
    - Top 5 slowest deployments (normalized by latency/output tokens)

    Get the value from redis cache (if available) or in-memory and send it

    Cleanup:
    - reset values in cache -> prevent memory leak

    Args:
        router: object providing `get_model_ids()` and `get_model_info(id)`.

    Returns:
        True -> if successfully sent
        False -> if not sent (no cached metrics, all metrics missing/zero, or
            a top-failed deployment could not be resolved to a dict)
    """
    ids = list(router.get_model_ids())

    # get keys
    failed_request_keys = [
        "{}:{}".format(model_id, SlackAlertingCacheKeys.failed_requests_key.value)
        for model_id in ids
    ]
    latency_keys = [
        "{}:{}".format(model_id, SlackAlertingCacheKeys.latency_key.value)
        for model_id in ids
    ]

    combined_metrics_keys = failed_request_keys + latency_keys  # reduce cache calls

    combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
        keys=combined_metrics_keys
    )  # [1, 2, None, ..]

    if combined_metrics_values is None:
        return False

    # nothing to report when every metric is missing or zero
    if not any(val is not None and val > 0 for val in combined_metrics_values):
        return False

    failed_request_values = combined_metrics_values[
        : len(failed_request_keys)
    ]  # [1, 2, None, ..]
    latency_values = combined_metrics_values[len(failed_request_keys) :]

    # Missing (None) metrics are treated as 0 so they sort last and are
    # dropped by the `> 0` filters below.
    placeholder_value = 0

    # find top 5 failed
    replaced_failed_values = [
        value if value is not None else placeholder_value
        for value in failed_request_values
    ]
    top_5_failed = sorted(
        range(len(replaced_failed_values)),
        key=lambda i: replaced_failed_values[i],
        reverse=True,
    )[:5]
    top_5_failed = [
        index for index in top_5_failed if replaced_failed_values[index] > 0
    ]

    # find top 5 slowest
    replaced_slowest_values = [
        value if value is not None else placeholder_value
        for value in latency_values
    ]
    top_5_slowest = sorted(
        range(len(replaced_slowest_values)),
        key=lambda i: replaced_slowest_values[i],
        reverse=True,
    )[:5]
    top_5_slowest = [
        index for index in top_5_slowest if replaced_slowest_values[index] > 0
    ]

    # format alert -> return the litellm model name + api base
    message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics 📈: \n\n"

    message += "\n\n*❗️ Top Deployments with Most Failed Requests:*\n\n"
    if not top_5_failed:
        message += "\tNone\n"
    for rank, index in enumerate(top_5_failed, start=1):
        # Index i of the key lists corresponds to ids[i]. Use the id directly
        # rather than splitting it back out of the cache key, which would
        # truncate ids that themselves contain ":".
        deployment_id = str(ids[index])
        _deployment = router.get_model_info(deployment_id)
        if not isinstance(_deployment, dict):
            return False
        deployment_name = _deployment["litellm_params"].get("model", "")

        api_base = litellm.get_api_base(
            model=deployment_name,
            optional_params=_deployment["litellm_params"],
        )
        if api_base is None:
            api_base = ""
        value = replaced_failed_values[index]
        message += f"\t{rank}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"

    message += "\n\n*😅 Top Slowest Deployments:*\n\n"
    if not top_5_slowest:
        message += "\tNone\n"
    for rank, index in enumerate(top_5_slowest, start=1):
        deployment_id = str(ids[index])
        _deployment = router.get_model_info(deployment_id)
        if _deployment is not None:
            deployment_name = _deployment["litellm_params"].get("model", "")
        else:
            deployment_name = ""
        api_base = litellm.get_api_base(
            model=deployment_name,
            optional_params=(
                _deployment["litellm_params"] if _deployment is not None else {}
            ),
        )
        value = round(replaced_slowest_values[index], 3)
        message += f"\t{rank}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"

    # cache cleanup -> reset values to 0
    latency_cache_keys = [(key, 0) for key in latency_keys]
    failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
    combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
    await self.internal_usage_cache.async_set_cache_pipeline(
        cache_list=combined_metrics_cache_keys
    )

    message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s"

    # send alert
    await self.send_alert(
        message=message,
        level="Low",
        alert_type=AlertType.daily_reports,
        alerting_metadata={},
    )

    return True
|
|
|
|
async def response_taking_too_long(
    self,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    type: Literal["hanging_request", "slow_response"] = "hanging_request",
    request_data: Optional[dict] = None,
):
    """
    Alert on a request that is still running after the alerting threshold.

    Only `type == "hanging_request"` does any work: wait
    `self.alerting_threshold` seconds, then - unless the request has
    completed in the meantime or there is no `request_data` - send an
    `llm_requests_hanging` alert describing the request.

    Args:
        start_time: unused here.
        end_time: unused here.
        type: which check to run.
        request_data: the request payload; "model", "messages"/"input",
            "deployment" and "metadata" are read from it.
    """
    if self.alerting is None or self.alert_types is None:
        return

    model: str = ""
    request_info = ""
    if request_data is not None:
        model = request_data.get("model", "")
        messages = request_data.get("messages", None)
        if messages is None:
            # no "messages" -> use "input" instead
            messages = request_data.get("input", None)

        # keep only the first 100 characters of the stringified payload
        try:
            messages = str(messages)[:100]
        except Exception:
            messages = ""

        redact = (
            litellm.turn_off_message_logging
            or litellm.redact_messages_in_exceptions
        )
        if redact:
            messages = (
                "Message not logged. litellm.redact_messages_in_exceptions=True"
            )
        request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"

    if type != "hanging_request":
        return

    # Give the request the full threshold to finish before checking on it.
    await asyncio.sleep(self.alerting_threshold)
    alerting_metadata: dict = {}
    if await self._request_is_completed(request_data=request_data) is True:
        return

    if request_data is None:
        return

    if request_data.get("deployment", None) is not None and isinstance(
        request_data["deployment"], dict
    ):
        deployment_params = request_data["deployment"].get("litellm_params", {})
        _api_base = litellm.get_api_base(
            model=model, optional_params=deployment_params
        )
        if _api_base is None:
            _api_base = ""
        request_info += f"\nAPI Base: {_api_base}"
    elif request_data.get("metadata", None) is not None and isinstance(
        request_data["metadata"], dict
    ):
        # A hanging request may not have reached the point where the
        # deployment is attached to `request_data`; use the api base from the
        # request metadata in that case.
        _metadata: dict = request_data["metadata"]
        _api_base = _metadata.get("api_base", "")

        request_info = _add_key_name_and_team_to_alert(
            request_info=request_info, metadata=_metadata
        )

        if _api_base is None:
            _api_base = ""

        if "alerting_metadata" in _metadata:
            alerting_metadata = _metadata["alerting_metadata"]
        request_info += f"\nAPI Base: `{_api_base}`"

    # the request was not marked as completed above, so it is reported as hanging
    alerting_message = (
        f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
    )

    if "langfuse" in litellm.success_callback:
        langfuse_url = await _add_langfuse_trace_id_to_alert(
            request_data=request_data,
        )
        if langfuse_url is not None:
            request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

    # append per-deployment latencies when the metadata carries them
    latency_summary = self._get_deployment_latencies_to_alert(
        metadata=request_data.get("metadata", {})
    )
    if latency_summary is not None:
        request_info += f"\nDeployment Latencies\n{latency_summary}"

    await self.send_alert(
        message=alerting_message + request_info,
        level="Medium",
        alert_type=AlertType.llm_requests_hanging,
        alerting_metadata=alerting_metadata,
    )
|
|
|
|
async def failed_tracking_alert(self, error_message: str, failing_model: str):
    """
    Send a high-level alert when cost tracking failed for a model.

    The alert is skipped when alerting is off, when "failed_tracking_spend"
    is not among the configured alert types, or when a marker for this model
    is already in the cache. After sending, the marker is cached with a TTL
    of `alerting_args.budget_alert_ttl`.

    Args:
        error_message (str): text appended to the alert message
        failing_model (str): model whose tracking failed; part of the cache key
    """
    alerting_enabled = self.alerting is not None and self.alert_types is not None
    if not alerting_enabled:
        return
    if "failed_tracking_spend" not in self.alert_types:
        return

    usage_cache: DualCache = self.internal_usage_cache
    alert_text = "Failed Tracking Cost for " + error_message
    marker_key = f"budget_alerts:failed_tracking:{failing_model}"

    already_sent = await usage_cache.async_get_cache(key=marker_key)
    if already_sent is not None:
        return

    await self.send_alert(
        message=alert_text,
        level="High",
        alert_type=AlertType.failed_tracking_spend,
        alerting_metadata={},
    )
    await usage_cache.async_set_cache(
        key=marker_key,
        value="SENT",
        ttl=self.alerting_args.budget_alert_ttl,
    )
|
|
|
|
async def budget_alerts(  # noqa: PLR0915
    self,
    type: Literal[
        "token_budget",
        "soft_budget",
        "user_budget",
        "team_budget",
        "proxy_budget",
        "projected_limit_exceeded",
    ],
    user_info: CallInfo,
):
    """
    Send a budget alert for a key / user / team / proxy, at most once per
    cached period.

    Preventative alerting - https://github.com/BerriAI/litellm/issues/2727
    - A marker is cached per (event, id) after an alert is sent, with a TTL
      of `alerting_args.budget_alert_ttl`.
    - While that marker is present, the same alert is not sent again.

    Args:
        type: which budget is being checked; selects the event group, the
            message prefix and the id used in the cache key.
        user_info: spend / budget details; also forwarded into the webhook
            event.
    """
    usage_cache: DualCache = self.internal_usage_cache

    if self.alerting is None or self.alert_types is None:
        # alerting is not switched on
        return
    if "budget_alerts" not in self.alert_types:
        return

    cache_id: Optional[str] = "default_id"  # used for caching
    user_info_json = user_info.model_dump(exclude_none=True)
    user_info_str = self._get_user_info_str(user_info)

    event: Optional[
        Literal[
            "budget_crossed",
            "threshold_crossed",
            "projected_limit_exceeded",
            "soft_budget_crossed",
        ]
    ] = None
    event_group: Optional[
        Literal["internal_user", "team", "key", "proxy", "customer"]
    ] = None
    event_message: str = ""
    webhook_event: Optional[WebhookEvent] = None

    # budget type -> (event group, message prefix)
    group_and_prefix = {
        "proxy_budget": ("proxy", "Proxy Budget: "),
        "soft_budget": ("proxy", "Soft Budget Crossed: "),
        "user_budget": ("internal_user", "User Budget: "),
        "team_budget": ("team", "Team Budget: "),
        "token_budget": ("key", "Key Budget: "),
        "projected_limit_exceeded": ("key", "Key Budget: Projected Limit Exceeded"),
    }
    if type in group_and_prefix:
        event_group, prefix = group_and_prefix[type]
        event_message += prefix

    if type == "user_budget":
        cache_id = user_info.user_id or cache_id
    elif type == "team_budget":
        cache_id = user_info.team_id or cache_id
    elif type in ("token_budget", "projected_limit_exceeded"):
        cache_id = user_info.token
        if type == "projected_limit_exceeded":
            event = "projected_limit_exceeded"

    if user_info.max_budget is None and user_info.soft_budget is None:
        return

    if user_info.max_budget is not None:
        # fraction of max_budget left to spend (stays 0 for a non-positive budget)
        percent_left: float = 0
        if user_info.max_budget > 0:
            percent_left = (
                user_info.max_budget - user_info.spend
            ) / user_info.max_budget

        if user_info.spend >= user_info.max_budget:
            event = "budget_crossed"
            event_message += (
                f"Budget Crossed\n Total Budget:`{user_info.max_budget}`"
            )
        elif percent_left <= SLACK_ALERTING_THRESHOLD_5_PERCENT:
            event = "threshold_crossed"
            event_message += "5% Threshold Crossed "
        elif percent_left <= SLACK_ALERTING_THRESHOLD_15_PERCENT:
            event = "threshold_crossed"
            event_message += "15% Threshold Crossed"
    elif user_info.spend >= user_info.soft_budget:
        # soft budget is only consulted when no max budget is set
        event = "soft_budget_crossed"

    if event is None or event_group is None:
        return

    marker_key = "budget_alerts:{}:{}".format(event, cache_id)
    if await usage_cache.async_get_cache(key=marker_key) is not None:
        # already alerted for this event / id
        return

    webhook_event = WebhookEvent(
        event=event,
        event_group=event_group,
        event_message=event_message,
        **user_info_json,
    )
    await self.send_alert(
        message=event_message + "\n\n" + user_info_str,
        level="High",
        alert_type=AlertType.budget_alerts,
        user_info=webhook_event,
        alerting_metadata={},
    )
    await usage_cache.async_set_cache(
        key=marker_key,
        value="SENT",
        ttl=self.alerting_args.budget_alert_ttl,
    )
    return
|
|
|
|
def _get_user_info_str(self, user_info: CallInfo) -> str:
    """
    Create a standard message for a budget alert.

    Dumps every non-None field of `user_info` except "token" as one
    "*<field>:* `<value>`" line per field.

    Args:
        user_info: the call info to render.

    Returns:
        The rendered lines, each terminated by a newline ("" if no fields).
    """
    _all_fields_as_dict = user_info.model_dump(exclude_none=True)
    # `exclude_none=True` omits "token" entirely when it is None, so a plain
    # pop("token") would raise KeyError; pop with a default instead.
    _all_fields_as_dict.pop("token", None)
    return "".join(f"*{k}:* `{v}`\n" for k, v in _all_fields_as_dict.items())
|
|
|
|
async def customer_spend_alert(
    self,
    token: Optional[str],
    key_alias: Optional[str],
    end_user_id: Optional[str],
    response_cost: Optional[float],
    max_budget: Optional[float],
):
    """
    Emit a "spend_tracked" webhook event for a customer (end user).

    Nothing is sent unless "webhook" is in `self.alerting` and `token`,
    `end_user_id` and `response_cost` are all provided.
    """
    webhook_enabled = self.alerting is not None and "webhook" in self.alerting
    if not webhook_enabled:
        return
    if end_user_id is None or token is None or response_cost is None:
        return

    # log customer spend
    spend_message = "Customer spend tracked. Customer={}, spend={}".format(
        end_user_id, response_cost
    )
    event = WebhookEvent(
        spend=response_cost,
        max_budget=max_budget,
        token=token,
        customer_id=end_user_id,
        user_id=None,
        team_id=None,
        user_email=None,
        key_alias=key_alias,
        projected_exceeded_date=None,
        projected_spend=None,
        event="spend_tracked",
        event_group="customer",
        event_message=spend_message,
    )

    await self.send_webhook_alert(webhook_event=event)
|
|
|
|
def _count_outage_alerts(self, alerts: List[int]) -> str:
|
|
"""
|
|
Parameters:
|
|
- alerts: List[int] -> list of error codes (either 408 or 500+)
|
|
|
|
Returns:
|
|
- str -> formatted string. This is an alert message, giving a human-friendly description of the errors.
|
|
"""
|
|
error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0}
|
|
for alert in alerts:
|
|
if alert == 408:
|
|
error_breakdown["Timeout Errors"] += 1
|
|
elif alert >= 500:
|
|
error_breakdown["API Errors"] += 1
|
|
else:
|
|
error_breakdown["Unknown Errors"] += 1
|
|
|
|
error_msg = ""
|
|
for key, value in error_breakdown.items():
|
|
if value > 0:
|
|
error_msg += "\n{}: {}\n".format(key, value)
|
|
|
|
return error_msg
|
|
|
|
def _outage_alert_msg_factory(
    self,
    alert_type: Literal["Major", "Minor"],
    key: Literal["Model", "Region"],
    key_val: str,
    provider: str,
    api_base: Optional[str],
    outage_value: BaseOutageModel,
) -> str:
    """
    Build the Slack message body for an outage alert.

    The message has a title with the alert type, header lines
    ("<key> Name", "Provider" and, when given, "API Base"), the error
    breakdown from `_count_outage_alerts`, and the seconds elapsed since
    `outage_value["last_updated_at"]`.
    """
    header_rows = [(f"{key} Name", key_val), ("Provider", provider)]
    if api_base is not None:
        header_rows.append(("API Base", api_base))
    headers_str = "\n" + "".join(
        f"*{label}:* `{value}`\n" for label, value in header_rows
    )

    error_summary = self._count_outage_alerts(alerts=outage_value["alerts"])
    seconds_since_check = round(time.time() - outage_value["last_updated_at"], 4)
    # NOTE(review): the string layout below is reconstructed; confirm the
    # blank lines / lack of leading whitespace against the original file.
    return f"""\n\n
*⚠️ {alert_type} Service Outage*

{headers_str}

*Errors:*
{error_summary}

*Last Check:* `{seconds_since_check}s ago`\n\n
"""
|
|
|
|
async def region_outage_alerts(
    self,
    exception: APIError,
    deployment_id: str,
) -> None:
    """
    Send slack alert if specific provider region is having an outage.

    Track for 408 (Timeout) and >=500 Error codes

    Errors are accumulated in the cache under "<provider><region>". A minor
    (then major) alert is sent once the number of recorded errors reaches
    the corresponding `alerting_args` threshold and more than one deployment
    in the region has failed.

    Args:
        exception: the error raised by the call; only its `status_code` is used.
        deployment_id: id of the deployment that produced the error.
    """
    # The router may never have been provided (it is only set through
    # `update_values`), so do not assume the attribute exists.
    llm_router = getattr(self, "llm_router", None)
    if llm_router is None:
        return

    # Filter on the status code first: untracked errors then cost neither a
    # provider lookup nor a cache read.
    status_code = getattr(exception, "status_code", None)
    if status_code is None or (status_code != 408 and status_code < 500):
        return

    ## CREATE (PROVIDER+REGION) ID ##
    deployment = llm_router.get_deployment(model_id=deployment_id)
    if deployment is None:
        return

    model = deployment.litellm_params.model
    ### GET PROVIDER ###
    provider = deployment.litellm_params.custom_llm_provider
    if provider is None:
        model, provider, _, _ = litellm.get_llm_provider(model=model)

    ### GET REGION ###
    region_name = deployment.litellm_params.region_name
    if region_name is None:
        region_name = litellm.utils._get_model_region(
            custom_llm_provider=provider, litellm_params=deployment.litellm_params
        )

    if region_name is None:
        return

    ### UNIQUE CACHE KEY ###
    cache_key = provider + region_name

    outage_value: Optional[
        ProviderRegionOutageModel
    ] = await self.internal_usage_cache.async_get_cache(key=cache_key)

    if outage_value is None:
        # first tracked error for this provider+region: start a new record
        _deployment_set = set()
        _deployment_set.add(deployment_id)
        outage_value = ProviderRegionOutageModel(
            provider_region_id=cache_key,
            alerts=[status_code],  # type: ignore
            minor_alert_sent=False,
            major_alert_sent=False,
            last_updated_at=time.time(),
            deployment_ids=_deployment_set,
        )

        ## add to cache ##
        await self.internal_usage_cache.async_set_cache(
            key=cache_key,
            value=outage_value,
            ttl=self.alerting_args.region_outage_alert_ttl,
        )
        return

    # cap the list size -> prevent memory leaks
    if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
        outage_value["alerts"].append(status_code)  # type: ignore
    _deployment_set = outage_value["deployment_ids"]
    _deployment_set.add(deployment_id)
    outage_value["deployment_ids"] = _deployment_set
    outage_value["last_updated_at"] = time.time()

    ## MINOR OUTAGE ALERT SENT ##
    if (
        outage_value["minor_alert_sent"] is False
        and len(outage_value["alerts"])
        >= self.alerting_args.minor_outage_alert_threshold
        and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
    ):
        msg = self._outage_alert_msg_factory(
            alert_type="Minor",
            key="Region",
            key_val=region_name,
            api_base=None,
            outage_value=outage_value,
            provider=provider,
        )
        # send minor alert
        await self.send_alert(
            message=msg,
            level="Medium",
            alert_type=AlertType.outage_alerts,
            alerting_metadata={},
        )
        # set to true
        outage_value["minor_alert_sent"] = True

    ## MAJOR OUTAGE ALERT SENT ##
    elif (
        outage_value["major_alert_sent"] is False
        and len(outage_value["alerts"])
        >= self.alerting_args.major_outage_alert_threshold
        and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
    ):
        msg = self._outage_alert_msg_factory(
            alert_type="Major",
            key="Region",
            key_val=region_name,
            api_base=None,
            outage_value=outage_value,
            provider=provider,
        )

        # send major alert
        await self.send_alert(
            message=msg,
            level="High",
            alert_type=AlertType.outage_alerts,
            alerting_metadata={},
        )
        # set to true
        outage_value["major_alert_sent"] = True

    ## update cache ##
    await self.internal_usage_cache.async_set_cache(
        key=cache_key, value=outage_value
    )
|
|
|
|
async def outage_alerts(
    self,
    exception: APIError,
    deployment_id: str,
) -> None:
    """
    Send a slack alert if a single deployment looks badly configured / is having an outage.

    Only exceptions whose `status_code` is 408 or >= 500 are counted. Exceptions
    without a `status_code`, or with any other status, are ignored.

    Cache entry:
        key = deployment_id
        value = OutageModel {
            - model_id
            - alerts []            (status codes seen, capped at
                                    alerting_args.max_outage_alert_list_size)
            - minor_alert_sent
            - major_alert_sent
            - last_updated_at
        }
        ttl = alerting_args.outage_alert_ttl (passed when the entry is first created)

    At most one alert is sent per call:
        - "Minor" (level Medium) once the number of recorded errors reaches
          alerting_args.minor_outage_alert_threshold
        - "Major" (level High) on a later call, once it reaches
          alerting_args.major_outage_alert_threshold

    Parameters:
        exception: the error raised by the LLM call
        deployment_id: router model id of the failing deployment

    Never raises - this runs on the failure-logging path, so any error is
    logged at debug level and dropped.
    """
    try:
        status_code = getattr(exception, "status_code", None)
        if (
            status_code is None
            or (status_code != 408 and status_code < 500)
            or self.llm_router is None
        ):
            return

        ### EXTRACT MODEL DETAILS ###
        deployment = self.llm_router.get_deployment(model_id=deployment_id)
        if deployment is None:
            return

        model = deployment.litellm_params.model
        provider = deployment.litellm_params.custom_llm_provider
        if provider is None:
            try:
                model, provider, _, _ = litellm.get_llm_provider(model=model)
            except Exception:
                provider = ""
        api_base = litellm.get_api_base(
            model=model, optional_params=deployment.litellm_params
        )

        # cache is only read once we know this exception is one we track
        outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id)  # type: ignore

        if outage_value is None:
            outage_value = OutageModel(
                model_id=deployment_id,
                alerts=[status_code],  # type: ignore
                minor_alert_sent=False,
                major_alert_sent=False,
                last_updated_at=time.time(),
            )

            ## add to cache ##
            await self.internal_usage_cache.async_set_cache(
                key=deployment_id,
                value=outage_value,
                ttl=self.alerting_args.outage_alert_ttl,
            )
            return

        # cap the list - prevents unbounded growth during a long outage
        if (
            len(outage_value["alerts"])
            < self.alerting_args.max_outage_alert_list_size
        ):
            outage_value["alerts"].append(status_code)  # type: ignore

        outage_value["last_updated_at"] = time.time()

        ## MINOR OUTAGE ALERT SENT ##
        if (
            outage_value["minor_alert_sent"] is False
            and len(outage_value["alerts"])
            >= self.alerting_args.minor_outage_alert_threshold
        ):
            msg = self._outage_alert_msg_factory(
                alert_type="Minor",
                key="Model",
                key_val=model,
                api_base=api_base,
                outage_value=outage_value,
                provider=provider,
            )
            # send minor alert
            await self.send_alert(
                message=msg,
                level="Medium",
                alert_type=AlertType.outage_alerts,
                alerting_metadata={},
            )
            # remember it was sent, so it is not repeated
            outage_value["minor_alert_sent"] = True

        ## MAJOR OUTAGE ALERT SENT ##
        elif (
            outage_value["major_alert_sent"] is False
            and len(outage_value["alerts"])
            >= self.alerting_args.major_outage_alert_threshold
        ):
            msg = self._outage_alert_msg_factory(
                alert_type="Major",
                key="Model",
                key_val=model,
                api_base=api_base,
                outage_value=outage_value,
                provider=provider,
            )
            # send major alert
            await self.send_alert(
                message=msg,
                level="High",
                alert_type=AlertType.outage_alerts,
                alerting_metadata={},
            )
            # remember it was sent, so it is not repeated
            outage_value["major_alert_sent"] = True

        ## update cache ##
        await self.internal_usage_cache.async_set_cache(
            key=deployment_id, value=outage_value
        )
    except Exception as e:
        # best-effort: alerting must never break the request failure path
        verbose_proxy_logger.debug(
            "Slack Alerting: error in outage_alerts - %s", str(e)
        )
|
|
|
|
async def model_added_alert(
    self, model_name: str, litellm_model_name: str, passed_model_info: Any
):
    """
    Send a "New Model Added" alert with a usage snippet and the model's cost info.

    Parameters:
        model_name: public name of the model on the proxy (shown in the snippet)
        litellm_model_name: litellm model name, used to look up `litellm.model_cost`
            when no base model is given
        passed_model_info: object that may carry a `base_model` attribute; when it
            does, cost info is looked up by that base model instead
    """
    base_model_from_user = getattr(passed_model_info, "base_model", None)
    base_model = ""
    if base_model_from_user is not None:
        model_info = litellm.model_cost.get(base_model_from_user, {})
        base_model = f"Base Model: `{base_model_from_user}`\n"
    else:
        model_info = litellm.model_cost.get(litellm_model_name, {})

    info_lines = []
    for k, v in model_info.items():
        if k in ("input_cost_per_token", "output_cost_per_token"):
            # when converting to string it should not be 1.63e-06
            v = "{:.8f}".format(v)
        info_lines.append(f"{k}: {v}\n")
    model_info_str = "".join(info_lines)

    proxy_base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")

    # base_url is quoted so that the snippet is valid, copy-pasteable python
    message = f"""
*🚅 New Model Added*
Model Name: `{model_name}`
{base_model}

Usage OpenAI Python SDK:
```
import openai
client = openai.OpenAI(
api_key="your_api_key",
base_url="{proxy_base_url}"
)

response = client.chat.completions.create(
model="{model_name}", # model to send to the proxy
messages = [
{{
"role": "user",
"content": "this is a test request, write a short poem"
}}
]
)
```

Model Info:
```
{model_info_str}
```
"""

    alert_val = self.send_alert(
        message=message,
        level="Low",
        alert_type=AlertType.new_model_added,
        alerting_metadata={},
    )

    # only await when send_alert actually handed back a coroutine
    if alert_val is not None and asyncio.iscoroutine(alert_val):
        await alert_val
|
|
|
|
async def model_removed_alert(self, model_name: str):
    """Hook for a "model removed" alert. Currently a no-op."""
    return None
|
|
|
|
async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
    """
    Sends structured alert to webhook, if set.

    Currently only implemented for budget alerts

    The event is serialized with `model_dump_json()` and POSTed to the URL in
    the WEBHOOK_URL environment variable.

    Returns -> True if the webhook answered with status 200, False otherwise.

    Raises Exception
    - if WEBHOOK_URL is not set
    """
    webhook_url = os.getenv("WEBHOOK_URL", None)
    if webhook_url is None:
        raise Exception("Missing webhook_url from environment")

    response = await self.async_http_handler.post(
        url=webhook_url,
        headers={"Content-type": "application/json"},
        data=webhook_event.model_dump_json(),
    )
    if response.status_code == 200:
        return True

    # report through the proxy logger, like the other alerting errors in this class
    verbose_proxy_logger.error(
        "Error sending webhook alert. Error=%s", response.text
    )
    return False
|
|
|
|
async def _check_if_using_premium_email_feature(
|
|
self,
|
|
premium_user: bool,
|
|
email_logo_url: Optional[str] = None,
|
|
email_support_contact: Optional[str] = None,
|
|
):
|
|
from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
|
|
|
|
if premium_user is not True:
|
|
if email_logo_url is not None or email_support_contact is not None:
|
|
raise ValueError(
|
|
f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}"
|
|
)
|
|
return
|
|
|
|
async def send_key_created_or_user_invited_email(
    self, webhook_event: WebhookEvent
) -> bool:
    """
    Email the user when a key is created for them or they are invited to the proxy.

    Handles the `key_created` and `internal_user_created` events; any other
    event is logged as an error and sent with a generic body.

    The recipient is `webhook_event.user_email`, falling back to the email stored
    in the user table for `webhook_event.user_id` when a DB client is available.

    Returns -> True if the email was handed to `send_email`, False if email
    alerting is not enabled, no recipient could be determined, or an error occurred.
    """
    try:
        from litellm.proxy.utils import send_email

        if self.alerting is None or "email" not in self.alerting:
            # do nothing if user does not want email alerts
            verbose_proxy_logger.error(
                "Error sending email alert - 'email' not in self.alerting %s",
                self.alerting,
            )
            return False
        from litellm.proxy.proxy_server import premium_user, prisma_client

        email_logo_url = os.getenv(
            "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
        )
        email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
        # raises if a non-premium user tries to customize logo / support contact
        await self._check_if_using_premium_email_feature(
            premium_user, email_logo_url, email_support_contact
        )
        if email_logo_url is None:
            email_logo_url = LITELLM_LOGO_URL
        if email_support_contact is None:
            email_support_contact = LITELLM_SUPPORT_CONTACT

        event_name = webhook_event.event_message
        recipient_email = webhook_event.user_email
        recipient_user_id = webhook_event.user_id
        if (
            recipient_email is None
            and recipient_user_id is not None
            and prisma_client is not None
        ):
            # no email on the event - look it up from the user table
            user_row = await prisma_client.db.litellm_usertable.find_unique(
                where={"user_id": recipient_user_id}
            )

            if user_row is not None:
                recipient_email = user_row.user_email

        if recipient_email is None:
            # nobody to deliver to - report it instead of "sending" to None
            verbose_proxy_logger.error(
                "Trying to send email alert to no recipient",
                extra=webhook_event.model_dump(),
            )
            return False

        key_token = webhook_event.token
        key_budget = webhook_event.max_budget
        base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")

        email_html_content = "Alert from LiteLLM Server"

        if webhook_event.event == "key_created":
            email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format(
                email_logo_url=email_logo_url,
                recipient_email=recipient_email,
                key_budget=key_budget,
                key_token=key_token,
                base_url=base_url,
                email_support_contact=email_support_contact,
            )
        elif webhook_event.event == "internal_user_created":
            # GET TEAM NAME
            team_id = webhook_event.team_id
            team_name = "Default Team"
            if team_id is not None and prisma_client is not None:
                team_row = await prisma_client.db.litellm_teamtable.find_unique(
                    where={"team_id": team_id}
                )
                if team_row is not None:
                    team_name = team_row.team_alias or "-"
            email_html_content = USER_INVITED_EMAIL_TEMPLATE.format(
                email_logo_url=email_logo_url,
                recipient_email=recipient_email,
                team_name=team_name,
                base_url=base_url,
                email_support_contact=email_support_contact,
            )
        else:
            verbose_proxy_logger.error(
                "Trying to send email alert on unknown webhook event",
                extra=webhook_event.model_dump(),
            )

        await send_email(
            receiver_email=recipient_email,
            subject=f"LiteLLM: {event_name}",
            html=email_html_content,
        )

        return True

    except Exception as e:
        verbose_proxy_logger.error("Error sending email alert %s", str(e))
        return False
|
|
|
|
async def send_email_alert_using_smtp(
    self, webhook_event: WebhookEvent, alert_type: str
) -> bool:
    """
    Sends structured Email alert to an SMTP server

    Currently only implemented for budget alerts: a `budget_crossed` event gets a
    dedicated body, any other event is sent with a generic one. Events whose
    `event_group` is "team" are additionally passed to `send_team_budget_alert`.

    Parameters:
        webhook_event: the event to report
        alert_type: accepted for interface compatibility; not used here

    Returns -> True if a delivery was attempted (user email and/or team alert),
    False if there was nothing to deliver.

    Raises ValueError
    - if a non-premium user has configured a custom logo / support contact
    """
    from litellm.proxy.proxy_server import premium_user
    from litellm.proxy.utils import send_email

    email_logo_url = os.getenv(
        "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
    )
    email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
    await self._check_if_using_premium_email_feature(
        premium_user, email_logo_url, email_support_contact
    )

    if email_logo_url is None:
        email_logo_url = LITELLM_LOGO_URL
    if email_support_contact is None:
        email_support_contact = LITELLM_SUPPORT_CONTACT

    event_name = webhook_event.event_message
    recipient_email = webhook_event.user_email
    user_name = webhook_event.user_id
    max_budget = webhook_event.max_budget
    email_html_content = "Alert from LiteLLM Server"

    if webhook_event.event == "budget_crossed":
        email_html_content = f"""
<img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" />

<p> Hi {user_name}, <br/>

Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />

API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />

If you have any questions, please send an email to {email_support_contact} <br /> <br />

Best, <br />
The LiteLLM team <br />
"""

    delivery_attempted = False

    if recipient_email is None:
        # nobody to deliver the user email to - report it and skip the send
        verbose_proxy_logger.error(
            "Trying to send email alert to no recipient",
            extra=webhook_event.model_dump(),
        )
    else:
        await send_email(
            receiver_email=recipient_email,
            subject=f"LiteLLM: {event_name}",
            html=email_html_content,
        )
        delivery_attempted = True

    if webhook_event.event_group == "team":
        from litellm.integrations.email_alerting import send_team_budget_alert

        await send_team_budget_alert(webhook_event=webhook_event)
        delivery_attempted = True

    return delivery_attempted
|
|
|
|
async def send_alert(
    self,
    message: str,
    level: Literal["Low", "Medium", "High"],
    alert_type: AlertType,
    alerting_metadata: dict,
    user_info: Optional[WebhookEvent] = None,
    **kwargs,
):
    """
    Route an alert to the configured channels (webhook / email / slack).

    Background: https://github.com/BerriAI/litellm/issues/1298

    Channels:
    - webhook: only for budget alerts that carry `user_info`
    - email: only for budget alerts that carry `user_info`
    - slack: any alert type listed in `self.alert_types`; the message is put on
      `self.log_queue`, which is flushed once it holds `self.batch_size` items

    Parameters:
        level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'.
        message: str - what is the alert about
        alert_type: which kind of alert this is; also selects the slack webhook url
        alerting_metadata: extra key/values appended to the slack message
        user_info: structured event, required for the webhook / email channels
        **kwargs: extra key/values appended to the slack message

    Raises ValueError
    - if slack alerting is on but no slack webhook url can be found
    """
    if self.alerting is None:
        return

    has_budget_event = alert_type == "budget_alerts" and user_info is not None

    if has_budget_event and "webhook" in self.alerting:
        await self.send_webhook_alert(webhook_event=user_info)

    if has_budget_event and "email" in self.alerting:
        # only send budget alerts over Email
        await self.send_email_alert_using_smtp(
            webhook_event=user_info, alert_type=alert_type
        )

    if "slack" not in self.alerting or alert_type not in self.alert_types:
        return

    # Timestamp shown in the slack message
    current_time = datetime.datetime.now().strftime("%H:%M:%S")
    _proxy_base_url = os.getenv("PROXY_BASE_URL", None)

    # reports / new-model messages are sent verbatim, everything else gets a header
    if alert_type in ("daily_reports", "new_model_added"):
        formatted_message = message
    else:
        formatted_message = (
            f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
        )

    for extra_key, extra_value in kwargs.items():
        formatted_message += f"\n\n{extra_key}: `{extra_value}`\n\n"
    if alerting_metadata:
        for meta_key, meta_value in alerting_metadata.items():
            formatted_message += (
                f"\n\n*Alerting Metadata*: \n{meta_key}: `{meta_value}`\n\n"
            )
    if _proxy_base_url is not None:
        formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"

    # resolve the slack webhook url: per-alert-type mapping > default url > environment
    slack_webhook_url: Optional[Union[str, List[str]]]
    if (
        self.alert_to_webhook_url is not None
        and alert_type in self.alert_to_webhook_url
    ):
        slack_webhook_url = self.alert_to_webhook_url[alert_type]
    elif self.default_webhook_url is not None:
        slack_webhook_url = self.default_webhook_url
    else:
        slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)

    if slack_webhook_url is None:
        raise ValueError("Missing SLACK_WEBHOOK_URL from environment")

    payload = {"text": formatted_message}
    headers = {"Content-type": "application/json"}

    # one queue entry per target url
    if isinstance(slack_webhook_url, list):
        target_urls = slack_webhook_url
    else:
        target_urls = [slack_webhook_url]

    for target_url in target_urls:
        self.log_queue.append(
            {
                "url": target_url,
                "headers": headers,
                "payload": payload,
                "alert_type": alert_type,
            }
        )

    if len(self.log_queue) >= self.batch_size:
        await self.flush_queue()
|
|
|
|
async def async_send_batch(self):
    """
    Send everything currently on `self.log_queue`, then empty the queue.

    Queue entries are first squashed with `squash_payloads`; each squashed entry
    is sent via `send_to_webhook` together with its count, all concurrently.
    Does nothing when the queue is empty.
    """
    if not self.log_queue:
        return

    pending_sends = []
    for squashed in squash_payloads(self.log_queue).values():
        pending_sends.append(
            send_to_webhook(
                slackAlertingInstance=self,
                item=squashed["item"],
                count=squashed["count"],
            )
        )

    await asyncio.gather(*pending_sends)
    self.log_queue.clear()
|
|
|
|
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
    """
    Log deployment latency

    Only active when 'daily_reports' is in `self.alert_types`. Records the
    request latency for the deployment: seconds per completion token when the
    response reports a positive completion token count, otherwise the total
    request time in seconds. Errors are logged and never raised.
    """
    try:
        if "daily_reports" not in self.alert_types:
            return

        params = kwargs.get("litellm_params", {}) or {}
        deployment_info = params.get("model_info", {}) or {}
        deployment_id = deployment_info.get("id", "") or ""
        elapsed: timedelta = end_time - start_time

        # falls back to the whole request duration when token usage is unavailable
        latency_value = elapsed

        if isinstance(response_obj, litellm.ModelResponse):
            usage = getattr(response_obj, "usage", None)
            if usage is not None and hasattr(usage, "completion_tokens"):
                token_count = usage.completion_tokens  # type: ignore
                if token_count is not None and token_count > 0:
                    latency_value = float(elapsed.total_seconds() / token_count)

        if isinstance(latency_value, timedelta):
            latency_value = latency_value.total_seconds()

        metrics = DeploymentMetrics(
            id=deployment_id,
            failed_request=False,
            latency_per_output_token=latency_value,
            updated_at=litellm.utils.get_utc_datetime(),
        )
        await self.async_update_daily_reports(metrics)
    except Exception as e:
        verbose_proxy_logger.error(
            f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}"
        )
|
|
|
|
async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
    """
    Log failure + deployment latency

    - 'daily_reports' in `self.alert_types`: records a failed request for the
      deployment.
    - 'outage_alerts' / 'region_outage_alerts' in `self.alert_types`: when
      `kwargs["exception"]` is an `APIError`, feeds it to the matching outage
      tracker(s).

    Never raises - errors are logged at debug level and dropped.
    """
    # `or {}` / `or ""`: the keys may be present but explicitly set to None
    _litellm_params = kwargs.get("litellm_params", {}) or {}
    _model_info = _litellm_params.get("model_info", {}) or {}
    model_id = _model_info.get("id", "") or ""
    try:
        if "daily_reports" in self.alert_types:
            try:
                await self.async_update_daily_reports(
                    DeploymentMetrics(
                        id=model_id,
                        failed_request=True,
                        latency_per_output_token=None,
                        updated_at=litellm.utils.get_utc_datetime(),
                    )
                )
            except Exception as e:
                verbose_logger.debug(f"Exception raises -{str(e)}")

        track_outage = "outage_alerts" in self.alert_types
        track_region_outage = "region_outage_alerts" in self.alert_types
        exception = kwargs.get("exception")

        # only inspect the exception when some outage tracking is enabled
        if (track_outage or track_region_outage) and isinstance(
            exception, APIError
        ):
            if track_outage:
                await self.outage_alerts(
                    exception=exception,
                    deployment_id=model_id,
                )

            if track_region_outage:
                await self.region_outage_alerts(
                    exception=exception, deployment_id=model_id
                )
    except Exception as e:
        # best-effort: alerting must never break the request failure path
        verbose_logger.debug(
            f"Slack Alerting: error in async_log_failure_event - {str(e)}"
        )
|
|
|
|
async def _run_scheduler_helper(self, llm_router) -> bool:
    """
    Send the daily report if it is due, tracking the last send time in the cache.

    - no timestamp cached yet -> store the current time, send nothing
    - cached timestamp is a float and at least
      `alerting_args.daily_report_frequency` seconds old -> send the report and
      store the current time

    Returns:
    - True -> report sent
    - False -> report not sent
    """
    sent_at_key = SlackAlertingCacheKeys.report_sent_key.value

    last_sent_at = await self.internal_usage_cache.async_get_cache(
        key=sent_at_key,
        parent_otel_span=None,
    )  # None | float

    now = time.time()

    if last_sent_at is None:
        # first run: start the clock, the report goes out one interval from now
        await self.internal_usage_cache.async_set_cache(
            key=sent_at_key,
            value=now,
        )
        return False

    if isinstance(last_sent_at, float) and (
        now - last_sent_at >= self.alerting_args.daily_report_frequency
    ):
        await self.send_daily_reports(router=llm_router)
        # restart the clock only after the report went out
        await self.internal_usage_cache.async_set_cache(
            key=sent_at_key,
            value=now,
        )
        return True

    return False
|
|
|
|
async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
|
|
"""
|
|
If 'daily_reports' enabled
|
|
|
|
Ping redis cache every 5 minutes to check if we should send the report
|
|
|
|
If yes -> call send_daily_report()
|
|
"""
|
|
if llm_router is None or self.alert_types is None:
|
|
return
|
|
|
|
if "daily_reports" in self.alert_types:
|
|
while True:
|
|
await self._run_scheduler_helper(llm_router=llm_router)
|
|
interval = random.randint(
|
|
self.alerting_args.report_check_interval - 3,
|
|
self.alerting_args.report_check_interval + 3,
|
|
) # shuffle to prevent collisions
|
|
await asyncio.sleep(interval)
|
|
return
|
|
|
|
async def send_weekly_spend_report(
    self,
    time_range: str = "7d",
):
    """
    Send a per-team / per-tag spend report covering the last `time_range`.

    Does nothing when alerting is off or 'spend_reports' is not enabled, when a
    report for the same date range was already sent (tracked in the cache), or
    when there is no spend data. Errors are logged, not raised.

    Args:
        time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d"
    """
    if self.alerting is None or "spend_reports" not in self.alert_types:
        return

    try:
        from litellm.proxy.spend_tracking.spend_management_endpoints import (
            _get_spend_report_for_time_range,
        )

        # Parse the time range: "<number>d"
        days = int(time_range[:-1])
        if time_range[-1].lower() != "d":
            raise ValueError("Time range must be specified in days, e.g., '7d'")

        todays_date = datetime.datetime.now().date()
        start_date = todays_date - datetime.timedelta(days=days)
        start_iso = start_date.strftime("%Y-%m-%d")
        end_iso = todays_date.strftime("%Y-%m-%d")

        # one report per date range
        _event_cache_key = f"weekly_spend_report_sent_{start_iso}_{end_iso}"
        if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
            return

        _resp = await _get_spend_report_for_time_range(
            start_date=start_iso,
            end_date=end_iso,
        )
        if _resp is None or _resp == ([], []):
            return

        spend_per_team, spend_per_tag = _resp

        report_lines = [
            f"*💸 Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n"
        ]

        if spend_per_team is not None:
            report_lines.append("\n*Team Spend Report:*\n")
            for row in spend_per_team:
                _team_spend = round(float(row["total_spend"]), 4)
                report_lines.append(
                    f"Team: `{row['team_alias']}` | Spend: `${_team_spend}`\n"
                )

        if spend_per_tag is not None:
            report_lines.append("\n*Tag Spend Report:*\n")
            for row in spend_per_tag:
                _tag_spend = round(float(row["total_spend"]), 4)
                report_lines.append(
                    f"Tag: `{row['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
                )

        await self.send_alert(
            message="".join(report_lines),
            level="Low",
            alert_type=AlertType.spend_reports,
            alerting_metadata={},
        )

        # mark this date range as reported
        await self.internal_usage_cache.async_set_cache(
            key=_event_cache_key,
            value="SENT",
            ttl=duration_in_seconds(time_range),
        )

    except ValueError as ve:
        verbose_proxy_logger.error(f"Invalid time range format: {ve}")
    except Exception as e:
        verbose_proxy_logger.error(f"Error sending spend report: {e}")
|
|
|
|
async def send_monthly_spend_report(self):
    """
    Send a per-team / per-tag spend report for the current calendar month.

    The report covers the first through the last day of the current month.
    Nothing is sent when a report for that range was already sent (tracked in
    the cache) or when there is no spend data. Errors are logged, not raised.
    """
    try:
        from calendar import monthrange

        from litellm.proxy.spend_tracking.spend_management_endpoints import (
            _get_spend_report_for_time_range,
        )

        todays_date = datetime.datetime.now().date()
        first_day_of_month = todays_date.replace(day=1)
        _, days_in_month = monthrange(todays_date.year, todays_date.month)
        last_day_of_month = first_day_of_month + datetime.timedelta(
            days=days_in_month - 1
        )

        # one report per month
        _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}"
        if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
            return

        _resp = await _get_spend_report_for_time_range(
            start_date=first_day_of_month.strftime("%Y-%m-%d"),
            end_date=last_day_of_month.strftime("%Y-%m-%d"),
        )

        if _resp is None or _resp == ([], []):
            return

        monthly_spend_per_team, monthly_spend_per_tag = _resp

        report_lines = [
            f"*💸 Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"
        ]

        if monthly_spend_per_team is not None:
            report_lines.append("\n*Team Spend Report:*\n")
            for spend in monthly_spend_per_team:
                # round to 4 decimal places
                _team_spend = round(float(spend["total_spend"]), 4)
                report_lines.append(
                    f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
                )

        if monthly_spend_per_tag is not None:
            report_lines.append("\n*Tag Spend Report:*\n")
            for spend in monthly_spend_per_tag:
                # round to 4 decimal places
                _tag_spend = round(float(spend["total_spend"]), 4)
                report_lines.append(
                    f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
                )

        await self.send_alert(
            message="".join(report_lines),
            level="Low",
            alert_type=AlertType.spend_reports,
            alerting_metadata={},
        )

        # mark this month as reported
        await self.internal_usage_cache.async_set_cache(
            key=_event_cache_key,
            value="SENT",
            ttl=(30 * HOURS_IN_A_DAY * 60 * 60),  # 1 month
        )

    except Exception as e:
        verbose_proxy_logger.exception("Error sending monthly spend report %s", e)
|
|
|
|
async def send_fallback_stats_from_prometheus(self):
    """
    Helper to send fallback statistics from prometheus server -> to slack

    Fetches the fallback metric via `get_fallback_metric_from_prometheus` and
    sends it as a Low-level `fallback_reports` alert.

    Errors are logged, not raised.
    """
    try:
        from litellm.integrations.prometheus_helpers.prometheus_api import (
            get_fallback_metric_from_prometheus,
        )

        fallback_success_info_prometheus = (
            await get_fallback_metric_from_prometheus()
        )

        fallback_message = (
            f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
        )

        await self.send_alert(
            message=fallback_message,
            level="Low",
            alert_type=AlertType.fallback_reports,
            alerting_metadata={},
        )

    except Exception as e:
        verbose_proxy_logger.error(
            "Error sending fallback stats from prometheus %s", e
        )
|
|
|
|
async def send_virtual_key_event_slack(
    self,
    key_event: VirtualKeyEvent,
    alert_type: AlertType,
    event_name: str,
):
    """
    Send a High-level slack alert describing a Virtual Key related event.

    Example:
    - New Virtual Key Created
    - Internal User Updated
    - Team Created, Updated, Deleted

    The message lists every `created_by*` field of the event, followed by the
    request arguments (except `user_api_key_dict`). Errors are logged, not raised.
    """
    try:
        lines = [f"`{event_name}`\n"]

        # who performed the action comes first
        lines.append("*Action Done by:*\n")
        for field_name, field_value in key_event.model_dump().items():
            if "created_by" in field_name:
                lines.append(f"{field_name}: `{field_value}`\n")

        # then the args sent to the function
        lines.append("\n*Arguments passed:*\n")
        for arg_name, arg_value in key_event.request_kwargs.items():
            if arg_name != "user_api_key_dict":
                lines.append(f"{arg_name}: `{arg_value}`\n")

        await self.send_alert(
            message="".join(lines),
            level="High",
            alert_type=alert_type,
            alerting_metadata={},
        )

    except Exception as e:
        verbose_proxy_logger.error(
            "Error sending send_virtual_key_event_slack %s", e
        )

    return
|
|
|
|
async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
|
|
"""
|
|
Returns True if the request is completed - either as a success or failure
|
|
"""
|
|
if request_data is None:
|
|
return False
|
|
|
|
if (
|
|
request_data.get("litellm_status", "") != "success"
|
|
and request_data.get("litellm_status", "") != "fail"
|
|
):
|
|
## CHECK IF CACHE IS UPDATED
|
|
litellm_call_id = request_data.get("litellm_call_id", "")
|
|
status: Optional[str] = await self.internal_usage_cache.async_get_cache(
|
|
key="request_status:{}".format(litellm_call_id), local_only=True
|
|
)
|
|
if status is not None and (status == "success" or status == "fail"):
|
|
return True
|
|
return False
|