feat(slack_alerting.py): support webhook for budget alerts

Closes https://github.com/BerriAI/litellm/issues/3743
This commit is contained in:
Krrish Dholakia 2024-05-20 15:30:56 -07:00
parent 41556b3d02
commit c0d5e83570
6 changed files with 255 additions and 101 deletions

View file

@ -1,7 +1,7 @@
#### What this does ####
# Class for sending Slack Alerts #
import dotenv, os
from litellm.proxy._types import UserAPIKeyAuth
from litellm.proxy._types import UserAPIKeyAuth, CallInfo
from litellm._logging import verbose_logger, verbose_proxy_logger
import litellm, threading
from typing import List, Literal, Any, Union, Optional, Dict
@ -571,20 +571,35 @@ class SlackAlerting(CustomLogger):
alert_type="llm_requests_hanging",
)
async def failed_tracking_alert(self, error_message: str):
    """
    Send a high-level "budget_alerts" alert when cost tracking fails.

    The full alert text is used as the de-duplication key, so an identical
    failure message is only alerted once until its cache entry expires
    (`self.alerting_args.budget_alert_ttl`).

    Args:
        error_message: Description of the tracking failure; appended to the
            fixed alert prefix.
    """
    _cache: DualCache = self.internal_usage_cache

    # Keep a space after the prefix so it does not run straight into the
    # caller-supplied text.
    message = "Failed Tracking Cost for " + error_message
    _cache_key = "budget_alerts:failed_tracking:{}".format(message)

    already_sent = await _cache.async_get_cache(key=_cache_key)
    if already_sent is None:
        await self.send_alert(
            message=message, level="High", alert_type="budget_alerts"
        )
        # Record the send so repeats of the same message are suppressed.
        await _cache.async_set_cache(
            key=_cache_key,
            value="SENT",
            ttl=self.alerting_args.budget_alert_ttl,
        )
