fix(slack_alerting.py): cleanup webhook event

This commit is contained in:
Krrish Dholakia 2024-05-20 16:55:01 -07:00
parent cba2b729b2
commit 867f9300e3
6 changed files with 129 additions and 189 deletions

View file

@ -39,6 +39,12 @@ class SlackAlertingArgs(LiteLLMBase):
budget_alert_ttl: int = 24 * 60 * 60 # 24 hours budget_alert_ttl: int = 24 * 60 * 60 # 24 hours
class WebhookEvent(CallInfo):
event: Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"]
event_group: Literal["user", "key", "team", "proxy"]
event_message: str # human-readable description of event
class DeploymentMetrics(LiteLLMBase): class DeploymentMetrics(LiteLLMBase):
""" """
Metrics per deployment, stored in cache Metrics per deployment, stored in cache
@ -593,12 +599,9 @@ class SlackAlerting(CustomLogger):
"token_budget", "token_budget",
"user_budget", "user_budget",
"team_budget", "team_budget",
"user_and_proxy_budget", "proxy_budget",
"failed_budgets",
"projected_limit_exceeded", "projected_limit_exceeded",
], ],
user_max_budget: float,
user_current_spend: float,
user_info: CallInfo, user_info: CallInfo,
): ):
## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727 ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
@ -613,118 +616,78 @@ class SlackAlerting(CustomLogger):
if "budget_alerts" not in self.alert_types: if "budget_alerts" not in self.alert_types:
return return
_id: str = "default_id" # used for caching _id: str = "default_id" # used for caching
user_info_str = "" user_info_json = user_info.model_dump(exclude_none=True)
if type == "user_and_proxy_budget": for k, v in user_info_json.items():
user_id = user_info.user_id user_info_str = "\n{}: {}\n".format(k, v)
if user_id is not None:
_id = user_id event: Optional[
max_budget = user_info.max_budget Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"]
spend = user_info.spend ] = None
user_email = user_info.user_email event_group: Optional[Literal["user", "team", "key", "proxy"]] = None
user_info_str = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}""" event_message: str = ""
elif type == "token_budget" or type == "team_budget": webhook_event: Optional[WebhookEvent] = None
token_info = user_info if type == "proxy_budget":
token = token_info.token event_group = "proxy"
_id = token event_message += "Proxy Budget: "
spend = token_info.spend elif type == "user_budget":
max_budget = token_info.max_budget event_group = "user"
user_id = token_info.user_id event_message += "User Budget: "
user_info_str = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}""" _id = user_info.user_id or _id
elif type == "projected_limit_exceeded" and user_info is not None: elif type == "team_budget":
""" event_group = "team"
Input variables: event_message += "Team Budget: "
user_info = { _id = user_info.team_id or _id
"key_alias": key_alias, elif type == "token_budget":
"projected_spend": projected_spend, event_group = "key"
"projected_exceeded_date": projected_exceeded_date, event_message += "Key Budget: "
} _id = user_info.token
user_max_budget=soft_limit, elif type == "projected_limit_exceeded":
user_current_spend=new_spend event_group = "key"
""" event_message += "Key Budget: Projected Limit Exceeded"
message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info.key_alias} \n`Expected Day of Error`: {user_info.projected_exceeded_data} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info.projected_spend} \n`Soft Limit`: {user_max_budget}""" event = "projected_limit_exceeded"
_cache_key = "budget_alerts:projected_limit_exceeded:{}".format(_id) _id = user_info.token
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message,
level="High",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
return
# percent of max_budget left to spend # percent of max_budget left to spend
if user_max_budget > 0: if user_info.max_budget > 0:
percent_left = (user_max_budget - user_current_spend) / user_max_budget percent_left = (
user_info.max_budget - user_info.spend
) / user_info.max_budget
else: else:
percent_left = 0 percent_left = 0
verbose_proxy_logger.debug(
f"Budget Alerts: Percent left: {percent_left} for {user_info_str}"
)
# check if crossed budget # check if crossed budget
if user_current_spend >= user_max_budget: if user_info.spend >= user_info.max_budget:
verbose_proxy_logger.debug("Budget Crossed for %s", user_info_str) event = "budget_crossed"
message = "Budget Crossed for" + user_info_str event_message += "Budget Crossed"
_cache_key = "budget_alerts:budget_crossed:{}".format(_id) elif percent_left <= 0.05:
event = "threshold_crossed"
event_message += "5% Threshold Crossed"
elif percent_left <= 0.15:
event = "threshold_crossed"
event_message += "15% Threshold Crossed"
if event is not None and event_group is not None:
_cache_key = "budget_alerts:{}:{}".format(event, _id)
result = await _cache.async_get_cache(key=_cache_key) result = await _cache.async_get_cache(key=_cache_key)
if result is None: if result is None:
webhook_event = WebhookEvent(
event=event,
event_group=event_group,
event_message=event_message,
**user_info_json,
)
await self.send_alert( await self.send_alert(
message=message, message=event_message + "\n\n" + user_info_str,
level="High", level="High",
alert_type="budget_alerts", alert_type="budget_alerts",
user_info=user_info, user_info=webhook_event,
) )
await _cache.async_set_cache( await _cache.async_set_cache(
key=_cache_key, key=_cache_key,
value="SENT", value="SENT",
ttl=self.alerting_args.budget_alert_ttl, ttl=self.alerting_args.budget_alert_ttl,
) )
return
# check if 5% of max budget is left
if percent_left <= 0.05:
message = "5% budget left for" + user_info_str
_cache_key = "budget_alerts:5_perc_budget_crossed:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message,
level="Medium",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
return
# check if 15% of max budget is left
if percent_left <= 0.15:
message = "15% budget left for" + user_info_str
_cache_key = "budget_alerts:15_perc_budget_crossed:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message,
level="Low",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
return return
return return
@ -785,9 +748,7 @@ Model Info:
async def model_removed_alert(self, model_name: str): async def model_removed_alert(self, model_name: str):
pass pass
async def send_webhook_alert( async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
self, alert_type: Literal["budget_alerts"], call_info: CallInfo
) -> bool:
""" """
Sends structured alert to webhook, if set. Sends structured alert to webhook, if set.
@ -800,7 +761,7 @@ Model Info:
if webhook_url is None: if webhook_url is None:
raise Exception("Missing webhook_url from environment") raise Exception("Missing webhook_url from environment")
payload = call_info.model_dump_json() payload = webhook_event.model_dump_json()
headers = {"Content-type": "application/json"} headers = {"Content-type": "application/json"}
response = await self.async_http_handler.post( response = await self.async_http_handler.post(
@ -830,7 +791,7 @@ Model Info:
"new_model_added", "new_model_added",
"cooldown_deployment", "cooldown_deployment",
], ],
user_info: Optional[CallInfo] = None, user_info: Optional[WebhookEvent] = None,
**kwargs, **kwargs,
): ):
""" """
@ -855,9 +816,7 @@ Model Info:
and alert_type == "budget_alerts" and alert_type == "budget_alerts"
and user_info is not None and user_info is not None
): ):
await self.send_webhook_alert( await self.send_webhook_alert(webhook_event=user_info)
alert_type="budget_alerts", call_info=user_info
)
if "slack" not in self.alerting: if "slack" not in self.alerting:
return return

View file

@ -23,4 +23,4 @@ general_settings:
alerting: ["slack", "webhook"] alerting: ["slack", "webhook"]
environment_variables: environment_variables:
WEBHOOK_URL: http://0.0.0.0:8000 WEBHOOK_URL: https://webhook.site/6ab090e8-c55f-4a23-b075-3209f5c57906

View file

@ -1031,11 +1031,12 @@ class TokenCountResponse(LiteLLMBase):
class CallInfo(LiteLLMBase): class CallInfo(LiteLLMBase):
"""Used for slack budget alerting""" """Used for slack budget alerting"""
spend: float
max_budget: float
token: str = Field(description="Hashed value of that key") token: str = Field(description="Hashed value of that key")
spend: float = Field(description="The spend for that key")
max_budget: Optional[float] = None
user_id: Optional[str] = None user_id: Optional[str] = None
team_id: Optional[str] = None
user_email: Optional[str] = None user_email: Optional[str] = None
key_alias: Optional[str] = None key_alias: Optional[str] = None
projected_exceeded_data: Optional[str] = None projected_exceeded_date: Optional[str] = None
projected_spend: Optional[float] = None projected_spend: Optional[float] = None

View file

@ -597,9 +597,7 @@ async def user_api_key_auth(
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=litellm.max_budget, type="proxy_budget",
user_current_spend=global_proxy_spend,
type="user_and_proxy_budget",
user_info=user_info, user_info=user_info,
) )
) )
@ -923,17 +921,16 @@ async def user_api_key_auth(
and user_current_spend is not None and user_current_spend is not None
): ):
call_info = CallInfo( call_info = CallInfo(
token=valid_token["token"], token=valid_token.token,
spend=valid_token["spend"], spend=user_current_spend,
max_budget=user_max_budget,
user_id=_user.get("user_id", None), user_id=_user.get("user_id", None),
user_email=_user.get("user_email", None), user_email=_user.get("user_email", None),
key_alias=valid_token["key_alias"], key_alias=valid_token.key_alias,
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=user_max_budget, type="user_budget",
user_current_spend=user_current_spend,
type="user_and_proxy_budget",
user_info=call_info, user_info=call_info,
) )
) )
@ -955,19 +952,17 @@ async def user_api_key_auth(
and user_current_spend is not None and user_current_spend is not None
): ):
call_info = CallInfo( call_info = CallInfo(
token=valid_token["token"], token=valid_token.token,
spend=valid_token["spend"], spend=user_current_spend,
max_budget=valid_token["max_budget"], max_budget=user_max_budget,
user_id=getattr(user_id_information, "user_id", None), user_id=getattr(user_id_information, "user_id", None),
user_email=getattr( user_email=getattr(
user_id_information, "user_email", None user_id_information, "user_email", None
), ),
key_alias=valid_token["key_alias"], key_alias=valid_token.key_alias,
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=user_max_budget,
user_current_spend=user_current_spend,
type="user_budget", type="user_budget",
user_info=call_info, user_info=call_info,
) )
@ -1000,15 +995,14 @@ async def user_api_key_auth(
# Check 4. Token Spend is under budget # Check 4. Token Spend is under budget
if valid_token.spend is not None and valid_token.max_budget is not None: if valid_token.spend is not None and valid_token.max_budget is not None:
call_info = CallInfo( call_info = CallInfo(
token=valid_token["token"], token=valid_token.token,
spend=valid_token["spend"], spend=valid_token.spend,
max_budget=valid_token["max_budget"], max_budget=valid_token.max_budget,
user_id=valid_token["user_id"], user_id=valid_token.user_id,
team_id=valid_token.team_id,
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.max_budget,
user_current_spend=valid_token.spend,
type="token_budget", type="token_budget",
user_info=call_info, user_info=call_info,
) )
@ -1055,33 +1049,8 @@ async def user_api_key_auth(
raise Exception( raise Exception(
f"ExceededModelBudget: Current spend for model: {current_model_spend}; Max Budget for Model: {current_model_budget}" f"ExceededModelBudget: Current spend for model: {current_model_spend}; Max Budget for Model: {current_model_budget}"
) )
# Check 6. Token spend is under Team budget
if (
valid_token.spend is not None
and hasattr(valid_token, "team_max_budget")
and valid_token.team_max_budget is not None
):
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
max_budget=valid_token["max_budget"],
user_id=valid_token["user_id"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.team_max_budget,
user_current_spend=valid_token.spend,
type="team_budget",
user_info=call_info,
)
)
if valid_token.spend >= valid_token.team_max_budget: # Check 6. Team spend is under Team budget
raise Exception(
f"ExceededTokenBudget: Current spend for token: {valid_token.spend}; Max Budget for Team: {valid_token.team_max_budget}"
)
# Check 7. Team spend is under Team budget
if ( if (
hasattr(valid_token, "team_spend") hasattr(valid_token, "team_spend")
and valid_token.team_spend is not None and valid_token.team_spend is not None
@ -1089,15 +1058,13 @@ async def user_api_key_auth(
and valid_token.team_max_budget is not None and valid_token.team_max_budget is not None
): ):
call_info = CallInfo( call_info = CallInfo(
token=valid_token["token"], token=valid_token.token,
spend=valid_token["spend"], spend=valid_token.team_spend,
max_budget=valid_token["max_budget"], max_budget=valid_token.team_max_budget,
user_id=valid_token["user_id"], user_id=valid_token.user_id,
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.team_max_budget,
user_current_spend=valid_token.team_spend,
type="team_budget", type="team_budget",
user_info=call_info, user_info=call_info,
) )
@ -1150,12 +1117,11 @@ async def user_api_key_auth(
spend=global_proxy_spend, spend=global_proxy_spend,
max_budget=litellm.max_budget, max_budget=litellm.max_budget,
user_id=litellm_proxy_admin_name, user_id=litellm_proxy_admin_name,
team_id=valid_token.team_id,
) )
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
user_max_budget=litellm.max_budget, type="proxy_budget",
user_current_spend=global_proxy_spend,
type="user_and_proxy_budget",
user_info=call_info, user_info=call_info,
) )
) )
@ -1344,7 +1310,7 @@ async def user_api_key_auth(
elif isinstance(e, ProxyException): elif isinstance(e, ProxyException):
raise e raise e
raise ProxyException( raise ProxyException(
message="Authentication Error, " + str(e), message="Authentication Error 1234, " + str(e),
type="auth_error", type="auth_error",
param=getattr(e, "param", "None"), param=getattr(e, "param", "None"),
code=status.HTTP_401_UNAUTHORIZED, code=status.HTTP_401_UNAUTHORIZED,
@ -1761,14 +1727,14 @@ async def update_cache(
""" """
### UPDATE KEY SPEND ### ### UPDATE KEY SPEND ###
async def _update_key_cache(): async def _update_key_cache(token: str, response_cost: float):
# Fetch the existing cost for the given token # Fetch the existing cost for the given token
if isinstance(token, str) and token.startswith("sk-"): if isinstance(token, str) and token.startswith("sk-"):
hashed_token = hash_token(token=token) hashed_token = hash_token(token=token)
else: else:
hashed_token = token hashed_token = token
verbose_proxy_logger.debug("_update_key_cache: hashed_token=%s", hashed_token) verbose_proxy_logger.debug("_update_key_cache: hashed_token=%s", hashed_token)
existing_spend_obj = await user_api_key_cache.async_get_cache(key=hashed_token) existing_spend_obj: LiteLLM_VerificationTokenView = await user_api_key_cache.async_get_cache(key=hashed_token) # type: ignore
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
f"_update_key_cache: existing_spend_obj={existing_spend_obj}" f"_update_key_cache: existing_spend_obj={existing_spend_obj}"
) )
@ -1777,7 +1743,7 @@ async def update_cache(
) )
if existing_spend_obj is None: if existing_spend_obj is None:
existing_spend = 0 existing_spend = 0
existing_spend_obj = LiteLLM_VerificationTokenView() existing_spend_obj = LiteLLM_VerificationTokenView(token=token)
else: else:
existing_spend = existing_spend_obj.spend existing_spend = existing_spend_obj.spend
# Calculate the new cost by adding the existing cost and response_cost # Calculate the new cost by adding the existing cost and response_cost
@ -1791,7 +1757,9 @@ async def update_cache(
and ( and (
_is_projected_spend_over_limit( _is_projected_spend_over_limit(
current_spend=new_spend, current_spend=new_spend,
soft_budget_limit=existing_spend_obj.litellm_budget_table.soft_budget, soft_budget_limit=existing_spend_obj.litellm_budget_table[
"soft_budget"
],
) )
== True == True
) )
@ -1807,20 +1775,18 @@ async def update_cache(
) )
call_info = CallInfo( call_info = CallInfo(
token=existing_spend_obj.token or "", token=existing_spend_obj.token or "",
spend=existing_spend_obj.spend, spend=new_spend,
key_alias=existing_spend_obj.key_alias, key_alias=existing_spend_obj.key_alias,
max_budget=existing_spend_obj.max_budget, max_budget=soft_limit,
user_id=existing_spend_obj.user_id, user_id=existing_spend_obj.user_id,
projected_spend=projected_spend, projected_spend=projected_spend,
projected_exceeded_data=projected_exceeded_date, projected_exceeded_date=projected_exceeded_date,
) )
# alert user # alert user
asyncio.create_task( asyncio.create_task(
proxy_logging_obj.budget_alerts( proxy_logging_obj.budget_alerts(
type="projected_limit_exceeded", type="projected_limit_exceeded",
user_info=call_info, user_info=call_info,
user_max_budget=soft_limit,
user_current_spend=new_spend,
) )
) )
# set cooldown on alert # set cooldown on alert
@ -1830,7 +1796,7 @@ async def update_cache(
existing_spend_obj is not None existing_spend_obj is not None
and getattr(existing_spend_obj, "team_spend", None) is not None and getattr(existing_spend_obj, "team_spend", None) is not None
): ):
existing_team_spend = existing_spend_obj.team_spend existing_team_spend = existing_spend_obj.team_spend or 0
# Calculate the new cost by adding the existing cost and response_cost # Calculate the new cost by adding the existing cost and response_cost
existing_spend_obj.team_spend = existing_team_spend + response_cost existing_spend_obj.team_spend = existing_team_spend + response_cost
@ -1947,8 +1913,8 @@ async def update_cache(
f"An error occurred updating end user cache: {str(e)}\n\n{traceback.format_exc()}" f"An error occurred updating end user cache: {str(e)}\n\n{traceback.format_exc()}"
) )
if token is not None: if token is not None and response_cost is not None:
asyncio.create_task(_update_key_cache()) asyncio.create_task(_update_key_cache(token=token, response_cost=response_cost))
asyncio.create_task(_update_user_cache()) asyncio.create_task(_update_user_cache())
@ -10231,7 +10197,7 @@ async def test_endpoint(request: Request):
async def health_services_endpoint( async def health_services_endpoint(
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
service: Literal[ service: Literal[
"slack_budget_alerts", "langfuse", "slack", "openmeter" "slack_budget_alerts", "langfuse", "slack", "openmeter", "webhook"
] = fastapi.Query(description="Specify the service being hit."), ] = fastapi.Query(description="Specify the service being hit."),
): ):
""" """
@ -10246,7 +10212,13 @@ async def health_services_endpoint(
raise HTTPException( raise HTTPException(
status_code=400, detail={"error": "Service must be specified."} status_code=400, detail={"error": "Service must be specified."}
) )
if service not in ["slack_budget_alerts", "langfuse", "slack", "openmeter"]: if service not in [
"slack_budget_alerts",
"langfuse",
"slack",
"openmeter",
"webhook",
]:
raise HTTPException( raise HTTPException(
status_code=400, status_code=400,
detail={ detail={
@ -10282,6 +10254,20 @@ async def health_services_endpoint(
"message": "Mock LLM request made - check langfuse.", "message": "Mock LLM request made - check langfuse.",
} }
if service == "webhook":
user_info = CallInfo(
token=user_api_key_dict.token or "",
spend=1,
max_budget=0,
user_id=user_api_key_dict.user_id,
key_alias=user_api_key_dict.key_alias,
team_id=user_api_key_dict.team_id,
)
await proxy_logging_obj.budget_alerts(
type="user_budget",
user_info=user_info,
)
if service == "slack" or service == "slack_budget_alerts": if service == "slack" or service == "slack_budget_alerts":
if "slack" in general_settings.get("alerting", []): if "slack" in general_settings.get("alerting", []):
# test_message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` litellm-ui-test-alert \n`Expected Day of Error`: 28th March \n`Current Spend`: $100.00 \n`Projected Spend at end of month`: $1000.00 \n`Soft Limit`: $700""" # test_message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` litellm-ui-test-alert \n`Expected Day of Error`: 28th March \n`Current Spend`: $100.00 \n`Projected Spend at end of month`: $1000.00 \n`Soft Limit`: $700"""
@ -10357,6 +10343,7 @@ async def health_services_endpoint(
}, },
) )
except Exception as e: except Exception as e:
traceback.print_exc()
if isinstance(e, HTTPException): if isinstance(e, HTTPException):
raise ProxyException( raise ProxyException(
message=getattr(e, "detail", f"Authentication Error({str(e)})"), message=getattr(e, "detail", f"Authentication Error({str(e)})"),

View file

@ -290,12 +290,9 @@ class ProxyLogging:
"token_budget", "token_budget",
"user_budget", "user_budget",
"team_budget", "team_budget",
"user_and_proxy_budget", "proxy_budget",
"failed_budgets",
"projected_limit_exceeded", "projected_limit_exceeded",
], ],
user_max_budget: float,
user_current_spend: float,
user_info: CallInfo, user_info: CallInfo,
): ):
if self.alerting is None: if self.alerting is None:
@ -303,8 +300,6 @@ class ProxyLogging:
return return
await self.slack_alerting_instance.budget_alerts( await self.slack_alerting_instance.budget_alerts(
type=type, type=type,
user_max_budget=user_max_budget,
user_current_spend=user_current_spend,
user_info=user_info, user_info=user_info,
) )

View file

@ -431,7 +431,6 @@ async def test_send_daily_reports_all_zero_or_none():
"user_budget", "user_budget",
"team_budget", "team_budget",
"user_and_proxy_budget", "user_and_proxy_budget",
"failed_budgets",
"projected_limit_exceeded", "projected_limit_exceeded",
], ],
) )
@ -470,7 +469,6 @@ async def test_send_token_budget_crossed_alerts(alerting_type):
"user_budget", "user_budget",
"team_budget", "team_budget",
"user_and_proxy_budget", "user_and_proxy_budget",
"failed_budgets",
"projected_limit_exceeded", "projected_limit_exceeded",
], ],
) )