From fd13a830abe9bc7f5d77f3a1659f3eabeac3e5e2 Mon Sep 17 00:00:00 2001 From: Krrish Dholakia Date: Mon, 20 May 2024 16:55:01 -0700 Subject: [PATCH] fix(slack_alerting.py): cleanup webhook event --- litellm/integrations/slack_alerting.py | 171 +++++++++--------------- litellm/proxy/_super_secret_config.yaml | 2 +- litellm/proxy/_types.py | 7 +- litellm/proxy/proxy_server.py | 129 ++++++++---------- litellm/proxy/utils.py | 7 +- litellm/tests/test_alerting.py | 2 - 6 files changed, 129 insertions(+), 189 deletions(-) diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py index c1e2735ddf..6c12f0a7d4 100644 --- a/litellm/integrations/slack_alerting.py +++ b/litellm/integrations/slack_alerting.py @@ -39,6 +39,12 @@ class SlackAlertingArgs(LiteLLMBase): budget_alert_ttl: int = 24 * 60 * 60 # 24 hours +class WebhookEvent(CallInfo): + event: Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"] + event_group: Literal["user", "key", "team", "proxy"] + event_message: str # human-readable description of event + + class DeploymentMetrics(LiteLLMBase): """ Metrics per deployment, stored in cache @@ -593,12 +599,9 @@ class SlackAlerting(CustomLogger): "token_budget", "user_budget", "team_budget", - "user_and_proxy_budget", - "failed_budgets", + "proxy_budget", "projected_limit_exceeded", ], - user_max_budget: float, - user_current_spend: float, user_info: CallInfo, ): ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727 @@ -613,118 +616,78 @@ class SlackAlerting(CustomLogger): if "budget_alerts" not in self.alert_types: return _id: str = "default_id" # used for caching - user_info_str = "" - if type == "user_and_proxy_budget": - user_id = user_info.user_id - if user_id is not None: - _id = user_id - max_budget = user_info.max_budget - spend = user_info.spend - user_email = user_info.user_email - user_info_str = f"""\nUser ID: {user_id}\nMax Budget: ${max_budget}\nSpend: ${spend}\nUser Email: {user_email}""" - elif type == "token_budget" or type == "team_budget": - token_info = user_info - token = token_info.token - _id = token - spend = token_info.spend - max_budget = token_info.max_budget - user_id = token_info.user_id - user_info_str = f"""\nToken: {token}\nSpend: ${spend}\nMax Budget: ${max_budget}\nUser ID: {user_id}""" - elif type == "projected_limit_exceeded" and user_info is not None: - """ - Input variables: - user_info = { - "key_alias": key_alias, - "projected_spend": projected_spend, - "projected_exceeded_date": projected_exceeded_date, - } - user_max_budget=soft_limit, - user_current_spend=new_spend - """ - message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` {user_info.key_alias} \n`Expected Day of Error`: {user_info.projected_exceeded_data} \n`Current Spend`: {user_current_spend} \n`Projected Spend at end of month`: {user_info.projected_spend} \n`Soft Limit`: {user_max_budget}""" - _cache_key = "budget_alerts:projected_limit_exceeded:{}".format(_id) - result = await _cache.async_get_cache(key=_cache_key) - if result is None: - await self.send_alert( - message=message, - level="High", - alert_type="budget_alerts", - user_info=user_info, - ) - await _cache.async_set_cache( - key=_cache_key, - value="SENT", - ttl=self.alerting_args.budget_alert_ttl, - ) - return + user_info_json = user_info.model_dump(exclude_none=True) + for k, v in user_info_json.items(): + user_info_str = "\n{}: {}\n".format(k, v) + + event: Optional[ + Literal["budget_crossed", "threshold_crossed", "projected_limit_exceeded"] + ] = None + event_group: Optional[Literal["user", "team", "key", "proxy"]] = None + event_message: str = "" + webhook_event: Optional[WebhookEvent] = None + if type == "proxy_budget": + event_group = "proxy" + event_message += "Proxy Budget: " + elif type == "user_budget": + event_group = "user" + event_message += "User Budget: " + _id = user_info.user_id or _id + elif type == "team_budget": + event_group = "team" + event_message += "Team Budget: " + _id = user_info.team_id or _id + elif type == "token_budget": + event_group = "key" + event_message += "Key Budget: " + _id = user_info.token + elif type == "projected_limit_exceeded": + event_group = "key" + event_message += "Key Budget: Projected Limit Exceeded" + event = "projected_limit_exceeded" + _id = user_info.token # percent of max_budget left to spend - if user_max_budget > 0: - percent_left = (user_max_budget - user_current_spend) / user_max_budget + if user_info.max_budget > 0: + percent_left = ( + user_info.max_budget - user_info.spend + ) / user_info.max_budget else: percent_left = 0 - verbose_proxy_logger.debug( - f"Budget Alerts: Percent left: {percent_left} for {user_info_str}" - ) # check if crossed budget - if user_current_spend >= user_max_budget: - verbose_proxy_logger.debug("Budget Crossed for %s", user_info_str) - message = "Budget Crossed for" + user_info_str - _cache_key = "budget_alerts:budget_crossed:{}".format(_id) + if user_info.spend >= user_info.max_budget: + event = "budget_crossed" + event_message += "Budget Crossed" + elif percent_left <= 0.05: + event = "threshold_crossed" + event_message += "5% Threshold Crossed" + elif percent_left <= 0.15: + event = "threshold_crossed" + event_message += "15% Threshold Crossed" + + if event is not None and event_group is not None: + _cache_key = "budget_alerts:{}:{}".format(event, _id) result = await _cache.async_get_cache(key=_cache_key) if result is None: + webhook_event = WebhookEvent( + event=event, + event_group=event_group, + event_message=event_message, + **user_info_json, + ) await self.send_alert( - message=message, + message=event_message + "\n\n" + user_info_str, level="High", alert_type="budget_alerts", - user_info=user_info, + user_info=webhook_event, ) await _cache.async_set_cache( key=_cache_key, value="SENT", ttl=self.alerting_args.budget_alert_ttl, ) - return - # check if 5% of max budget is left - if percent_left <= 0.05: - message = "5% budget left for" + user_info_str - _cache_key = "budget_alerts:5_perc_budget_crossed:{}".format(_id) - result = await _cache.async_get_cache(key=_cache_key) - if result is None: - await self.send_alert( - message=message, - level="Medium", - alert_type="budget_alerts", - user_info=user_info, - ) - - await _cache.async_set_cache( - key=_cache_key, - value="SENT", - ttl=self.alerting_args.budget_alert_ttl, - ) - - return - - # check if 15% of max budget is left - if percent_left <= 0.15: - message = "15% budget left for" + user_info_str - _cache_key = "budget_alerts:15_perc_budget_crossed:{}".format(_id) - result = await _cache.async_get_cache(key=_cache_key) - if result is None: - await self.send_alert( - message=message, - level="Low", - alert_type="budget_alerts", - user_info=user_info, - ) - await _cache.async_set_cache( - key=_cache_key, - value="SENT", - ttl=self.alerting_args.budget_alert_ttl, - ) return return @@ -785,9 +748,7 @@ Model Info: async def model_removed_alert(self, model_name: str): pass - async def send_webhook_alert( - self, alert_type: Literal["budget_alerts"], call_info: CallInfo - ) -> bool: + async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool: """ Sends structured alert to webhook, if set. @@ -800,7 +761,7 @@ Model Info: if webhook_url is None: raise Exception("Missing webhook_url from environment") - payload = call_info.model_dump_json() + payload = webhook_event.model_dump_json() headers = {"Content-type": "application/json"} response = await self.async_http_handler.post( @@ -830,7 +791,7 @@ Model Info: "new_model_added", "cooldown_deployment", ], - user_info: Optional[CallInfo] = None, + user_info: Optional[WebhookEvent] = None, **kwargs, ): """ @@ -855,9 +816,7 @@ Model Info: and alert_type == "budget_alerts" and user_info is not None ): - await self.send_webhook_alert( - alert_type="budget_alerts", call_info=user_info - ) + await self.send_webhook_alert(webhook_event=user_info) if "slack" not in self.alerting: return diff --git a/litellm/proxy/_super_secret_config.yaml b/litellm/proxy/_super_secret_config.yaml index 4277fbacb9..a5d91c5a13 100644 --- a/litellm/proxy/_super_secret_config.yaml +++ b/litellm/proxy/_super_secret_config.yaml @@ -23,4 +23,4 @@ general_settings: alerting: ["slack", "webhook"] environment_variables: - WEBHOOK_URL: http://0.0.0.0:8000 + WEBHOOK_URL: https://webhook.site/6ab090e8-c55f-4a23-b075-3209f5c57906 diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index 443e603c25..fc7a3de796 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -1031,11 +1031,12 @@ class TokenCountResponse(LiteLLMBase): class CallInfo(LiteLLMBase): """Used for slack budget alerting""" + spend: float + max_budget: float token: str = Field(description="Hashed value of that key") - spend: float = Field(description="The spend for that key") - max_budget: Optional[float] = None user_id: Optional[str] = None + team_id: Optional[str] = None user_email: Optional[str] = None key_alias: Optional[str] = None - projected_exceeded_data: Optional[str] = None + projected_exceeded_date: Optional[str] = None projected_spend: Optional[float] = None diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index a36e291634..f2ed8dbff4 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -597,9 +597,7 @@ async def user_api_key_auth( ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=litellm.max_budget, - user_current_spend=global_proxy_spend, - type="user_and_proxy_budget", + type="proxy_budget", user_info=user_info, ) ) @@ -923,17 +921,16 @@ async def user_api_key_auth( and user_current_spend is not None ): call_info = CallInfo( - token=valid_token["token"], - spend=valid_token["spend"], + token=valid_token.token, + spend=user_current_spend, + max_budget=user_max_budget, user_id=_user.get("user_id", None), user_email=_user.get("user_email", None), - key_alias=valid_token["key_alias"], + key_alias=valid_token.key_alias, ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=user_max_budget, - user_current_spend=user_current_spend, - type="user_and_proxy_budget", + type="user_budget", user_info=call_info, ) ) @@ -955,19 +952,17 @@ async def user_api_key_auth( and user_current_spend is not None ): call_info = CallInfo( - token=valid_token["token"], - spend=valid_token["spend"], - max_budget=valid_token["max_budget"], + token=valid_token.token, + spend=user_current_spend, + max_budget=user_max_budget, user_id=getattr(user_id_information, "user_id", None), user_email=getattr( user_id_information, "user_email", None ), - key_alias=valid_token["key_alias"], + key_alias=valid_token.key_alias, ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=user_max_budget, - user_current_spend=user_current_spend, type="user_budget", user_info=call_info, ) @@ -1000,15 +995,14 @@ async def user_api_key_auth( # Check 4. Token Spend is under budget if valid_token.spend is not None and valid_token.max_budget is not None: call_info = CallInfo( - token=valid_token["token"], - spend=valid_token["spend"], - max_budget=valid_token["max_budget"], - user_id=valid_token["user_id"], + token=valid_token.token, + spend=valid_token.spend, + max_budget=valid_token.max_budget, + user_id=valid_token.user_id, + team_id=valid_token.team_id, ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=valid_token.max_budget, - user_current_spend=valid_token.spend, type="token_budget", user_info=call_info, ) @@ -1055,33 +1049,8 @@ async def user_api_key_auth( raise Exception( f"ExceededModelBudget: Current spend for model: {current_model_spend}; Max Budget for Model: {current_model_budget}" ) - # Check 6. Token spend is under Team budget - if ( - valid_token.spend is not None - and hasattr(valid_token, "team_max_budget") - and valid_token.team_max_budget is not None - ): - call_info = CallInfo( - token=valid_token["token"], - spend=valid_token["spend"], - max_budget=valid_token["max_budget"], - user_id=valid_token["user_id"], - ) - asyncio.create_task( - proxy_logging_obj.budget_alerts( - user_max_budget=valid_token.team_max_budget, - user_current_spend=valid_token.spend, - type="team_budget", - user_info=call_info, - ) - ) - if valid_token.spend >= valid_token.team_max_budget: - raise Exception( - f"ExceededTokenBudget: Current spend for token: {valid_token.spend}; Max Budget for Team: {valid_token.team_max_budget}" - ) - - # Check 7. Team spend is under Team budget + # Check 6. Team spend is under Team budget if ( hasattr(valid_token, "team_spend") and valid_token.team_spend is not None @@ -1089,15 +1058,13 @@ async def user_api_key_auth( and valid_token.team_max_budget is not None ): call_info = CallInfo( - token=valid_token["token"], - spend=valid_token["spend"], - max_budget=valid_token["max_budget"], - user_id=valid_token["user_id"], + token=valid_token.token, + spend=valid_token.team_spend, + max_budget=valid_token.team_max_budget, + user_id=valid_token.user_id, ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=valid_token.team_max_budget, - user_current_spend=valid_token.team_spend, type="team_budget", user_info=call_info, ) @@ -1150,12 +1117,11 @@ async def user_api_key_auth( spend=global_proxy_spend, max_budget=litellm.max_budget, user_id=litellm_proxy_admin_name, + team_id=valid_token.team_id, ) asyncio.create_task( proxy_logging_obj.budget_alerts( - user_max_budget=litellm.max_budget, - user_current_spend=global_proxy_spend, - type="user_and_proxy_budget", + type="proxy_budget", user_info=call_info, ) ) @@ -1344,7 +1310,7 @@ async def user_api_key_auth( elif isinstance(e, ProxyException): raise e raise ProxyException( - message="Authentication Error, " + str(e), + message="Authentication Error 1234, " + str(e), type="auth_error", param=getattr(e, "param", "None"), code=status.HTTP_401_UNAUTHORIZED, @@ -1761,14 +1727,14 @@ async def update_cache( """ ### UPDATE KEY SPEND ### - async def _update_key_cache(): + async def _update_key_cache(token: str, response_cost: float): # Fetch the existing cost for the given token if isinstance(token, str) and token.startswith("sk-"): hashed_token = hash_token(token=token) else: hashed_token = token verbose_proxy_logger.debug("_update_key_cache: hashed_token=%s", hashed_token) - existing_spend_obj = await user_api_key_cache.async_get_cache(key=hashed_token) + existing_spend_obj: LiteLLM_VerificationTokenView = await user_api_key_cache.async_get_cache(key=hashed_token) # type: ignore verbose_proxy_logger.debug( f"_update_key_cache: existing_spend_obj={existing_spend_obj}" ) @@ -1777,7 +1743,7 @@ async def update_cache( ) if existing_spend_obj is None: existing_spend = 0 - existing_spend_obj = LiteLLM_VerificationTokenView() + existing_spend_obj = LiteLLM_VerificationTokenView(token=token) else: existing_spend = existing_spend_obj.spend # Calculate the new cost by adding the existing cost and response_cost @@ -1791,7 +1757,9 @@ async def update_cache( and ( _is_projected_spend_over_limit( current_spend=new_spend, - soft_budget_limit=existing_spend_obj.litellm_budget_table.soft_budget, + soft_budget_limit=existing_spend_obj.litellm_budget_table[ + "soft_budget" + ], ) == True ) @@ -1807,20 +1775,18 @@ async def update_cache( ) call_info = CallInfo( token=existing_spend_obj.token or "", - spend=existing_spend_obj.spend, + spend=new_spend, key_alias=existing_spend_obj.key_alias, - max_budget=existing_spend_obj.max_budget, + max_budget=soft_limit, user_id=existing_spend_obj.user_id, projected_spend=projected_spend, - projected_exceeded_data=projected_exceeded_date, + projected_exceeded_date=projected_exceeded_date, ) # alert user asyncio.create_task( proxy_logging_obj.budget_alerts( type="projected_limit_exceeded", user_info=call_info, - user_max_budget=soft_limit, - user_current_spend=new_spend, ) ) # set cooldown on alert @@ -1830,7 +1796,7 @@ async def update_cache( existing_spend_obj is not None and getattr(existing_spend_obj, "team_spend", None) is not None ): - existing_team_spend = existing_spend_obj.team_spend + existing_team_spend = existing_spend_obj.team_spend or 0 # Calculate the new cost by adding the existing cost and response_cost existing_spend_obj.team_spend = existing_team_spend + response_cost @@ -1947,8 +1913,8 @@ async def update_cache( f"An error occurred updating end user cache: {str(e)}\n\n{traceback.format_exc()}" ) - if token is not None: - asyncio.create_task(_update_key_cache()) + if token is not None and response_cost is not None: + asyncio.create_task(_update_key_cache(token=token, response_cost=response_cost)) asyncio.create_task(_update_user_cache()) @@ -10231,7 +10197,7 @@ async def test_endpoint(request: Request): async def health_services_endpoint( user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), service: Literal[ - "slack_budget_alerts", "langfuse", "slack", "openmeter" + "slack_budget_alerts", "langfuse", "slack", "openmeter", "webhook" ] = fastapi.Query(description="Specify the service being hit."), ): """ @@ -10246,7 +10212,13 @@ async def health_services_endpoint( raise HTTPException( status_code=400, detail={"error": "Service must be specified."} ) - if service not in ["slack_budget_alerts", "langfuse", "slack", "openmeter"]: + if service not in [ + "slack_budget_alerts", + "langfuse", + "slack", + "openmeter", + "webhook", + ]: raise HTTPException( status_code=400, detail={ @@ -10282,6 +10254,20 @@ async def health_services_endpoint( "message": "Mock LLM request made - check langfuse.", } + if service == "webhook": + user_info = CallInfo( + token=user_api_key_dict.token or "", + spend=1, + max_budget=0, + user_id=user_api_key_dict.user_id, + key_alias=user_api_key_dict.key_alias, + team_id=user_api_key_dict.team_id, + ) + await proxy_logging_obj.budget_alerts( + type="user_budget", + user_info=user_info, + ) + if service == "slack" or service == "slack_budget_alerts": if "slack" in general_settings.get("alerting", []): # test_message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` litellm-ui-test-alert \n`Expected Day of Error`: 28th March \n`Current Spend`: $100.00 \n`Projected Spend at end of month`: $1000.00 \n`Soft Limit`: $700""" @@ -10357,6 +10343,7 @@ async def health_services_endpoint( }, ) except Exception as e: + traceback.print_exc() if isinstance(e, HTTPException): raise ProxyException( message=getattr(e, "detail", f"Authentication Error({str(e)})"), diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 4bd793a496..aab43b1e6e 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -290,12 +290,9 @@ class ProxyLogging: "token_budget", "user_budget", "team_budget", - "user_and_proxy_budget", - "failed_budgets", + "proxy_budget", "projected_limit_exceeded", ], - user_max_budget: float, - user_current_spend: float, user_info: CallInfo, ): if self.alerting is None: @@ -303,8 +300,6 @@ class ProxyLogging: return await self.slack_alerting_instance.budget_alerts( type=type, - user_max_budget=user_max_budget, - user_current_spend=user_current_spend, user_info=user_info, ) diff --git a/litellm/tests/test_alerting.py b/litellm/tests/test_alerting.py index b1887f73d2..20c27082fc 100644 --- a/litellm/tests/test_alerting.py +++ b/litellm/tests/test_alerting.py @@ -431,7 +431,6 @@ async def test_send_daily_reports_all_zero_or_none(): "user_budget", "team_budget", "user_and_proxy_budget", - "failed_budgets", "projected_limit_exceeded", ], ) @@ -470,7 +469,6 @@ async def test_send_token_budget_crossed_alerts(alerting_type): "user_budget", "team_budget", "user_and_proxy_budget", - "failed_budgets", "projected_limit_exceeded", ], )