diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py index 620320f674..4e89d41111 100644 --- a/litellm/integrations/slack_alerting.py +++ b/litellm/integrations/slack_alerting.py @@ -4,7 +4,7 @@ import dotenv, os, traceback from litellm.proxy._types import UserAPIKeyAuth, CallInfo, AlertType from litellm._logging import verbose_logger, verbose_proxy_logger import litellm, threading -from typing import List, Literal, Any, Union, Optional, Dict +from typing import List, Literal, Any, Union, Optional, Dict, Set from litellm.caching import DualCache import asyncio, time import aiohttp @@ -20,17 +20,25 @@ from typing import TypedDict from openai import APIError import litellm.types -import litellm.types.router +from litellm.types.router import LiteLLM_Params -class OutageModel(TypedDict): - model_id: str +class BaseOutageModel(TypedDict): alerts: List[int] - deployment_ids: List[str] minor_alert_sent: bool major_alert_sent: bool last_updated_at: float + +class OutageModel(BaseOutageModel): + model_id: str + + +class ProviderRegionOutageModel(BaseOutageModel): + provider_region_id: str + deployment_ids: Set[str] + + # we use this for the email header, please send a test email if you change this. verify it looks good on email LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png" EMAIL_LOGO_URL = os.getenv( @@ -60,6 +68,7 @@ class SlackAlertingArgs(LiteLLMBase): report_check_interval: int = 5 * 60 # 5 minutes budget_alert_ttl: int = 24 * 60 * 60 # 24 hours outage_alert_ttl: int = 1 * 60 # 1 minute ttl + region_outage_alert_ttl: int = 1 * 60 # 1 minute ttl minor_outage_alert_threshold: int = 5 major_outage_alert_threshold: int = 10 max_outage_alert_list_size: int = 10 # prevent memory leak @@ -736,6 +745,163 @@ class SlackAlerting(CustomLogger): return error_msg + def _outage_alert_msg_factory( + self, + alert_type: Literal["Major", "Minor"], + key: Literal["Model", "Region"], + key_val: str, + provider: str, + api_base: Optional[str], + outage_value: BaseOutageModel, + ) -> str: + """Format an alert message for slack""" + headers = {f"{key} Name": key_val, "Provider": provider} + if api_base is not None: + headers["API Base"] = api_base # type: ignore + + headers_str = "\n" + for k, v in headers.items(): + headers_str += f"*{k}:* `{v}`\n" + return f"""\n\n +*⚠️ {alert_type} Service Outage* + +{headers_str} + +*Errors:* +{self._count_outage_alerts(alerts=outage_value["alerts"])} + +*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n +""" + + async def region_outage_alerts( + self, + exception: APIError, + deployment_id: str, + ) -> None: + """ + Send slack alert if specific provider region is having an outage. 
+ + Track for 408 (Timeout) and >=500 Error codes + """ + ## CREATE (PROVIDER+REGION) ID ## + if self.llm_router is None: + return + + deployment = self.llm_router.get_deployment(model_id=deployment_id) + + if deployment is None: + return + + model = deployment.litellm_params.model + ### GET PROVIDER ### + provider = deployment.litellm_params.custom_llm_provider + if provider is None: + model, provider, _, _ = litellm.get_llm_provider(model=model) + + ### GET REGION ### + region_name = deployment.litellm_params.region_name + if region_name is None: + region_name = litellm.utils._get_model_region( + custom_llm_provider=provider, litellm_params=deployment.litellm_params + ) + + if region_name is None: + return + + ### UNIQUE CACHE KEY ### + cache_key = provider + region_name + + outage_value: Optional[ProviderRegionOutageModel] = ( + await self.internal_usage_cache.async_get_cache(key=cache_key) + ) + + if ( + getattr(exception, "status_code", None) is None + or ( + exception.status_code != 408 # type: ignore + and exception.status_code < 500 # type: ignore + ) + or self.llm_router is None + ): + return + + if outage_value is None: + _deployment_set = set() + _deployment_set.add(deployment_id) + outage_value = ProviderRegionOutageModel( + provider_region_id=cache_key, + alerts=[exception.status_code], # type: ignore + minor_alert_sent=False, + major_alert_sent=False, + last_updated_at=time.time(), + deployment_ids=_deployment_set, + ) + + ## add to cache ## + await self.internal_usage_cache.async_set_cache( + key=cache_key, + value=outage_value, + ttl=self.alerting_args.region_outage_alert_ttl, + ) + return + + if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size: + outage_value["alerts"].append(exception.status_code) # type: ignore + else: # prevent memory leaks + pass + _deployment_set = outage_value["deployment_ids"] + _deployment_set.add(deployment_id) + outage_value["deployment_ids"] = _deployment_set + outage_value["last_updated_at"] = time.time() + + ## MINOR OUTAGE ALERT SENT ## + if ( + outage_value["minor_alert_sent"] == False + and len(outage_value["alerts"]) + >= self.alerting_args.minor_outage_alert_threshold + and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment + ): + msg = self._outage_alert_msg_factory( + alert_type="Minor", + key="Region", + key_val=region_name, + api_base=None, + outage_value=outage_value, + provider=provider, + ) + # send minor alert + await self.send_alert( + message=msg, level="Medium", alert_type="outage_alerts" + ) + # set to true + outage_value["minor_alert_sent"] = True + + ## MAJOR OUTAGE ALERT SENT ## + elif ( + outage_value["major_alert_sent"] == False + and len(outage_value["alerts"]) + >= self.alerting_args.major_outage_alert_threshold + and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment + ): + msg = self._outage_alert_msg_factory( + alert_type="Major", + key="Region", + key_val=region_name, + api_base=None, + outage_value=outage_value, + provider=provider, + ) + + # send minor alert + await self.send_alert(message=msg, level="High", alert_type="outage_alerts") + # set to true + outage_value["major_alert_sent"] = True + + ## update cache ## + await self.internal_usage_cache.async_set_cache( + key=cache_key, value=outage_value + ) + async def outage_alerts( self, exception: APIError, @@ -787,7 +953,6 @@ class SlackAlerting(CustomLogger): outage_value = OutageModel( model_id=deployment_id, alerts=[exception.status_code], # type: ignore - deployment_ids=[deployment_id], 
minor_alert_sent=False, major_alert_sent=False, last_updated_at=time.time(), @@ -801,8 +966,14 @@ class SlackAlerting(CustomLogger): ) return - outage_value["alerts"].append(exception.status_code) # type: ignore - outage_value["deployment_ids"].append(deployment_id) + if ( + len(outage_value["alerts"]) + < self.alerting_args.max_outage_alert_list_size + ): + outage_value["alerts"].append(exception.status_code) # type: ignore + else: # prevent memory leaks + pass + outage_value["last_updated_at"] = time.time() ## MINOR OUTAGE ALERT SENT ## @@ -811,25 +982,18 @@ class SlackAlerting(CustomLogger): and len(outage_value["alerts"]) >= self.alerting_args.minor_outage_alert_threshold ): - msg = f"""\n\n -*⚠️ Minor Service Outage* - -*Model Name:* `{model}` -*Provider:* `{provider}` -*API Base:* `{api_base}` - -*Errors:* -{self._count_outage_alerts(alerts=outage_value["alerts"])} - - -*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n -""" + msg = self._outage_alert_msg_factory( + alert_type="Minor", + key="Model", + key_val=model, + api_base=api_base, + outage_value=outage_value, + provider=provider, + ) # send minor alert - _result_val = self.send_alert( + await self.send_alert( message=msg, level="Medium", alert_type="outage_alerts" ) - if _result_val is not None: - await _result_val # set to true outage_value["minor_alert_sent"] = True elif ( @@ -837,19 +1001,14 @@ class SlackAlerting(CustomLogger): and len(outage_value["alerts"]) >= self.alerting_args.major_outage_alert_threshold ): - msg = f"""\n\n -*⚠️ Major Service Outage* - -*Model Name:* `{model}` -*Provider:* `{provider}` -*API Base:* `{api_base}` - -*Errors:* -{self._count_outage_alerts(alerts=outage_value["alerts"])} - - -*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n -""" + msg = self._outage_alert_msg_factory( + alert_type="Major", + key="Model", + key_val=model, + api_base=api_base, + outage_value=outage_value, + provider=provider, + ) # send minor alert await self.send_alert( message=msg, level="High", alert_type="outage_alerts" @@ -1103,18 +1262,7 @@ Model Info: self, message: str, level: Literal["Low", "Medium", "High"], - alert_type: Literal[ - "llm_exceptions", - "llm_too_slow", - "llm_requests_hanging", - "budget_alerts", - "db_exceptions", - "daily_reports", - "spend_reports", - "new_model_added", - "cooldown_deployment", - "outage_alerts", - ], + alert_type: Literal[AlertType], user_info: Optional[WebhookEvent] = None, **kwargs, ): @@ -1254,34 +1402,17 @@ Model Info: except Exception as e: verbose_logger.debug(f"Exception raises -{str(e)}") - if "outage_alerts" in self.alert_types and isinstance( - kwargs.get("exception", ""), APIError - ): - _litellm_params = litellm.types.router.LiteLLM_Params( - model=kwargs.get("model", ""), - **kwargs.get("litellm_params", {}), - **kwargs.get("optional_params", {}), - ) - _region_name = litellm.utils._get_model_region( - custom_llm_provider=kwargs.get("custom_llm_provider", ""), - litellm_params=_litellm_params, - ) - # if region name not known, default to api base # - if _region_name is None: - _region_name = litellm.get_api_base( - model=kwargs.get("model", ""), - optional_params={ - **kwargs.get("litellm_params", {}), - **kwargs.get("optional_params", {}), - }, + if isinstance(kwargs.get("exception", ""), APIError): + if "outage_alerts" in self.alert_types: + await self.outage_alerts( + exception=kwargs["exception"], + deployment_id=model_id, ) - if _region_name is None: - _region_name = "" - await 
self.outage_alerts(
-                exception=kwargs["exception"],
-                deployment_id=model_id,
-            )
+            if "region_outage_alerts" in self.alert_types:
+                await self.region_outage_alerts(
+                    exception=kwargs["exception"], deployment_id=model_id
+                )
         except Exception as e:
             pass

diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py
index e8b3e65720..55d354e8b9 100644
--- a/litellm/proxy/_types.py
+++ b/litellm/proxy/_types.py
@@ -18,6 +18,7 @@ AlertType = Literal[
     "cooldown_deployment",
     "new_model_added",
     "outage_alerts",
+    "region_outage_alerts",
 ]

diff --git a/litellm/tests/test_alerting.py b/litellm/tests/test_alerting.py
index 54555464a3..bcec0e6b22 100644
--- a/litellm/tests/test_alerting.py
+++ b/litellm/tests/test_alerting.py
@@ -576,7 +576,9 @@ async def test_outage_alerting_called(
     slack_alerting.update_values(llm_router=router)
     with patch.object(
         slack_alerting, "outage_alerts", new=AsyncMock()
-    ) as mock_send_alert:
+    ) as mock_outage_alert, patch.object(
+        slack_alerting, "region_outage_alerts", new=AsyncMock()
+    ) as mock_region_alert:
         try:
             await router.acompletion(
                 model=model,
@@ -586,7 +588,8 @@ async def test_outage_alerting_called(
         except Exception as e:
             pass

-        mock_send_alert.assert_called_once()
+        mock_outage_alert.assert_called_once()
+        mock_region_alert.assert_called_once()

     with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
         for _ in range(6):
@@ -600,6 +603,112 @@ async def test_outage_alerting_called(
                 pass
             await asyncio.sleep(3)
         if error_code == 500 or error_code == 408:
+            # only the model-level alert fires here; the region-level alert
+            # should only trigger once 2+ deployments in the same region fail
+            mock_send_alert.assert_called_once()
+        else:
+            mock_send_alert.assert_not_called()
+
+
+@pytest.mark.parametrize(
+    "model, api_base, llm_provider, vertex_project, vertex_location",
+    [
+        ("gpt-3.5-turbo", None, "openai", None, None),
+        (
+            "azure/gpt-3.5-turbo",
+            "https://openai-gpt-4-test-v-1.openai.azure.com",
+            "azure",
+            None,
+            None,
+        ),
+        ("gemini-pro", None, "vertex_ai", "hardy-device-38811", "us-central1"),
+    ],
+)
+@pytest.mark.parametrize("error_code", [500, 408, 400])
+@pytest.mark.asyncio
+async def test_region_outage_alerting_called(
+    model, api_base, llm_provider, vertex_project, vertex_location, error_code
+):
+    """
+    If a single call fails, the provider+region outage tracker is updated
+
+    If enough calls fail across 2+ deployments in one region, an alert is sent
+    """
+    slack_alerting = SlackAlerting(
+        alerting=["webhook"], alert_types=["region_outage_alerts"]
+    )
+
+    litellm.callbacks = [slack_alerting]
+
+    error_to_raise: Optional[APIError] = None
+
+    if error_code == 400:
+        print("RAISING 400 ERROR CODE")
+        error_to_raise = litellm.BadRequestError(
+            message="this is a bad request",
+            model=model,
+            llm_provider=llm_provider,
+        )
+    elif error_code == 408:
+        print("RAISING 408 ERROR CODE")
+        error_to_raise = litellm.Timeout(
+            message="A timeout occurred", model=model, llm_provider=llm_provider
+        )
+    elif error_code == 500:
+        print("RAISING 500 ERROR CODE")
+        error_to_raise = litellm.ServiceUnavailableError(
+            message="API is unavailable",
+            model=model,
+            llm_provider=llm_provider,
+            response=httpx.Response(
+                status_code=503,
+                request=httpx.Request(
+                    method="completion",
+                    url="https://github.com/BerriAI/litellm",
+                ),
+            ),
+        )
+
+    router = Router(
+        model_list=[
+            {
+                "model_name": model,
+                "litellm_params": {
+                    "model": model,
+                    "api_key": os.getenv("AZURE_API_KEY"),
+                    "api_base": api_base,
+                    "vertex_location": vertex_location,
+                    "vertex_project": vertex_project,
+                },
+                "model_info": {"id": "1"},
+            },
+            {
+                "model_name": model,
+                "litellm_params": {
+                    "model": model,
+                    "api_key": os.getenv("AZURE_API_KEY"),
+                    "api_base": api_base,
+                    "vertex_location": vertex_location,
+                    "vertex_project": "vertex_project-2",
+                },
+                "model_info": {"id": "2"},
+            },
+        ],
+        num_retries=0,
+        allowed_fails=100,
+    )
+
+    slack_alerting.update_values(llm_router=router)
+    with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
+        for idx in range(6):  # 6 failures across 2 deployments -> crosses minor threshold (5)
+            if idx % 2 == 0:
+                deployment_id = "1"
+            else:
+                deployment_id = "2"
+            await slack_alerting.region_outage_alerts(
+                exception=error_to_raise, deployment_id=deployment_id  # type: ignore
+            )
+        if model == "gemini-pro" and (error_code == 500 or error_code == 408):
             mock_send_alert.assert_called_once()
         else:
             mock_send_alert.assert_not_called()