feat(slack_alerting.py): enable provider-region based alerting

Krrish Dholakia 2024-05-25 16:55:46 -07:00
parent 7694622007
commit e4629ba65d
3 changed files with 318 additions and 77 deletions

@@ -4,7 +4,7 @@ import dotenv, os, traceback
from litellm.proxy._types import UserAPIKeyAuth, CallInfo, AlertType
from litellm._logging import verbose_logger, verbose_proxy_logger
import litellm, threading
from typing import List, Literal, Any, Union, Optional, Dict
from typing import List, Literal, Any, Union, Optional, Dict, Set
from litellm.caching import DualCache
import asyncio, time
import aiohttp
@@ -20,17 +20,25 @@ from typing import TypedDict
from openai import APIError
import litellm.types
import litellm.types.router
from litellm.types.router import LiteLLM_Params
class OutageModel(TypedDict):
model_id: str
class BaseOutageModel(TypedDict):
alerts: List[int]
deployment_ids: List[str]
minor_alert_sent: bool
major_alert_sent: bool
last_updated_at: float
class OutageModel(BaseOutageModel):
model_id: str
class ProviderRegionOutageModel(BaseOutageModel):
provider_region_id: str
deployment_ids: Set[str]
# we use this for the email header, please send a test email if you change this. verify it looks good on email
LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
EMAIL_LOGO_URL = os.getenv(
@@ -60,6 +68,7 @@ class SlackAlertingArgs(LiteLLMBase):
report_check_interval: int = 5 * 60 # 5 minutes
budget_alert_ttl: int = 24 * 60 * 60 # 24 hours
outage_alert_ttl: int = 1 * 60 # 1 minute ttl
region_outage_alert_ttl: int = 1 * 60 # 1 minute ttl
minor_outage_alert_threshold: int = 5
major_outage_alert_threshold: int = 10
max_outage_alert_list_size: int = 10 # prevent memory leak
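
For context on how these knobs fit together: a minimal sketch of overriding the new TTL by constructing the args model directly (the litellm.integrations.slack_alerting import path is an assumption; how the proxy wires alerting args in is outside this hunk):

from litellm.integrations.slack_alerting import SlackAlertingArgs  # import path assumed

# bump the region-outage tracking window from 1 minute to 5 minutes,
# keeping the default minor/major thresholds explicit for readability
args = SlackAlertingArgs(
    region_outage_alert_ttl=5 * 60,
    minor_outage_alert_threshold=5,
    major_outage_alert_threshold=10,
)
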
@@ -736,6 +745,163 @@ class SlackAlerting(CustomLogger):
return error_msg
def _outage_alert_msg_factory(
self,
alert_type: Literal["Major", "Minor"],
key: Literal["Model", "Region"],
key_val: str,
provider: str,
api_base: Optional[str],
outage_value: BaseOutageModel,
) -> str:
"""Format an alert message for slack"""
headers = {f"{key} Name": key_val, "Provider": provider}
if api_base is not None:
headers["API Base"] = api_base # type: ignore
headers_str = "\n"
for k, v in headers.items():
headers_str += f"*{k}:* `{v}`\n"
return f"""\n\n
* {alert_type} Service Outage*
{headers_str}
*Errors:*
{self._count_outage_alerts(alerts=outage_value["alerts"])}
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
"""
async def region_outage_alerts(
self,
exception: APIError,
deployment_id: str,
) -> None:
"""
Send a Slack alert if a specific provider region is having an outage.
Tracks 408 (Timeout) and >= 500 error codes.
"""
## CREATE (PROVIDER+REGION) ID ##
if self.llm_router is None:
return
deployment = self.llm_router.get_deployment(model_id=deployment_id)
if deployment is None:
return
model = deployment.litellm_params.model
### GET PROVIDER ###
provider = deployment.litellm_params.custom_llm_provider
if provider is None:
model, provider, _, _ = litellm.get_llm_provider(model=model)
### GET REGION ###
region_name = deployment.litellm_params.region_name
if region_name is None:
region_name = litellm.utils._get_model_region(
custom_llm_provider=provider, litellm_params=deployment.litellm_params
)
if region_name is None:
return
### UNIQUE CACHE KEY ###
cache_key = provider + region_name
outage_value: Optional[ProviderRegionOutageModel] = (
await self.internal_usage_cache.async_get_cache(key=cache_key)
)
if (
getattr(exception, "status_code", None) is None
or (
exception.status_code != 408 # type: ignore
and exception.status_code < 500 # type: ignore
)
or self.llm_router is None
):
return
if outage_value is None:
_deployment_set = set()
_deployment_set.add(deployment_id)
outage_value = ProviderRegionOutageModel(
provider_region_id=cache_key,
alerts=[exception.status_code], # type: ignore
minor_alert_sent=False,
major_alert_sent=False,
last_updated_at=time.time(),
deployment_ids=_deployment_set,
)
## add to cache ##
await self.internal_usage_cache.async_set_cache(
key=cache_key,
value=outage_value,
ttl=self.alerting_args.region_outage_alert_ttl,
)
return
if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
outage_value["alerts"].append(exception.status_code) # type: ignore
else: # prevent memory leaks
pass
_deployment_set = outage_value["deployment_ids"]
_deployment_set.add(deployment_id)
outage_value["deployment_ids"] = _deployment_set
outage_value["last_updated_at"] = time.time()
## MINOR OUTAGE ALERT SENT ##
if (
outage_value["minor_alert_sent"] == False
and len(outage_value["alerts"])
>= self.alerting_args.minor_outage_alert_threshold
and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment
):
msg = self._outage_alert_msg_factory(
alert_type="Minor",
key="Region",
key_val=region_name,
api_base=None,
outage_value=outage_value,
provider=provider,
)
# send minor alert
await self.send_alert(
message=msg, level="Medium", alert_type="outage_alerts"
)
# set to true
outage_value["minor_alert_sent"] = True
## MAJOR OUTAGE ALERT SENT ##
elif (
outage_value["major_alert_sent"] == False
and len(outage_value["alerts"])
>= self.alerting_args.major_outage_alert_threshold
and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment
):
msg = self._outage_alert_msg_factory(
alert_type="Major",
key="Region",
key_val=region_name,
api_base=None,
outage_value=outage_value,
provider=provider,
)
# send major alert
await self.send_alert(message=msg, level="High", alert_type="outage_alerts")
# set to true
outage_value["major_alert_sent"] = True
## update cache ##
await self.internal_usage_cache.async_set_cache(
key=cache_key, value=outage_value
)
async def outage_alerts(
self,
exception: APIError,
@@ -787,7 +953,6 @@ class SlackAlerting(CustomLogger):
outage_value = OutageModel(
model_id=deployment_id,
alerts=[exception.status_code], # type: ignore
deployment_ids=[deployment_id],
minor_alert_sent=False,
major_alert_sent=False,
last_updated_at=time.time(),
@@ -801,8 +966,14 @@
)
return
if (
len(outage_value["alerts"])
< self.alerting_args.max_outage_alert_list_size
):
outage_value["alerts"].append(exception.status_code) # type: ignore
outage_value["deployment_ids"].append(deployment_id)
else: # prevent memory leaks
pass
outage_value["last_updated_at"] = time.time()
## MINOR OUTAGE ALERT SENT ##
@@ -811,25 +982,18 @@
and len(outage_value["alerts"])
>= self.alerting_args.minor_outage_alert_threshold
):
msg = f"""\n\n
* Minor Service Outage*
*Model Name:* `{model}`
*Provider:* `{provider}`
*API Base:* `{api_base}`
*Errors:*
{self._count_outage_alerts(alerts=outage_value["alerts"])}
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
"""
msg = self._outage_alert_msg_factory(
alert_type="Minor",
key="Model",
key_val=model,
api_base=api_base,
outage_value=outage_value,
provider=provider,
)
# send minor alert
_result_val = self.send_alert(
await self.send_alert(
message=msg, level="Medium", alert_type="outage_alerts"
)
if _result_val is not None:
await _result_val
# set to true
outage_value["minor_alert_sent"] = True
elif (
@@ -837,19 +1001,14 @@
and len(outage_value["alerts"])
>= self.alerting_args.major_outage_alert_threshold
):
msg = f"""\n\n
* Major Service Outage*
*Model Name:* `{model}`
*Provider:* `{provider}`
*API Base:* `{api_base}`
*Errors:*
{self._count_outage_alerts(alerts=outage_value["alerts"])}
*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
"""
msg = self._outage_alert_msg_factory(
alert_type="Major",
key="Model",
key_val=model,
api_base=api_base,
outage_value=outage_value,
provider=provider,
)
# send major alert
await self.send_alert(
message=msg, level="High", alert_type="outage_alerts"
@@ -1103,18 +1262,7 @@ Model Info:
self,
message: str,
level: Literal["Low", "Medium", "High"],
alert_type: Literal[
"llm_exceptions",
"llm_too_slow",
"llm_requests_hanging",
"budget_alerts",
"db_exceptions",
"daily_reports",
"spend_reports",
"new_model_added",
"cooldown_deployment",
"outage_alerts",
],
alert_type: Literal[AlertType],
user_info: Optional[WebhookEvent] = None,
**kwargs,
):
@@ -1254,34 +1402,17 @@ Model Info:
except Exception as e:
verbose_logger.debug(f"Exception raises -{str(e)}")
if "outage_alerts" in self.alert_types and isinstance(
kwargs.get("exception", ""), APIError
):
_litellm_params = litellm.types.router.LiteLLM_Params(
model=kwargs.get("model", ""),
**kwargs.get("litellm_params", {}),
**kwargs.get("optional_params", {}),
)
_region_name = litellm.utils._get_model_region(
custom_llm_provider=kwargs.get("custom_llm_provider", ""),
litellm_params=_litellm_params,
)
# if region name not known, default to api base #
if _region_name is None:
_region_name = litellm.get_api_base(
model=kwargs.get("model", ""),
optional_params={
**kwargs.get("litellm_params", {}),
**kwargs.get("optional_params", {}),
},
)
if _region_name is None:
_region_name = ""
if isinstance(kwargs.get("exception", ""), APIError):
if "outage_alerts" in self.alert_types:
await self.outage_alerts(
exception=kwargs["exception"],
deployment_id=model_id,
)
if "region_outage_alerts" in self.alert_types:
await self.region_outage_alerts(
exception=kwargs["exception"], deployment_id=model_id
)
except Exception as e:
pass
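
Taken together, the new region_outage_alerts path keys its state on provider + region, accumulates error codes plus the set of failing deployment IDs, and only alerts once the error count crosses a threshold and more than one deployment is affected. A standalone sketch of that bookkeeping, using hypothetical names (RegionTracker, record_failure) rather than litellm's API:

import time
from typing import Dict

MINOR_THRESHOLD = 5    # mirrors minor_outage_alert_threshold
MAJOR_THRESHOLD = 10   # mirrors major_outage_alert_threshold
MAX_LIST_SIZE = 10     # mirrors max_outage_alert_list_size (bounds memory)

class RegionTracker:
    """Hypothetical, simplified stand-in for the (provider + region) cache entries."""

    def __init__(self) -> None:
        self.state: Dict[str, dict] = {}  # cache_key -> outage record

    def record_failure(self, provider: str, region: str, deployment_id: str, status_code: int) -> str:
        if status_code != 408 and status_code < 500:
            return "ignored"  # only timeouts (408) and 5xx count toward an outage
        cache_key = provider + region  # same key construction as region_outage_alerts
        record = self.state.setdefault(
            cache_key,
            {"alerts": [], "deployment_ids": set(), "minor_alert_sent": False, "major_alert_sent": False},
        )
        if len(record["alerts"]) < MAX_LIST_SIZE:
            record["alerts"].append(status_code)
        record["deployment_ids"].add(deployment_id)
        record["last_updated_at"] = time.time()
        multi_deployment = len(record["deployment_ids"]) > 1  # a single bad deployment never alerts
        if not record["minor_alert_sent"] and len(record["alerts"]) >= MINOR_THRESHOLD and multi_deployment:
            record["minor_alert_sent"] = True
            return "minor_alert"
        if not record["major_alert_sent"] and len(record["alerts"]) >= MAJOR_THRESHOLD and multi_deployment:
            record["major_alert_sent"] = True
            return "major_alert"
        return "recorded"

tracker = RegionTracker()
for i in range(5):
    print(tracker.record_failure("azure", "eastus", deployment_id=str(i % 2), status_code=503))
# the fifth 503, spread across two deployment IDs, prints "minor_alert"

In the diff above, the equivalent record lives in internal_usage_cache (set with region_outage_alert_ttl), so stale outage data ages out instead of accumulating.
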

@@ -18,6 +18,7 @@ AlertType = Literal[
"cooldown_deployment",
"new_model_added",
"outage_alerts",
"region_outage_alerts",
]
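
With the new literal registered, region-level alerting is opt-in via alert_types, next to the existing per-model outage_alerts. A minimal sketch mirroring the constructor call used in the test below (the import path is an assumption; channel and router wiring depend on your deployment):

import litellm
from litellm.integrations.slack_alerting import SlackAlerting  # import path assumed

slack_alerting = SlackAlerting(
    alerting=["slack"],  # the test below uses ["webhook"]; use whichever channel you have configured
    alert_types=["outage_alerts", "region_outage_alerts"],
)
litellm.callbacks = [slack_alerting]
# slack_alerting.update_values(llm_router=router)  # attach a Router so deployment IDs resolve to provider/region
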

@@ -576,7 +576,9 @@ async def test_outage_alerting_called(
slack_alerting.update_values(llm_router=router)
with patch.object(
slack_alerting, "outage_alerts", new=AsyncMock()
) as mock_send_alert:
) as mock_outage_alert, patch.object(
slack_alerting, "region_outage_alerts", new=AsyncMock()
) as mock_region_alert:
try:
await router.acompletion(
model=model,
@@ -586,7 +588,8 @@
except Exception as e:
pass
mock_send_alert.assert_called_once()
mock_outage_alert.assert_called_once()
mock_region_alert.assert_called_once()
with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
for _ in range(6):
@@ -600,6 +603,112 @@ async def test_outage_alerting_called(
pass
await asyncio.sleep(3)
if error_code == 500 or error_code == 408:
mock_send_alert.assert_called_once()  # only the model-level alert fires; the region alert needs 2+ deployments in the same region
else:
mock_send_alert.assert_not_called()
@pytest.mark.parametrize(
"model, api_base, llm_provider, vertex_project, vertex_location",
[
("gpt-3.5-turbo", None, "openai", None, None),
(
"azure/gpt-3.5-turbo",
"https://openai-gpt-4-test-v-1.openai.azure.com",
"azure",
None,
None,
),
("gemini-pro", None, "vertex_ai", "hardy-device-38811", "us-central1"),
],
)
@pytest.mark.parametrize("error_code", [500, 408, 400])
@pytest.mark.asyncio
async def test_region_outage_alerting_called(
model, api_base, llm_provider, vertex_project, vertex_location, error_code
):
"""
If a call fails, the region outage tracker is updated.
If multiple deployments in the same provider region fail, a region outage alert is sent.
"""
slack_alerting = SlackAlerting(
alerting=["webhook"], alert_types=["region_outage_alerts"]
)
litellm.callbacks = [slack_alerting]
error_to_raise: Optional[APIError] = None
if error_code == 400:
print("RAISING 400 ERROR CODE")
error_to_raise = litellm.BadRequestError(
message="this is a bad request",
model=model,
llm_provider=llm_provider,
)
elif error_code == 408:
print("RAISING 408 ERROR CODE")
error_to_raise = litellm.Timeout(
message="A timeout occurred", model=model, llm_provider=llm_provider
)
elif error_code == 500:
print("RAISING 500 ERROR CODE")
error_to_raise = litellm.ServiceUnavailableError(
message="API is unavailable",
model=model,
llm_provider=llm_provider,
response=httpx.Response(
status_code=503,
request=httpx.Request(
method="completion",
url="https://github.com/BerriAI/litellm",
),
),
)
router = Router(
model_list=[
{
"model_name": model,
"litellm_params": {
"model": model,
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": api_base,
"vertex_location": vertex_location,
"vertex_project": vertex_project,
},
"model_info": {"id": "1"},
},
{
"model_name": model,
"litellm_params": {
"model": model,
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": api_base,
"vertex_location": vertex_location,
"vertex_project": "vertex_project-2",
},
"model_info": {"id": "2"},
},
],
num_retries=0,
allowed_fails=100,
)
slack_alerting.update_values(llm_router=router)
with patch.object(slack_alerting, "send_alert", new=AsyncMock()) as mock_send_alert:
for idx in range(6):
if idx % 2 == 0:
deployment_id = "1"
else:
deployment_id = "2"
await slack_alerting.region_outage_alerts(
exception=error_to_raise, deployment_id=deployment_id # type: ignore
)
if model == "gemini-pro" and (error_code == 500 or error_code == 408):
mock_send_alert.assert_called_once()
else:
mock_send_alert.assert_not_called()
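
Why only the gemini-pro case alerts in this test: both vertex deployments share vertex_location "us-central1", so their failures collide on the same provider + region cache key, while the openai and azure deployments resolve no region here (no region_name or vertex_location) and region_outage_alerts returns early. A quick illustration of the key construction (the helper name is hypothetical):

from typing import Optional

def region_cache_key(provider: str, region_name: Optional[str]) -> Optional[str]:
    # hypothetical helper mirroring `cache_key = provider + region_name` in region_outage_alerts
    if region_name is None:
        return None  # no resolvable region -> region-level tracking is skipped
    return provider + region_name

print(region_cache_key("vertex_ai", "us-central1"))  # both test deployments collide on this key
print(region_cache_key("openai", None))              # None -> no region alert possible
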