[Feat-Perf] Use Batching + Squashing (#5645)

* use folder for slack alerting

* clean up slack alerting

* fix test alerting
Ishaan Jaff 2024-09-12 18:37:53 -07:00 committed by GitHub
parent fe5e0bcd15
commit e7c9716841
8 changed files with 249 additions and 156 deletions


@@ -0,0 +1,65 @@
"""
Handles Batching + sending Httpx Post requests to slack
Slack alerts are sent every 10s or when events are greater than X events
see custom_batch_logger.py for more details / defaults
"""
import os
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union
from litellm._logging import verbose_logger, verbose_proxy_logger
from litellm.proxy._types import AlertType, WebhookEvent
if TYPE_CHECKING:
from .slack_alerting import SlackAlerting as _SlackAlerting
SlackAlertingType = _SlackAlerting
else:
SlackAlertingType = Any
def squash_payloads(queue):
    squashed = {}
if len(queue) == 0:
return squashed
if len(queue) == 1:
return {"key": {"item": queue[0], "count": 1}}
for item in queue:
url = item["url"]
alert_type = item["alert_type"]
_key = (url, alert_type)
        if _key in squashed:
            # Same (url, alert_type) already queued: keep the first payload and bump the count
            squashed[_key]["count"] += 1
        else:
            squashed[_key] = {"item": item, "count": 1}
return squashed
async def send_to_webhook(slackAlertingInstance: SlackAlertingType, item, count):
import json
try:
payload = item["payload"]
if count > 1:
payload["text"] = f"[Num Alerts: {count}]\n\n{payload['text']}"
response = await slackAlertingInstance.async_http_handler.post(
url=item["url"],
headers=item["headers"],
data=json.dumps(payload),
)
if response.status_code != 200:
verbose_proxy_logger.debug(
f"Error sending slack alert to url={item['url']}. Error={response.text}"
)
except Exception as e:
verbose_proxy_logger.debug(f"Error sending slack alert: {str(e)}")
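For reference, a minimal sketch (not part of this commit) of how squash_payloads behaves once the queue holds more than one event, assuming each queued item carries the url / alert_type / headers / payload keys that send_alert enqueues further down; the webhook URL is a placeholder:

# Three queued alerts, two of which share (url, alert_type).
queue = [
    {
        "url": "https://hooks.slack.com/services/T000/B000/XXX",  # placeholder webhook
        "alert_type": "budget_alerts",
        "headers": {"Content-type": "application/json"},
        "payload": {"text": "Budget crossed for key A"},
    },
    {
        "url": "https://hooks.slack.com/services/T000/B000/XXX",
        "alert_type": "budget_alerts",
        "headers": {"Content-type": "application/json"},
        "payload": {"text": "Budget crossed for key B"},
    },
    {
        "url": "https://hooks.slack.com/services/T000/B000/XXX",
        "alert_type": "llm_exceptions",
        "headers": {"Content-type": "application/json"},
        "payload": {"text": "Deployment gpt-4 raised APIError"},
    },
]

squashed = squash_payloads(queue)
for (url, alert_type), entry in squashed.items():
    # Only the first payload per key is kept; duplicates just bump the count.
    print(alert_type, entry["count"], entry["item"]["payload"]["text"])
# budget_alerts 2 Budget crossed for key A
# llm_exceptions 1 Deployment gpt-4 raised APIError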


@@ -15,7 +15,6 @@ from typing import Any, Dict, List, Literal, Optional, Set, TypedDict, Union
import aiohttp
import dotenv
from openai import APIError
from pydantic import BaseModel, Field
import litellm
import litellm.litellm_core_utils
@@ -23,7 +22,7 @@ import litellm.litellm_core_utils.litellm_logging
import litellm.types
from litellm._logging import verbose_logger, verbose_proxy_logger
from litellm.caching import DualCache
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
@@ -39,125 +38,12 @@ from litellm.proxy._types import (
)
from litellm.types.router import LiteLLM_Params
from .email_templates.templates import *
from ..email_templates.templates import *
from .batching_handler import send_to_webhook, squash_payloads
from .types import *
class BaseOutageModel(TypedDict):
alerts: List[int]
minor_alert_sent: bool
major_alert_sent: bool
last_updated_at: float
class OutageModel(BaseOutageModel):
model_id: str
class ProviderRegionOutageModel(BaseOutageModel):
provider_region_id: str
deployment_ids: Set[str]
# we use this for the email header, please send a test email if you change this. verify it looks good on email
LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
LITELLM_SUPPORT_CONTACT = "support@berri.ai"
class LiteLLMBase(BaseModel):
"""
Implements default functions, all pydantic objects should have.
"""
def json(self, **kwargs):
try:
return self.model_dump() # noqa
except:
# if using pydantic v1
return self.dict()
class SlackAlertingArgsEnum(Enum):
daily_report_frequency: int = 12 * 60 * 60
report_check_interval: int = 5 * 60
budget_alert_ttl: int = 24 * 60 * 60
outage_alert_ttl: int = 1 * 60
region_outage_alert_ttl: int = 1 * 60
minor_outage_alert_threshold: int = 1 * 5
major_outage_alert_threshold: int = 1 * 10
max_outage_alert_list_size: int = 1 * 10
class SlackAlertingArgs(LiteLLMBase):
daily_report_frequency: int = Field(
default=int(
os.getenv(
"SLACK_DAILY_REPORT_FREQUENCY",
SlackAlertingArgsEnum.daily_report_frequency.value,
)
),
description="Frequency of receiving deployment latency/failure reports. Default is 12hours. Value is in seconds.",
)
report_check_interval: int = Field(
default=SlackAlertingArgsEnum.report_check_interval.value,
description="Frequency of checking cache if report should be sent. Background process. Default is once per hour. Value is in seconds.",
) # 5 minutes
budget_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.budget_alert_ttl.value,
description="Cache ttl for budgets alerts. Prevents spamming same alert, each time budget is crossed. Value is in seconds.",
) # 24 hours
outage_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.outage_alert_ttl.value,
description="Cache ttl for model outage alerts. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
) # 1 minute ttl
region_outage_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.region_outage_alert_ttl.value,
description="Cache ttl for provider-region based outage alerts. Alert sent if 2+ models in same region report errors. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
) # 1 minute ttl
minor_outage_alert_threshold: int = Field(
default=SlackAlertingArgsEnum.minor_outage_alert_threshold.value,
description="The number of errors that count as a model/region minor outage. ('400' error code is not counted).",
)
major_outage_alert_threshold: int = Field(
default=SlackAlertingArgsEnum.major_outage_alert_threshold.value,
description="The number of errors that countas a model/region major outage. ('400' error code is not counted).",
)
max_outage_alert_list_size: int = Field(
default=SlackAlertingArgsEnum.max_outage_alert_list_size.value,
description="Maximum number of errors to store in cache. For a given model/region. Prevents memory leaks.",
) # prevent memory leak
class DeploymentMetrics(LiteLLMBase):
"""
Metrics per deployment, stored in cache
Used for daily reporting
"""
id: str
"""id of deployment in router model list"""
failed_request: bool
"""did it fail the request?"""
latency_per_output_token: Optional[float]
"""latency/output token of deployment"""
updated_at: dt
"""Current time of deployment being updated"""
class SlackAlertingCacheKeys(Enum):
"""
Enum for deployment daily metrics keys - {deployment_id}:{enum}
"""
failed_requests_key = "failed_requests_daily_metrics"
latency_key = "latency_daily_metrics"
report_sent_key = "daily_metrics_report_sent"
class SlackAlerting(CustomLogger):
class SlackAlerting(CustomBatchLogger):
"""
Class for sending Slack Alerts
"""
@@ -186,6 +72,7 @@ class SlackAlerting(CustomLogger):
] = None, # if user wants to separate alerts to diff channels
alerting_args={},
default_webhook_url: Optional[str] = None,
**kwargs,
):
self.alerting_threshold = alerting_threshold
self.alerting = alerting
@@ -198,7 +85,8 @@ class SlackAlerting(CustomLogger):
self.is_running = False
self.alerting_args = SlackAlertingArgs(**alerting_args)
self.default_webhook_url = default_webhook_url
self.llm_router: Optional[litellm.Router] = None
self.flush_lock = asyncio.Lock()
super().__init__(**kwargs, flush_lock=self.flush_lock)
def update_values(
self,
@@ -226,6 +114,8 @@ class SlackAlerting(CustomLogger):
if llm_router is not None:
self.llm_router = llm_router
asyncio.create_task(self.periodic_flush())
async def deployment_in_cooldown(self):
pass
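custom_batch_logger.py is not shown in this diff; the sketch below is an assumption about the batch-logger contract the changes above rely on (log_queue, batch_size, flush_lock, periodic_flush, flush_queue, and the async_send_batch hook that SlackAlerting overrides further down), not the real implementation. Defaults are placeholders; the 10s interval comes from the batching_handler docstring.

import asyncio
from typing import List, Optional

class CustomBatchLoggerSketch:  # hypothetical stand-in for CustomBatchLogger
    def __init__(
        self,
        flush_lock: Optional[asyncio.Lock] = None,
        batch_size: int = 512,     # placeholder default
        flush_interval: int = 10,  # seconds; "sent every 10s" per the docstring above
        **kwargs,
    ):
        self.log_queue: List[dict] = []  # events appended by send_alert()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.flush_lock = flush_lock or asyncio.Lock()

    async def periodic_flush(self):
        # Background task started from update_values() once the router is attached.
        while True:
            await asyncio.sleep(self.flush_interval)
            await self.flush_queue()

    async def flush_queue(self):
        # Serialize size-triggered and periodic flushes on the shared lock.
        async with self.flush_lock:
            await self.async_send_batch()

    async def async_send_batch(self):
        # Overridden by subclasses such as SlackAlerting below.
        raise NotImplementedError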
@@ -1534,38 +1424,42 @@ Model Info:
payload = {"text": formatted_message}
headers = {"Content-type": "application/json"}
async def send_to_webhook(url: str):
return await self.async_http_handler.post(
url=url,
headers=headers,
data=json.dumps(payload),
)
if isinstance(slack_webhook_url, list):
# Parallelize the calls if it's a list of URLs
responses = await asyncio.gather(
*[send_to_webhook(url) for url in slack_webhook_url]
for url in slack_webhook_url:
self.log_queue.append(
{
"url": url,
"headers": headers,
"payload": payload,
"alert_type": alert_type,
}
)
else:
self.log_queue.append(
{
"url": slack_webhook_url,
"headers": headers,
"payload": payload,
"alert_type": alert_type,
}
)
for response, url in zip(responses, slack_webhook_url):
if response.status_code == 200:
pass
else:
verbose_proxy_logger.debug(
"Error sending slack alert to url={}. Error={}".format(
url, response.text
)
)
else:
# Single call if it's a single URL
response = await send_to_webhook(slack_webhook_url)
if len(self.log_queue) >= self.batch_size:
await self.flush_queue()
if response.status_code == 200:
pass
else:
verbose_proxy_logger.debug(
"Error sending slack alert. Error={}".format(response.text)
async def async_send_batch(self):
if not self.log_queue:
return
squashed_queue = squash_payloads(self.log_queue)
tasks = [
send_to_webhook(
slackAlertingInstance=self, item=item["item"], count=item["count"]
)
for item in squashed_queue.values()
]
await asyncio.gather(*tasks)
self.log_queue.clear()
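Putting the two new pieces together, a small end-to-end sketch (not part of this commit) of what a flush does: duplicates are squashed, the alert count is prefixed onto the surviving payload, and the remaining webhook calls are issued concurrently. The import path assumes the handler lives at litellm/integrations/SlackAlerting/batching_handler.py; _fake_post is a hypothetical stand-in for async_http_handler.post.

import asyncio
import json

from litellm.integrations.SlackAlerting.batching_handler import squash_payloads

async def _fake_post(url, headers, data):
    print(f"POST {url} -> {data}")  # stand-in for the real HTTP call

async def _send_one(entry):
    item, count = entry["item"], entry["count"]
    payload = dict(item["payload"])
    if count > 1:
        # mirrors send_to_webhook: prefix how many alerts were squashed together
        payload["text"] = f"[Num Alerts: {count}]\n\n" + payload["text"]
    await _fake_post(item["url"], item["headers"], json.dumps(payload))

async def demo_flush(log_queue):
    squashed = squash_payloads(log_queue)
    await asyncio.gather(*[_send_one(entry) for entry in squashed.values()])
    log_queue.clear()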
async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
"""Log deployment latency"""


@@ -0,0 +1,121 @@
import os
from datetime import datetime as dt
from enum import Enum
from typing import Any, Dict, List, Literal, Optional, Set, TypedDict
from pydantic import BaseModel, Field
class BaseOutageModel(TypedDict):
alerts: List[int]
minor_alert_sent: bool
major_alert_sent: bool
last_updated_at: float
class OutageModel(BaseOutageModel):
model_id: str
class ProviderRegionOutageModel(BaseOutageModel):
provider_region_id: str
deployment_ids: Set[str]
# Used for the email header. If you change this, send a test email and verify it renders correctly.
LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
LITELLM_SUPPORT_CONTACT = "support@berri.ai"
class LiteLLMBase(BaseModel):
"""
Implements default functions, all pydantic objects should have.
"""
def json(self, **kwargs):
try:
return self.model_dump() # noqa
        except Exception:
            # pydantic v1 fallback: model_dump() is not available
            return self.dict()
class SlackAlertingArgsEnum(Enum):
daily_report_frequency: int = 12 * 60 * 60
report_check_interval: int = 5 * 60
budget_alert_ttl: int = 24 * 60 * 60
outage_alert_ttl: int = 1 * 60
region_outage_alert_ttl: int = 1 * 60
minor_outage_alert_threshold: int = 1 * 5
major_outage_alert_threshold: int = 1 * 10
max_outage_alert_list_size: int = 1 * 10
class SlackAlertingArgs(LiteLLMBase):
daily_report_frequency: int = Field(
default=int(
os.getenv(
"SLACK_DAILY_REPORT_FREQUENCY",
SlackAlertingArgsEnum.daily_report_frequency.value,
)
),
        description="Frequency of receiving deployment latency/failure reports. Default is 12 hours. Value is in seconds.",
)
report_check_interval: int = Field(
default=SlackAlertingArgsEnum.report_check_interval.value,
        description="Frequency of checking cache if report should be sent. Background process. Default is every 5 minutes. Value is in seconds.",
) # 5 minutes
budget_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.budget_alert_ttl.value,
        description="Cache ttl for budget alerts. Prevents spamming the same alert each time a budget is crossed. Value is in seconds.",
) # 24 hours
outage_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.outage_alert_ttl.value,
description="Cache ttl for model outage alerts. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
) # 1 minute ttl
region_outage_alert_ttl: int = Field(
default=SlackAlertingArgsEnum.region_outage_alert_ttl.value,
description="Cache ttl for provider-region based outage alerts. Alert sent if 2+ models in same region report errors. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
) # 1 minute ttl
minor_outage_alert_threshold: int = Field(
default=SlackAlertingArgsEnum.minor_outage_alert_threshold.value,
description="The number of errors that count as a model/region minor outage. ('400' error code is not counted).",
)
major_outage_alert_threshold: int = Field(
default=SlackAlertingArgsEnum.major_outage_alert_threshold.value,
        description="The number of errors that count as a model/region major outage. ('400' error code is not counted).",
)
max_outage_alert_list_size: int = Field(
default=SlackAlertingArgsEnum.max_outage_alert_list_size.value,
description="Maximum number of errors to store in cache. For a given model/region. Prevents memory leaks.",
) # prevent memory leak
class DeploymentMetrics(LiteLLMBase):
"""
Metrics per deployment, stored in cache
Used for daily reporting
"""
id: str
"""id of deployment in router model list"""
failed_request: bool
"""did it fail the request?"""
latency_per_output_token: Optional[float]
"""latency/output token of deployment"""
updated_at: dt
"""Current time of deployment being updated"""
class SlackAlertingCacheKeys(Enum):
"""
Enum for deployment daily metrics keys - {deployment_id}:{enum}
"""
failed_requests_key = "failed_requests_daily_metrics"
latency_key = "latency_daily_metrics"
report_sent_key = "daily_metrics_report_sent"
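A brief usage sketch (not part of this commit) for the relocated SlackAlertingArgs model, assuming this file lives at litellm/integrations/SlackAlerting/types.py; the override values are illustrative:

from litellm.integrations.SlackAlerting.types import SlackAlertingArgs

# Anything not passed keeps its Field default; daily_report_frequency's default
# also honors the SLACK_DAILY_REPORT_FREQUENCY environment variable.
args = SlackAlertingArgs(
    daily_report_frequency=6 * 60 * 60,  # report every 6 hours instead of 12
    budget_alert_ttl=60 * 60,            # re-send a crossed-budget alert at most hourly
)
print(args.daily_report_frequency, args.report_check_interval)  # 21600 300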


@@ -14,11 +14,9 @@ model_list:
general_settings:
master_key: sk-1234
alerting: ["slack"]
alerting_threshold: 0.00001
litellm_settings:
callbacks: ["otel"]
success_callback: ["langsmith", "prometheus"]
service_callback: ["prometheus_system"]


@@ -115,7 +115,10 @@ from litellm import (
from litellm._logging import verbose_proxy_logger, verbose_router_logger
from litellm.caching import DualCache, RedisCache
from litellm.exceptions import RejectedRequestError
from litellm.integrations.slack_alerting import SlackAlerting, SlackAlertingArgs
from litellm.integrations.SlackAlerting.slack_alerting import (
SlackAlerting,
SlackAlertingArgs,
)
from litellm.litellm_core_utils.core_helpers import get_litellm_metadata_from_kwargs
from litellm.llms.custom_httpx.httpx_handler import HTTPHandler
from litellm.proxy._types import *


@@ -32,7 +32,7 @@ from litellm.caching import DualCache, RedisCache
from litellm.exceptions import RejectedRequestError
from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.slack_alerting import SlackAlerting
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.litellm_core_utils.core_helpers import (
_get_parent_otel_span_from_kwargs,
get_litellm_metadata_from_kwargs,


@@ -5682,7 +5682,7 @@ class Router:
return allowed_fails_policy.ContentPolicyViolationErrorAllowedFails
def _initialize_alerting(self):
from litellm.integrations.slack_alerting import SlackAlerting
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
router_alerting_config: AlertingConfig = self.alerting_config


@@ -27,7 +27,10 @@ from openai import APIError
import litellm
from litellm.caching import DualCache, RedisCache
from litellm.integrations.slack_alerting import DeploymentMetrics, SlackAlerting
from litellm.integrations.SlackAlerting.slack_alerting import (
DeploymentMetrics,
SlackAlerting,
)
from litellm.proxy._types import CallInfo
from litellm.proxy.utils import ProxyLogging
from litellm.router import AlertingConfig, Router
@@ -150,6 +153,7 @@ async def test_response_taking_too_long_hanging(slack_alerting):
await slack_alerting.response_taking_too_long(
type="hanging_request", request_data=request_data
)
mock_send_alert.assert_awaited_once()
@@ -230,6 +234,12 @@ async def test_budget_alerts_crossed_again(slack_alerting):
# Test for send_alert - should be called once
@pytest.mark.asyncio
async def test_send_alert(slack_alerting):
import logging
from litellm._logging import verbose_logger
asyncio.create_task(slack_alerting.periodic_flush())
verbose_logger.setLevel(level=logging.DEBUG)
with patch.object(
slack_alerting.async_http_handler, "post", new=AsyncMock()
) as mock_post:
@@ -237,6 +247,8 @@ async def test_send_alert(slack_alerting):
await slack_alerting.send_alert(
"Test message", "Low", "budget_alerts", alerting_metadata={}
)
await asyncio.sleep(6)
mock_post.assert_awaited_once()
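Because alerts are now queued and flushed in the background, the test above schedules periodic_flush() and sleeps before asserting on the mocked POST. A more deterministic variant (a sketch, not part of this commit, assuming flush_queue() is the CustomBatchLogger method that send_alert calls when the batch fills) would flush explicitly instead of sleeping:

await slack_alerting.send_alert(
    "Test message", "Low", "budget_alerts", alerting_metadata={}
)
await slack_alerting.flush_queue()  # drain the queue immediately instead of awaiting the timer
mock_post.assert_awaited_once()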