fix - refactor slack alerting

This commit is contained in:
Ishaan Jaff 2024-04-23 18:34:11 -07:00
parent afdefca4ab
commit c5a0b3a8d4

View file

@ -31,6 +31,7 @@ import smtplib, re
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta from datetime import datetime, timedelta
from litellm.integrations.slack_alerting import SlackAlerting
def print_verbose(print_statement): def print_verbose(print_statement):
@ -80,17 +81,11 @@ class ProxyLogging:
"budget_alerts", "budget_alerts",
"db_exceptions", "db_exceptions",
] ]
self.slack_alerting_instance = SlackAlerting(
def _all_possible_alert_types(self): alerting_threshold=self.alerting_threshold,
# used by the UI to show all supported alert types alerting=self.alerting,
# Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select alert_types=self.alert_types,
return [ )
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
]
def update_values( def update_values(
self, self,
@ -112,13 +107,18 @@ class ProxyLogging:
self.alerting = alerting self.alerting = alerting
if alerting_threshold is not None: if alerting_threshold is not None:
self.alerting_threshold = alerting_threshold self.alerting_threshold = alerting_threshold
if alert_types is not None:
self.alert_types = alert_types
self.slack_alerting_instance.update_values(
alerting=self.alerting,
alerting_threshold=self.alerting_threshold,
alert_types=self.alert_types,
)
if redis_cache is not None: if redis_cache is not None:
self.internal_usage_cache.redis_cache = redis_cache self.internal_usage_cache.redis_cache = redis_cache
if alert_types is not None:
self.alert_types = alert_types
def _init_litellm_callbacks(self): def _init_litellm_callbacks(self):
print_verbose(f"INITIALIZING LITELLM CALLBACKS!") print_verbose(f"INITIALIZING LITELLM CALLBACKS!")
self.service_logging_obj = ServiceLogging() self.service_logging_obj = ServiceLogging()
@ -127,7 +127,9 @@ class ProxyLogging:
litellm.callbacks.append(self.max_budget_limiter) litellm.callbacks.append(self.max_budget_limiter)
litellm.callbacks.append(self.cache_control_check) litellm.callbacks.append(self.cache_control_check)
litellm.callbacks.append(self.service_logging_obj) litellm.callbacks.append(self.service_logging_obj)
litellm.success_callback.append(self.response_taking_too_long_callback) litellm.success_callback.append(
self.slack_alerting_instance.response_taking_too_long_callback
)
for callback in litellm.callbacks: for callback in litellm.callbacks:
if callback not in litellm.input_callback: if callback not in litellm.input_callback:
litellm.input_callback.append(callback) litellm.input_callback.append(callback)
@ -176,7 +178,9 @@ class ProxyLogging:
""" """
print_verbose(f"Inside Proxy Logging Pre-call hook!") print_verbose(f"Inside Proxy Logging Pre-call hook!")
### ALERTING ### ### ALERTING ###
asyncio.create_task(self.response_taking_too_long(request_data=data)) asyncio.create_task(
self.slack_alerting_instance.response_taking_too_long(request_data=data)
)
try: try:
for callback in litellm.callbacks: for callback in litellm.callbacks:
@ -225,169 +229,6 @@ class ProxyLogging:
raise e raise e
return data return data
def _response_taking_too_long_callback(
self,
kwargs, # kwargs to completion
start_time,
end_time, # start/end time
):
try:
time_difference = end_time - start_time
# Convert the timedelta to float (in seconds)
time_difference_float = time_difference.total_seconds()
litellm_params = kwargs.get("litellm_params", {})
model = kwargs.get("model", "")
api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
messages = kwargs.get("messages", None)
# if messages does not exist fallback to "input"
if messages is None:
messages = kwargs.get("input", None)
# only use first 100 chars for alerting
_messages = str(messages)[:100]
return time_difference_float, model, api_base, _messages
except Exception as e:
raise e
async def response_taking_too_long_callback(
self,
kwargs, # kwargs to completion
completion_response, # response from completion
start_time,
end_time, # start/end time
):
if self.alerting is None:
return
if "llm_too_slow" not in self.alert_types:
return
time_difference_float, model, api_base, messages = (
self._response_taking_too_long_callback(
kwargs=kwargs,
start_time=start_time,
end_time=end_time,
)
)
request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
if time_difference_float > self.alerting_threshold:
if "langfuse" in litellm.success_callback:
request_info = self._add_langfuse_trace_id_to_alert(
request_info=request_info, kwargs=kwargs
)
await self.alerting_handler(
message=slow_message + request_info,
level="Low",
)
def _add_langfuse_trace_id_to_alert(
self,
request_info: str,
request_data: Optional[dict] = None,
kwargs: Optional[dict] = None,
):
import uuid
if request_data is not None:
trace_id = request_data.get("metadata", {}).get(
"trace_id", None
) # get langfuse trace id
if trace_id is None:
trace_id = "litellm-alert-trace-" + str(uuid.uuid4())
request_data["metadata"]["trace_id"] = trace_id
elif kwargs is not None:
_litellm_params = kwargs.get("litellm_params", {})
trace_id = _litellm_params.get("metadata", {}).get(
"trace_id", None
) # get langfuse trace id
if trace_id is None:
trace_id = "litellm-alert-trace-" + str(uuid.uuid4())
_litellm_params["metadata"]["trace_id"] = trace_id
_langfuse_host = os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
_langfuse_project_id = os.environ.get("LANGFUSE_PROJECT_ID")
# langfuse urls look like: https://us.cloud.langfuse.com/project/************/traces/litellm-alert-trace-ididi9dk-09292-************
_langfuse_url = (
f"{_langfuse_host}/project/{_langfuse_project_id}/traces/{trace_id}"
)
request_info += f"\n🪢 Langfuse Trace: {_langfuse_url}"
return request_info
async def response_taking_too_long(
self,
start_time: Optional[float] = None,
end_time: Optional[float] = None,
type: Literal["hanging_request", "slow_response"] = "hanging_request",
request_data: Optional[dict] = None,
):
if request_data is not None:
model = request_data.get("model", "")
messages = request_data.get("messages", None)
if messages is None:
# if messages does not exist fallback to "input"
messages = request_data.get("input", None)
# try casting messages to str and get the first 100 characters, else mark as None
try:
messages = str(messages)
messages = messages[:100]
except:
messages = ""
request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
if "langfuse" in litellm.success_callback:
request_info = self._add_langfuse_trace_id_to_alert(
request_info=request_info, request_data=request_data
)
else:
request_info = ""
if type == "hanging_request":
# Simulate a long-running operation that could take more than 5 minutes
if "llm_requests_hanging" not in self.alert_types:
return
await asyncio.sleep(
self.alerting_threshold
) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
if (
request_data is not None
and request_data.get("litellm_status", "") != "success"
and request_data.get("litellm_status", "") != "fail"
):
if request_data.get("deployment", None) is not None and isinstance(
request_data["deployment"], dict
):
_api_base = litellm.get_api_base(
model=model,
optional_params=request_data["deployment"].get(
"litellm_params", {}
),
)
if _api_base is None:
_api_base = ""
request_info += f"\nAPI Base: {_api_base}"
elif request_data.get("metadata", None) is not None and isinstance(
request_data["metadata"], dict
):
# In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data``
# in that case we fallback to the api base set in the request metadata
_metadata = request_data["metadata"]
_api_base = _metadata.get("api_base", "")
if _api_base is None:
_api_base = ""
request_info += f"\nAPI Base: `{_api_base}`"
# only alert hanging responses if they have not been marked as success
alerting_message = (
f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
)
await self.alerting_handler(
message=alerting_message + request_info,
level="Medium",
)
async def budget_alerts( async def budget_alerts(
self, self,
type: Literal[ type: Literal[
@ -406,112 +247,14 @@ class ProxyLogging:
if self.alerting is None: if self.alerting is None:
# do nothing if alerting is not switched on # do nothing if alerting is not switched on
return return
if "budget_alerts" not in self.alert_types: await self.slack_alerting_instance.budget_alerts(
return type=type,
_id: str = "default_id" # used for caching user_max_budget=user_max_budget,
if type == "user_and_proxy_budget": user_current_spend=user_current_spend,
user_info = dict(user_info) user_info=user_info,
user_id = user_info["user_id"] error_message=error_message,
_id = user_id
max_budget = user_info["max_budget"]
spend = user_info["spend"]
user_email = user_info["user_email"]
user_info = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}"""
elif type == "token_budget":
token_info = dict(user_info)
token = token_info["token"]
_id = token
spend = token_info["spend"]
max_budget = token_info["max_budget"]
user_id = token_info["user_id"]
user_info = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}"""
elif type == "failed_tracking":
user_id = str(user_info)
_id = user_id
user_info = f"\nUser ID: {user_id}\n Error {error_message}"
message = "Failed Tracking Cost for" + user_info
await self.alerting_handler(
message=message,
level="High",
)
return
elif type == "projected_limit_exceeded" and user_info is not None:
"""
Input variables:
user_info = {
"key_alias": key_alias,
"projected_spend": projected_spend,
"projected_exceeded_date": projected_exceeded_date,
}
user_max_budget=soft_limit,
user_current_spend=new_spend
"""
message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info["key_alias"]} \n`Expected Day of Error`: {user_info["projected_exceeded_date"]} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info["projected_spend"]} \n`Soft Limit`: {user_max_budget}"""
await self.alerting_handler(
message=message,
level="High",
)
return
else:
user_info = str(user_info)
# percent of max_budget left to spend
if user_max_budget > 0:
percent_left = (user_max_budget - user_current_spend) / user_max_budget
else:
percent_left = 0
verbose_proxy_logger.debug(
f"Budget Alerts: Percent left: {percent_left} for {user_info}"
) )
## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
# - Alert once within 28d period
# - Cache this information
# - Don't re-alert, if alert already sent
_cache: DualCache = self.internal_usage_cache
# check if crossed budget
if user_current_spend >= user_max_budget:
verbose_proxy_logger.debug("Budget Crossed for %s", user_info)
message = "Budget Crossed for" + user_info
result = await _cache.async_get_cache(key=message)
if result is None:
await self.alerting_handler(
message=message,
level="High",
)
await _cache.async_set_cache(key=message, value="SENT", ttl=2419200)
return
# check if 5% of max budget is left
if percent_left <= 0.05:
message = "5% budget left for" + user_info
cache_key = "alerting:{}".format(_id)
result = await _cache.async_get_cache(key=cache_key)
if result is None:
await self.alerting_handler(
message=message,
level="Medium",
)
await _cache.async_set_cache(key=cache_key, value="SENT", ttl=2419200)
return
# check if 15% of max budget is left
if percent_left <= 0.15:
message = "15% budget left for" + user_info
result = await _cache.async_get_cache(key=message)
if result is None:
await self.alerting_handler(
message=message,
level="Low",
)
await _cache.async_set_cache(key=message, value="SENT", ttl=2419200)
return
return
async def alerting_handler( async def alerting_handler(
self, message: str, level: Literal["Low", "Medium", "High"] self, message: str, level: Literal["Low", "Medium", "High"]
): ):
@ -545,19 +288,9 @@ class ProxyLogging:
for client in self.alerting: for client in self.alerting:
if client == "slack": if client == "slack":
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None) await self.slack_alerting_instance.send_alert(
if slack_webhook_url is None: message=message, level=level
raise Exception("Missing SLACK_WEBHOOK_URL from environment") )
payload = {"text": formatted_message}
headers = {"Content-type": "application/json"}
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False)
) as session:
async with session.post(
slack_webhook_url, json=payload, headers=headers
) as response:
if response.status == 200:
pass
elif client == "sentry": elif client == "sentry":
if litellm.utils.sentry_sdk_instance is not None: if litellm.utils.sentry_sdk_instance is not None:
litellm.utils.sentry_sdk_instance.capture_message(formatted_message) litellm.utils.sentry_sdk_instance.capture_message(formatted_message)