fix - refactor slack alerting

parent 37069f45cb
commit 31979f83d2

1 changed file with 29 additions and 296 deletions
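This refactor moves the Slack alerting logic out of ProxyLogging and into the SlackAlerting integration: ProxyLogging now only constructs, configures, and delegates to a slack_alerting_instance. Inferred purely from the call sites in the hunks below, the surface ProxyLogging relies on looks roughly like this sketch; the stub class and its empty bodies are illustrative, not the implementation shipped in litellm/integrations/slack_alerting.py.

from typing import Literal, Optional

class SlackAlertingSketch:
    """Illustrative stub of the interface ProxyLogging calls after this commit."""

    def __init__(self, alerting_threshold: float, alerting: Optional[list], alert_types: list):
        self.alerting_threshold = alerting_threshold  # seconds before slow/hanging alerts fire
        self.alerting = alerting                      # e.g. ["slack", "sentry"] or None
        self.alert_types = alert_types                # subset of the alert types listed in __init__ below

    def update_values(self, alerting=None, alerting_threshold=None, alert_types=None):
        """Receives config changes forwarded from ProxyLogging.update_values()."""

    async def response_taking_too_long_callback(self, kwargs, completion_response, start_time, end_time):
        """litellm success callback; signature mirrors the removed ProxyLogging method."""

    async def response_taking_too_long(self, request_data: Optional[dict] = None):
        """Hanging-request watchdog scheduled from the pre-call hook."""

    async def budget_alerts(self, type, user_max_budget, user_current_spend, user_info, error_message):
        """Budget / spend alerting previously implemented inline in ProxyLogging."""

    async def send_alert(self, message: str, level: Literal["Low", "Medium", "High"]):
        """Delivers one alert; called from ProxyLogging.alerting_handler for the slack client."""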
@@ -31,6 +31,7 @@ import smtplib, re
 from email.mime.text import MIMEText
 from email.mime.multipart import MIMEMultipart
 from datetime import datetime, timedelta
+from litellm.integrations.slack_alerting import SlackAlerting


 def print_verbose(print_statement):
@@ -80,17 +81,11 @@ class ProxyLogging:
             "budget_alerts",
             "db_exceptions",
         ]
-
-    def _all_possible_alert_types(self):
-        # used by the UI to show all supported alert types
-        # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select
-        return [
-            "llm_exceptions",
-            "llm_too_slow",
-            "llm_requests_hanging",
-            "budget_alerts",
-            "db_exceptions",
-        ]
+        self.slack_alerting_instance = SlackAlerting(
+            alerting_threshold=self.alerting_threshold,
+            alerting=self.alerting,
+            alert_types=self.alert_types,
+        )

     def update_values(
         self,
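For context, an equivalent standalone construction might look like this; the threshold and the two lists are example values (the alert-type names come from the __init__ list above), not litellm defaults.

from litellm.integrations.slack_alerting import SlackAlerting

slack_alerting = SlackAlerting(
    alerting_threshold=300,  # seconds; example value
    alerting=["slack"],
    alert_types=[
        "llm_exceptions",
        "llm_too_slow",
        "llm_requests_hanging",
        "budget_alerts",
        "db_exceptions",
    ],
)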
@@ -112,13 +107,18 @@ class ProxyLogging:
             self.alerting = alerting
         if alerting_threshold is not None:
             self.alerting_threshold = alerting_threshold
+        if alert_types is not None:
+            self.alert_types = alert_types
+
+        self.slack_alerting_instance.update_values(
+            alerting=self.alerting,
+            alerting_threshold=self.alerting_threshold,
+            alert_types=self.alert_types,
+        )

         if redis_cache is not None:
             self.internal_usage_cache.redis_cache = redis_cache
-
-        if alert_types is not None:
-            self.alert_types = alert_types

     def _init_litellm_callbacks(self):
         print_verbose(f"INITIALIZING LITELLM CALLBACKS!")
         self.service_logging_obj = ServiceLogging()
@@ -127,7 +127,9 @@ class ProxyLogging:
         litellm.callbacks.append(self.max_budget_limiter)
         litellm.callbacks.append(self.cache_control_check)
         litellm.callbacks.append(self.service_logging_obj)
-        litellm.success_callback.append(self.response_taking_too_long_callback)
+        litellm.success_callback.append(
+            self.slack_alerting_instance.response_taking_too_long_callback
+        )
         for callback in litellm.callbacks:
             if callback not in litellm.input_callback:
                 litellm.input_callback.append(callback)
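litellm runs every entry in litellm.success_callback once a call finishes, so the Slack instance's response_taking_too_long_callback now receives the events the removed ProxyLogging method used to handle. A self-contained sketch of registering such a callback; the function name and print are illustrative, and the (kwargs, completion_response, start_time, end_time) signature mirrors the removed method further down:

import litellm

async def log_slow_calls(kwargs, completion_response, start_time, end_time):
    # start_time / end_time arrive as datetimes; same math as the removed callback below
    elapsed = (end_time - start_time).total_seconds()
    print(f"{kwargs.get('model', '')} took {elapsed:.2f}s")

litellm.success_callback.append(log_slow_calls)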
@@ -176,7 +178,9 @@ class ProxyLogging:
         """
         print_verbose(f"Inside Proxy Logging Pre-call hook!")
         ### ALERTING ###
-        asyncio.create_task(self.response_taking_too_long(request_data=data))
+        asyncio.create_task(
+            self.slack_alerting_instance.response_taking_too_long(request_data=data)
+        )

         try:
             for callback in litellm.callbacks:
@@ -225,169 +229,6 @@ class ProxyLogging:
             raise e
         return data

-    def _response_taking_too_long_callback(
-        self,
-        kwargs,  # kwargs to completion
-        start_time,
-        end_time,  # start/end time
-    ):
-        try:
-            time_difference = end_time - start_time
-            # Convert the timedelta to float (in seconds)
-            time_difference_float = time_difference.total_seconds()
-            litellm_params = kwargs.get("litellm_params", {})
-            model = kwargs.get("model", "")
-            api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
-            messages = kwargs.get("messages", None)
-            # if messages does not exist fallback to "input"
-            if messages is None:
-                messages = kwargs.get("input", None)
-
-            # only use first 100 chars for alerting
-            _messages = str(messages)[:100]
-
-            return time_difference_float, model, api_base, _messages
-        except Exception as e:
-            raise e
-
-    async def response_taking_too_long_callback(
-        self,
-        kwargs,  # kwargs to completion
-        completion_response,  # response from completion
-        start_time,
-        end_time,  # start/end time
-    ):
-        if self.alerting is None:
-            return
-        if "llm_too_slow" not in self.alert_types:
-            return
-        time_difference_float, model, api_base, messages = (
-            self._response_taking_too_long_callback(
-                kwargs=kwargs,
-                start_time=start_time,
-                end_time=end_time,
-            )
-        )
-        request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
-        slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
-        if time_difference_float > self.alerting_threshold:
-            if "langfuse" in litellm.success_callback:
-                request_info = self._add_langfuse_trace_id_to_alert(
-                    request_info=request_info, kwargs=kwargs
-                )
-            await self.alerting_handler(
-                message=slow_message + request_info,
-                level="Low",
-            )
-
-    def _add_langfuse_trace_id_to_alert(
-        self,
-        request_info: str,
-        request_data: Optional[dict] = None,
-        kwargs: Optional[dict] = None,
-    ):
-        import uuid
-
-        if request_data is not None:
-            trace_id = request_data.get("metadata", {}).get(
-                "trace_id", None
-            )  # get langfuse trace id
-            if trace_id is None:
-                trace_id = "litellm-alert-trace-" + str(uuid.uuid4())
-                request_data["metadata"]["trace_id"] = trace_id
-        elif kwargs is not None:
-            _litellm_params = kwargs.get("litellm_params", {})
-            trace_id = _litellm_params.get("metadata", {}).get(
-                "trace_id", None
-            )  # get langfuse trace id
-            if trace_id is None:
-                trace_id = "litellm-alert-trace-" + str(uuid.uuid4())
-                _litellm_params["metadata"]["trace_id"] = trace_id
-
-        _langfuse_host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
-        _langfuse_project_id = os.environ.get("LANGFUSE_PROJECT_ID")
-
-        # langfuse urls look like: https://us.cloud.langfuse.com/project/************/traces/litellm-alert-trace-ididi9dk-09292-************
-
-        _langfuse_url = (
-            f"{_langfuse_host}/project/{_langfuse_project_id}/traces/{trace_id}"
-        )
-        request_info += f"\n🪢 Langfuse Trace: {_langfuse_url}"
-        return request_info
-
-    async def response_taking_too_long(
-        self,
-        start_time: Optional[float] = None,
-        end_time: Optional[float] = None,
-        type: Literal["hanging_request", "slow_response"] = "hanging_request",
-        request_data: Optional[dict] = None,
-    ):
-        if request_data is not None:
-            model = request_data.get("model", "")
-            messages = request_data.get("messages", None)
-            if messages is None:
-                # if messages does not exist fallback to "input"
-                messages = request_data.get("input", None)
-
-            # try casting messages to str and get the first 100 characters, else mark as None
-            try:
-                messages = str(messages)
-                messages = messages[:100]
-            except:
-                messages = ""
-            request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
-            if "langfuse" in litellm.success_callback:
-                request_info = self._add_langfuse_trace_id_to_alert(
-                    request_info=request_info, request_data=request_data
-                )
-        else:
-            request_info = ""
-
-        if type == "hanging_request":
-            # Simulate a long-running operation that could take more than 5 minutes
-            if "llm_requests_hanging" not in self.alert_types:
-                return
-            await asyncio.sleep(
-                self.alerting_threshold
-            )  # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
-            if (
-                request_data is not None
-                and request_data.get("litellm_status", "") != "success"
-                and request_data.get("litellm_status", "") != "fail"
-            ):
-                if request_data.get("deployment", None) is not None and isinstance(
-                    request_data["deployment"], dict
-                ):
-                    _api_base = litellm.get_api_base(
-                        model=model,
-                        optional_params=request_data["deployment"].get(
-                            "litellm_params", {}
-                        ),
-                    )
-
-                    if _api_base is None:
-                        _api_base = ""
-
-                    request_info += f"\nAPI Base: {_api_base}"
-                elif request_data.get("metadata", None) is not None and isinstance(
-                    request_data["metadata"], dict
-                ):
-                    # In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data``
-                    # in that case we fallback to the api base set in the request metadata
-                    _metadata = request_data["metadata"]
-                    _api_base = _metadata.get("api_base", "")
-                    if _api_base is None:
-                        _api_base = ""
-                    request_info += f"\nAPI Base: `{_api_base}`"
-                # only alert hanging responses if they have not been marked as success
-                alerting_message = (
-                    f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
-                )
-                await self.alerting_handler(
-                    message=alerting_message + request_info,
-                    level="Medium",
-                )
-
     async def budget_alerts(
         self,
         type: Literal[
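The four methods removed above (the slow-response callback, its helper, the Langfuse trace-link builder, and the hanging-request watchdog) now live in the SlackAlerting integration. The watchdog idea, distilled from the removed response_taking_too_long into a self-contained example:

import asyncio
from typing import Optional

async def watch_for_hanging_request(request_data: dict, alerting_threshold: float) -> Optional[str]:
    # Sleep for the threshold, then alert only if the request never reached a terminal status.
    await asyncio.sleep(alerting_threshold)
    if request_data.get("litellm_status", "") not in ("success", "fail"):
        return f"`Requests are hanging - {alerting_threshold}s+ request time`"
    return None

async def main() -> None:
    request_data = {"model": "gpt-3.5-turbo"}  # never marked success/fail in this demo
    task = asyncio.create_task(watch_for_hanging_request(request_data, alerting_threshold=1.0))
    print(await task)  # -> `Requests are hanging - 1.0s+ request time`

asyncio.run(main())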
@@ -406,112 +247,14 @@ class ProxyLogging:
         if self.alerting is None:
             # do nothing if alerting is not switched on
             return
         if "budget_alerts" not in self.alert_types:
             return
-        _id: str = "default_id"  # used for caching
-        if type == "user_and_proxy_budget":
-            user_info = dict(user_info)
-            user_id = user_info["user_id"]
-            _id = user_id
-            max_budget = user_info["max_budget"]
-            spend = user_info["spend"]
-            user_email = user_info["user_email"]
-            user_info = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}"""
-        elif type == "token_budget":
-            token_info = dict(user_info)
-            token = token_info["token"]
-            _id = token
-            spend = token_info["spend"]
-            max_budget = token_info["max_budget"]
-            user_id = token_info["user_id"]
-            user_info = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}"""
-        elif type == "failed_tracking":
-            user_id = str(user_info)
-            _id = user_id
-            user_info = f"\nUser ID: {user_id}\n Error {error_message}"
-            message = "Failed Tracking Cost for" + user_info
-            await self.alerting_handler(
-                message=message,
-                level="High",
-            )
-            return
-        elif type == "projected_limit_exceeded" and user_info is not None:
-            """
-            Input variables:
-            user_info = {
-                "key_alias": key_alias,
-                "projected_spend": projected_spend,
-                "projected_exceeded_date": projected_exceeded_date,
-            }
-            user_max_budget=soft_limit,
-            user_current_spend=new_spend
-            """
-            message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info["key_alias"]} \n`Expected Day of Error`: {user_info["projected_exceeded_date"]} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info["projected_spend"]} \n`Soft Limit`: {user_max_budget}"""
-            await self.alerting_handler(
-                message=message,
-                level="High",
-            )
-            return
-        else:
-            user_info = str(user_info)
-
-        # percent of max_budget left to spend
-        if user_max_budget > 0:
-            percent_left = (user_max_budget - user_current_spend) / user_max_budget
-        else:
-            percent_left = 0
-        verbose_proxy_logger.debug(
-            f"Budget Alerts: Percent left: {percent_left} for {user_info}"
+        await self.slack_alerting_instance.budget_alerts(
+            type=type,
+            user_max_budget=user_max_budget,
+            user_current_spend=user_current_spend,
+            user_info=user_info,
+            error_message=error_message,
         )
-
-        ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
-        # - Alert once within 28d period
-        # - Cache this information
-        # - Don't re-alert, if alert already sent
-        _cache: DualCache = self.internal_usage_cache
-
-        # check if crossed budget
-        if user_current_spend >= user_max_budget:
-            verbose_proxy_logger.debug("Budget Crossed for %s", user_info)
-            message = "Budget Crossed for" + user_info
-            result = await _cache.async_get_cache(key=message)
-            if result is None:
-                await self.alerting_handler(
-                    message=message,
-                    level="High",
-                )
-                await _cache.async_set_cache(key=message, value="SENT", ttl=2419200)
-            return
-
-        # check if 5% of max budget is left
-        if percent_left <= 0.05:
-            message = "5% budget left for" + user_info
-            cache_key = "alerting:{}".format(_id)
-            result = await _cache.async_get_cache(key=cache_key)
-            if result is None:
-                await self.alerting_handler(
-                    message=message,
-                    level="Medium",
-                )
-
-                await _cache.async_set_cache(key=cache_key, value="SENT", ttl=2419200)
-
-            return
-
-        # check if 15% of max budget is left
-        if percent_left <= 0.15:
-            message = "15% budget left for" + user_info
-            result = await _cache.async_get_cache(key=message)
-            if result is None:
-                await self.alerting_handler(
-                    message=message,
-                    level="Low",
-                )
-                await _cache.async_set_cache(key=message, value="SENT", ttl=2419200)
-            return
-
-        return

     async def alerting_handler(
         self, message: str, level: Literal["Low", "Medium", "High"]
     ):
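Everything past the early returns (the per-type message formatting, the percent-left thresholds, and the alert-once-per-28-days cache gate with ttl=2419200) is now handled inside slack_alerting_instance.budget_alerts. The de-duplication idea, reduced to a self-contained sketch with a plain dict standing in for DualCache:

import time

ALERT_TTL = 2_419_200  # 28 days in seconds, matching the ttl removed above
_sent_at: dict[str, float] = {}

def should_send(message: str) -> bool:
    # Send an alert at most once per TTL window for a given message / cache key.
    last = _sent_at.get(message)
    if last is not None and time.time() - last < ALERT_TTL:
        return False
    _sent_at[message] = time.time()
    return True

print(should_send("Budget Crossed for user_123"))  # True  -> alert goes out
print(should_send("Budget Crossed for user_123"))  # False -> suppressed for 28 days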
@@ -545,19 +288,9 @@ class ProxyLogging:

         for client in self.alerting:
             if client == "slack":
-                slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)
-                if slack_webhook_url is None:
-                    raise Exception("Missing SLACK_WEBHOOK_URL from environment")
-                payload = {"text": formatted_message}
-                headers = {"Content-type": "application/json"}
-                async with aiohttp.ClientSession(
-                    connector=aiohttp.TCPConnector(ssl=False)
-                ) as session:
-                    async with session.post(
-                        slack_webhook_url, json=payload, headers=headers
-                    ) as response:
-                        if response.status == 200:
-                            pass
+                await self.slack_alerting_instance.send_alert(
+                    message=message, level=level
+                )
             elif client == "sentry":
                 if litellm.utils.sentry_sdk_instance is not None:
                     litellm.utils.sentry_sdk_instance.capture_message(formatted_message)
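The inline webhook POST removed above now sits behind SlackAlerting.send_alert; presumably it issues an equivalent request. A self-contained version of the removed logic, with raise_for_status() added so failures surface:

import os

import aiohttp

async def post_to_slack(formatted_message: str) -> None:
    # Same environment variable and payload shape as the removed code above.
    slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)
    if slack_webhook_url is None:
        raise Exception("Missing SLACK_WEBHOOK_URL from environment")
    payload = {"text": formatted_message}
    headers = {"Content-type": "application/json"}
    async with aiohttp.ClientSession() as session:
        async with session.post(slack_webhook_url, json=payload, headers=headers) as response:
            response.raise_for_status()

# Usage: asyncio.run(post_to_slack("Level: Low\n`Responses are slow - test alert`"))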