async def budget_alerts(
self,
type: Literal[
"token_budget",
"user_budget",
"team_budget",
"user_and_proxy_budget",
"failed_budgets",
"failed_tracking",
"projected_limit_exceeded",
],
user_max_budget: float,
user_current_spend: float,
user_info=None,
error_message="",
user_info: CallInfo,
):
## PREVENTATIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
# - Alert once within 24hr period
@ -598,39 +613,23 @@ class SlackAlerting(CustomLogger):
if "budget_alerts" not in self.alert_types:
return
_id: str = "default_id" # used for caching
user_info_str = ""
if type == "user_and_proxy_budget":
user_info = dict(user_info)
user_id = user_info["user_id"]
_id = user_id
max_budget = user_info["max_budget"]
spend = user_info["spend"]
user_email = user_info["user_email"]
user_info = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}"""
elif type == "token_budget":
token_info = dict(user_info)
token = token_info["token"]
user_id = user_info.user_id
if user_id is not None:
_id = user_id
max_budget = user_info.max_budget
spend = user_info.spend
user_email = user_info.user_email
user_info_str = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}"""
elif type == "token_budget" or type == "team_budget":
token_info = user_info
token = token_info.token
_id = token
spend = token_info["spend"]
max_budget = token_info["max_budget"]
user_id = token_info["user_id"]
user_info = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}"""
elif type == "failed_tracking":
user_id = str(user_info)
_id = user_id
user_info = f"\nUser ID: {user_id}\n Error {error_message}"
message = "Failed Tracking Cost for" + user_info
_cache_key = "budget_alerts:failed_tracking:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message, level="High", alert_type="budget_alerts"
)
await _cache.async_set_cache(
key=_cache_key,
value="SENT",
ttl=self.alerting_args.budget_alert_ttl,
)
return
spend = token_info.spend
max_budget = token_info.max_budget
user_id = token_info.user_id
user_info_str = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}"""
elif type == "projected_limit_exceeded" and user_info is not None:
"""
Input variables:
@ -642,12 +641,15 @@ class SlackAlerting(CustomLogger):
user_max_budget=soft_limit,
user_current_spend=new_spend
"""
message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info["key_alias"]} \n`Expected Day of Error`: {user_info["projected_exceeded_date"]} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info["projected_spend"]} \n`Soft Limit`: {user_max_budget}"""
message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info.key_alias} \n`Expected Day of Error`: {user_info.projected_exceeded_data} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info.projected_spend} \n`Soft Limit`: {user_max_budget}"""
_cache_key = "budget_alerts:projected_limit_exceeded:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message, level="High", alert_type="budget_alerts"
message=message,
level="High",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
@ -655,8 +657,6 @@ class SlackAlerting(CustomLogger):
ttl=self.alerting_args.budget_alert_ttl,
)
return
else:
user_info = str(user_info)
# percent of max_budget left to spend
if user_max_budget > 0:
@ -664,18 +664,21 @@ class SlackAlerting(CustomLogger):
else:
percent_left = 0
verbose_proxy_logger.debug(
f"Budget Alerts: Percent left: {percent_left} for {user_info}"
f"Budget Alerts: Percent left: {percent_left} for {user_info_str}"
)
# check if crossed budget
if user_current_spend >= user_max_budget:
verbose_proxy_logger.debug("Budget Crossed for %s", user_info)
message = "Budget Crossed for" + user_info
verbose_proxy_logger.debug("Budget Crossed for %s", user_info_str)
message = "Budget Crossed for" + user_info_str
_cache_key = "budget_alerts:budget_crossed:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
await self.send_alert(
message=message, level="High", alert_type="budget_alerts"
message=message,
level="High",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
@ -686,7 +689,7 @@ class SlackAlerting(CustomLogger):
# check if 5% of max budget is left
if percent_left <= 0.05:
message = "5% budget left for" + user_info
message = "5% budget left for" + user_info_str
_cache_key = "budget_alerts:5_perc_budget_crossed:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
@ -694,6 +697,7 @@ class SlackAlerting(CustomLogger):
message=message,
level="Medium",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
@ -706,7 +710,7 @@ class SlackAlerting(CustomLogger):
# check if 15% of max budget is left
if percent_left <= 0.15:
message = "15% budget left for" + user_info
message = "15% budget left for" + user_info_str
_cache_key = "budget_alerts:15_perc_budget_crossed:{}".format(_id)
result = await _cache.async_get_cache(key=_cache_key)
if result is None:
@ -714,6 +718,7 @@ class SlackAlerting(CustomLogger):
message=message,
level="Low",
alert_type="budget_alerts",
user_info=user_info,
)
await _cache.async_set_cache(
key=_cache_key,
@ -780,6 +785,36 @@ Model Info:
async def model_removed_alert(self, model_name: str):
pass
async def send_webhook_alert(
    self, alert_type: Literal["budget_alerts"], call_info: CallInfo
) -> bool:
    """
    POST a structured alert to the webhook configured via `WEBHOOK_URL`.

    Currently only implemented for budget alerts.

    Args:
        alert_type: Kind of alert being sent (not read by this method).
        call_info: Alert details; serialized with `model_dump_json()` and
            sent as the request body.

    Returns:
        True if the webhook answered with HTTP 200, False otherwise.

    Raises:
        Exception: if the `WEBHOOK_URL` environment variable is not set.
    """
    target_url = os.getenv("WEBHOOK_URL", None)
    if target_url is None:
        raise Exception("Missing webhook_url from environment")

    response = await self.async_http_handler.post(
        url=target_url,
        headers={"Content-type": "application/json"},
        data=call_info.model_dump_json(),
    )

    delivered = response.status_code == 200
    if not delivered:
        print("Error sending webhook alert. Error=", response.text)  # noqa
    return delivered
async def send_alert(
self,
message: str,
@ -795,6 +830,7 @@ Model Info:
"new_model_added",
"cooldown_deployment",
],
user_info: Optional[CallInfo] = None,
**kwargs,
):
"""
@ -814,6 +850,18 @@ Model Info:
if self.alerting is None:
return
if (
"webhook" in self.alerting
and alert_type == "budget_alerts"
and user_info is not None
):
await self.send_webhook_alert(
alert_type="budget_alerts", call_info=user_info
)
if "slack" not in self.alerting:
return
if alert_type not in self.alert_types:
return

View file

@ -17,4 +17,10 @@ model_list:
api_key: os.environ/AZURE_API_KEY # The `os.environ/` prefix tells litellm to read this from the env. See https://docs.litellm.ai/docs/simple_proxy#load-api-keys-from-vault
router_settings:
enable_pre_call_checks: true
enable_pre_call_checks: true
general_settings:
alerting: ["slack", "webhook"]
environment_variables:
WEBHOOK_URL: http://0.0.0.0:8000

View file

@ -1026,3 +1026,16 @@ class TokenCountResponse(LiteLLMBase):
request_model: str
model_used: str
tokenizer_type: str
class CallInfo(LiteLLMBase):
    """
    Structured details about the key/user a budget alert refers to.

    Used for slack budget alerting: passed as `user_info` to the budget
    alert code, and serialized with `model_dump_json()` as the body of
    webhook budget alerts.
    """

    # Required fields.
    token: str = Field(description="Hashed value of that key")
    spend: float = Field(description="The spend for that key")
    # Optional context; every field below defaults to None.
    max_budget: Optional[float] = None
    user_id: Optional[str] = None
    user_email: Optional[str] = None
    key_alias: Optional[str] = None
    # Shown as "Expected Day of Error" in the projected-limit alert message.
    # NOTE(review): the name looks like a typo for `projected_exceeded_date`
    # (callers populate it from a `projected_exceeded_date` value) — confirm
    # before renaming, since callers reference this exact attribute name.
    projected_exceeded_data: Optional[str] = None
    projected_spend: Optional[float] = None

View file

@ -589,12 +589,12 @@ async def user_api_key_auth(
ttl=UserAPIKeyCacheTTLEnum.global_proxy_spend.value,
)
if global_proxy_spend is not None:
user_info = {
"user_id": litellm_proxy_admin_name,
"max_budget": litellm.max_budget,
"spend": global_proxy_spend,
"user_email": "",
}
user_info = CallInfo(
user_id=litellm_proxy_admin_name,
max_budget=litellm.max_budget,
spend=global_proxy_spend,
token=valid_token["token"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=litellm.max_budget,
@ -922,12 +922,19 @@ async def user_api_key_auth(
user_max_budget is not None
and user_current_spend is not None
):
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
user_id=_user.get("user_id", None),
user_email=_user.get("user_email", None),
key_alias=valid_token["key_alias"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=user_max_budget,
user_current_spend=user_current_spend,
type="user_and_proxy_budget",
user_info=_user,
user_info=call_info,
)
)
@ -947,12 +954,22 @@ async def user_api_key_auth(
user_max_budget is not None
and user_current_spend is not None
):
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
max_budget=valid_token["max_budget"],
user_id=getattr(user_id_information, "user_id", None),
user_email=getattr(
user_id_information, "user_email", None
),
key_alias=valid_token["key_alias"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=user_max_budget,
user_current_spend=user_current_spend,
type="user_budget",
user_info=user_id_information,
user_info=call_info,
)
)
@ -982,12 +999,18 @@ async def user_api_key_auth(
# Check 4. Token Spend is under budget
if valid_token.spend is not None and valid_token.max_budget is not None:
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
max_budget=valid_token["max_budget"],
user_id=valid_token["user_id"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.max_budget,
user_current_spend=valid_token.spend,
type="token_budget",
user_info=valid_token,
user_info=call_info,
)
)
@ -1038,12 +1061,18 @@ async def user_api_key_auth(
and hasattr(valid_token, "team_max_budget")
and valid_token.team_max_budget is not None
):
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
max_budget=valid_token["max_budget"],
user_id=valid_token["user_id"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.team_max_budget,
user_current_spend=valid_token.spend,
type="token_budget",
user_info=valid_token,
type="team_budget",
user_info=call_info,
)
)
@ -1059,12 +1088,18 @@ async def user_api_key_auth(
and hasattr(valid_token, "team_max_budget")
and valid_token.team_max_budget is not None
):
call_info = CallInfo(
token=valid_token["token"],
spend=valid_token["spend"],
max_budget=valid_token["max_budget"],
user_id=valid_token["user_id"],
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=valid_token.team_max_budget,
user_current_spend=valid_token.team_spend,
type="token_budget",
user_info=valid_token,
type="team_budget",
user_info=call_info,
)
)
@ -1110,18 +1145,18 @@ async def user_api_key_auth(
)
if global_proxy_spend is not None:
user_info = {
"user_id": litellm_proxy_admin_name,
"max_budget": litellm.max_budget,
"spend": global_proxy_spend,
"user_email": "",
}
call_info = CallInfo(
token=valid_token["token"],
spend=global_proxy_spend,
max_budget=litellm.max_budget,
user_id=litellm_proxy_admin_name,
)
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=litellm.max_budget,
user_current_spend=global_proxy_spend,
type="user_and_proxy_budget",
user_info=user_info,
user_info=call_info,
)
)
_ = common_checks(
@ -1513,13 +1548,8 @@ async def _PROXY_track_cost_callback(
model = kwargs.get("model", "")
metadata = kwargs.get("litellm_params", {}).get("metadata", {})
error_msg += f"\n Args to _PROXY_track_cost_callback\n model: {model}\n metadata: {metadata}\n"
user_id = user_id or "not-found"
asyncio.create_task(
proxy_logging_obj.budget_alerts(
user_max_budget=0,
user_current_spend=0,
type="failed_tracking",
user_info=user_id,
proxy_logging_obj.failed_tracking_alert(
error_message=error_msg,
)
)
@ -1766,22 +1796,29 @@ async def update_cache(
== True
)
):
key_alias = existing_spend_obj.key_alias
projected_spend, projected_exceeded_date = _get_projected_spend_over_limit(
current_spend=new_spend,
soft_budget_limit=existing_spend_obj.litellm_budget_table.soft_budget,
soft_budget_limit=existing_spend_obj.litellm_budget_table.get(
"soft_budget", None
),
) # type: ignore
soft_limit = existing_spend_obj.litellm_budget_table.get(
"soft_budget", float("inf")
)
call_info = CallInfo(
token=existing_spend_obj.token or "",
spend=existing_spend_obj.spend,
key_alias=existing_spend_obj.key_alias,
max_budget=existing_spend_obj.max_budget,
user_id=existing_spend_obj.user_id,
projected_spend=projected_spend,
projected_exceeded_data=projected_exceeded_date,
)
soft_limit = existing_spend_obj.litellm_budget_table.soft_budget
user_info = {
"key_alias": key_alias,
"projected_spend": projected_spend,
"projected_exceeded_date": projected_exceeded_date,
}
# alert user
asyncio.create_task(
proxy_logging_obj.budget_alerts(
type="projected_limit_exceeded",
user_info=user_info,
user_info=call_info,
user_max_budget=soft_limit,
user_current_spend=new_spend,
)

View file

@ -11,6 +11,7 @@ from litellm.proxy._types import (
LiteLLM_EndUserTable,
LiteLLM_TeamTable,
Member,
CallInfo,
)
from litellm.caching import DualCache, RedisCache
from litellm.router import Deployment, ModelInfo, LiteLLM_Params
@ -276,20 +277,26 @@ class ProxyLogging:
raise e
return data
async def failed_tracking_alert(self, error_message: str):
    """
    Forward a cost-tracking failure to the Slack alerting instance.

    Does nothing when alerting is not configured (`self.alerting is None`).

    Args:
        error_message: Description of the tracking failure, passed through
            unchanged.
    """
    if self.alerting is not None:
        await self.slack_alerting_instance.failed_tracking_alert(
            error_message=error_message
        )
async def budget_alerts(
self,
type: Literal[
"token_budget",
"user_budget",
"team_budget",
"user_and_proxy_budget",
"failed_budgets",
"failed_tracking",
"projected_limit_exceeded",
],
user_max_budget: float,
user_current_spend: float,
user_info=None,
error_message="",
user_info: CallInfo,
):
if self.alerting is None:
# do nothing if alerting is not switched on
@ -299,7 +306,6 @@ class ProxyLogging:
user_max_budget=user_max_budget,
user_current_spend=user_current_spend,
user_info=user_info,
error_message=error_message,
)
async def alerting_handler(
@ -355,7 +361,11 @@ class ProxyLogging:
for client in self.alerting:
if client == "slack":
await self.slack_alerting_instance.send_alert(
message=message, level=level, alert_type=alert_type, **extra_kwargs
message=message,
level=level,
alert_type=alert_type,
user_info=None,
**extra_kwargs,
)
elif client == "sentry":
if litellm.utils.sentry_sdk_instance is not None:

View file

@ -1,7 +1,7 @@
# What is this?
## Tests slack alerting on proxy logging object
import sys, json, uuid
import sys, json, uuid, random
import os
import io, asyncio
from datetime import datetime, timedelta
@ -22,6 +22,7 @@ import unittest.mock
from unittest.mock import AsyncMock
import pytest
from litellm.router import AlertingConfig, Router
from litellm.proxy._types import CallInfo
@pytest.mark.parametrize(
@ -417,9 +418,9 @@ async def test_send_daily_reports_all_zero_or_none():
[
"token_budget",
"user_budget",
"team_budget",
"user_and_proxy_budget",
"failed_budgets",
"failed_tracking",
"projected_limit_exceeded",
],
)
@ -428,19 +429,18 @@ async def test_send_token_budget_crossed_alerts(alerting_type):
slack_alerting = SlackAlerting()
with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
if alerting_type == "failed_tracking":
user_info = "ishaan@berri.ai"
else:
user_info = {
"token": "50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
"spend": uuid.uuid4(),
"max_budget": None,
"user_id": "ishaan@berri.ai",
"user_email": "ishaan@berri.ai",
"key_alias": "my-test-key",
"projected_exceeded_date": "10/20/2024",
"projected_spend": 200,
}
user_info = {
"token": "50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
"spend": random.uniform(1.5, 1.9),
"max_budget": None,
"user_id": "ishaan@berri.ai",
"user_email": "ishaan@berri.ai",
"key_alias": "my-test-key",
"projected_exceeded_date": "10/20/2024",
"projected_spend": 200,
}
user_info = CallInfo(**user_info)
for _ in range(50):
await slack_alerting.budget_alerts(
@ -450,3 +450,43 @@ async def test_send_token_budget_crossed_alerts(alerting_type):
user_max_budget=100,
)
mock_send_alert.assert_awaited_once()
@pytest.mark.parametrize(
    "alerting_type",
    [
        "token_budget",
        "user_budget",
        "team_budget",
        "user_and_proxy_budget",
        "failed_budgets",
        "projected_limit_exceeded",
    ],
)
@pytest.mark.asyncio
async def test_webhook_alerting(alerting_type):
    """
    Repeated budget alerts of the same type reach the webhook exactly once.

    `send_webhook_alert` is replaced with an AsyncMock so no HTTP request is
    made; the same alert is fired 50 times and the mock must have been
    awaited a single time.
    """
    slack_alerting = SlackAlerting(alerting=["webhook"])

    with patch.object(
        slack_alerting, "send_webhook_alert", new=AsyncMock()
    ) as mock_send_webhook:
        user_info = CallInfo(
            token="50e55ca5bfbd0759697538e8d23c0cd5031f52d9e19e176d7233b20c7c4d3403",
            spend=random.uniform(1.5, 1.9),
            max_budget=None,
            user_id="ishaan@berri.ai",
            user_email="ishaan@berri.ai",
            key_alias="my-test-key",
            # CallInfo declares this field as `projected_exceeded_data`; the
            # fixture must use that name or the value never reaches the model.
            projected_exceeded_data="10/20/2024",
            projected_spend=200,
        )

        for _ in range(50):
            await slack_alerting.budget_alerts(
                type=alerting_type,
                user_info=user_info,
                user_current_spend=86,
                user_max_budget=100,
            )

        mock_send_webhook.assert_awaited_once()