diff --git a/litellm/integrations/SlackAlerting/batching_handler.py b/litellm/integrations/SlackAlerting/batching_handler.py
new file mode 100644
index 0000000000..7c4e9c6f53
--- /dev/null
+++ b/litellm/integrations/SlackAlerting/batching_handler.py
@@ -0,0 +1,65 @@
+"""
+Handles Batching + sending Httpx Post requests to slack
+
+Slack alerts are sent every 10s or when events are greater than X events
+
+see custom_batch_logger.py for more details / defaults
+"""
+
+import os
+from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union
+
+from litellm._logging import verbose_logger, verbose_proxy_logger
+from litellm.proxy._types import AlertType, WebhookEvent
+
+if TYPE_CHECKING:
+    from .slack_alerting import SlackAlerting as _SlackAlerting
+
+    SlackAlertingType = _SlackAlerting
+else:
+    SlackAlertingType = Any
+
+
+def squash_payloads(queue):
+    import json
+
+    squashed = {}
+    if len(queue) == 0:
+        return squashed
+    if len(queue) == 1:
+        return {"key": {"item": queue[0], "count": 1}}
+
+    for item in queue:
+        url = item["url"]
+        alert_type = item["alert_type"]
+        _key = (url, alert_type)
+
+        if _key in squashed:
+            squashed[_key]["count"] += 1
+            # Merge the payloads
+
+        else:
+            squashed[_key] = {"item": item, "count": 1}
+
+    return squashed
+
+
+async def send_to_webhook(slackAlertingInstance: SlackAlertingType, item, count):
+    import json
+
+    try:
+        payload = item["payload"]
+        if count > 1:
+            payload["text"] = f"[Num Alerts: {count}]\n\n{payload['text']}"
+
+        response = await slackAlertingInstance.async_http_handler.post(
+            url=item["url"],
+            headers=item["headers"],
+            data=json.dumps(payload),
+        )
+        if response.status_code != 200:
+            verbose_proxy_logger.debug(
+                f"Error sending slack alert to url={item['url']}. Error={response.text}"
+            )
+    except Exception as e:
+        verbose_proxy_logger.debug(f"Error sending slack alert: {str(e)}")
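
The queue entries handled above are plain dicts with "url", "headers", "payload", and "alert_type" keys. The snippet below is a standalone sketch (plain Python, no litellm imports; the webhook URLs are made-up placeholders) of the squash-and-count behaviour implemented by squash_payloads() and the count prefix in send_to_webhook(): duplicates of the same (url, alert_type) pair only increment a counter, and the count is prepended to the Slack text before posting.

queue = [
    {"url": "https://hooks.slack.com/services/T000/B000/AAA", "headers": {"Content-type": "application/json"},
     "payload": {"text": "db error"}, "alert_type": "db_exceptions"},
    {"url": "https://hooks.slack.com/services/T000/B000/AAA", "headers": {"Content-type": "application/json"},
     "payload": {"text": "db error"}, "alert_type": "db_exceptions"},
    {"url": "https://hooks.slack.com/services/T000/B000/BBB", "headers": {"Content-type": "application/json"},
     "payload": {"text": "budget crossed"}, "alert_type": "budget_alerts"},
]

squashed = {}
for item in queue:
    key = (item["url"], item["alert_type"])  # same grouping key squash_payloads() uses
    if key in squashed:
        squashed[key]["count"] += 1  # duplicates only bump the count; payload text is not merged
    else:
        squashed[key] = {"item": item, "count": 1}

for (url, alert_type), entry in squashed.items():
    payload = entry["item"]["payload"]
    if entry["count"] > 1:  # same "[Num Alerts: N]" prefix that send_to_webhook() adds
        payload["text"] = f"[Num Alerts: {entry['count']}]\n\n{payload['text']}"
    print(f"{alert_type} -> {url}: {payload['text'].splitlines()[0]}")
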
diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/SlackAlerting/slack_alerting.py
similarity index 92%
rename from litellm/integrations/slack_alerting.py
rename to litellm/integrations/SlackAlerting/slack_alerting.py
index 3d757fa3dc..a0c1419690 100644
--- a/litellm/integrations/slack_alerting.py
+++ b/litellm/integrations/SlackAlerting/slack_alerting.py
@@ -15,7 +15,6 @@ from typing import Any, Dict, List, Literal, Optional, Set, TypedDict, Union
 import aiohttp
 import dotenv
 from openai import APIError
-from pydantic import BaseModel, Field
 
 import litellm
 import litellm.litellm_core_utils
@@ -23,7 +22,7 @@ import litellm.litellm_core_utils.litellm_logging
 import litellm.types
 from litellm._logging import verbose_logger, verbose_proxy_logger
 from litellm.caching import DualCache
-from litellm.integrations.custom_logger import CustomLogger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
 from litellm.litellm_core_utils.litellm_logging import Logging
 from litellm.llms.custom_httpx.http_handler import (
     AsyncHTTPHandler,
@@ -39,125 +38,12 @@ from litellm.proxy._types import (
 )
 from litellm.types.router import LiteLLM_Params
 
-from .email_templates.templates import *
+from ..email_templates.templates import *
+from .batching_handler import send_to_webhook, squash_payloads
+from .types import *
 
 
-class BaseOutageModel(TypedDict):
-    alerts: List[int]
-    minor_alert_sent: bool
-    major_alert_sent: bool
-    last_updated_at: float
-
-
-class OutageModel(BaseOutageModel):
-    model_id: str
-
-
-class ProviderRegionOutageModel(BaseOutageModel):
-    provider_region_id: str
-    deployment_ids: Set[str]
-
-
-# we use this for the email header, please send a test email if you change this. verify it looks good on email
-LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
-LITELLM_SUPPORT_CONTACT = "support@berri.ai"
-
-
-class LiteLLMBase(BaseModel):
-    """
-    Implements default functions, all pydantic objects should have.
-    """
-
-    def json(self, **kwargs):
-        try:
-            return self.model_dump()  # noqa
-        except:
-            # if using pydantic v1
-            return self.dict()
-
-
-class SlackAlertingArgsEnum(Enum):
-    daily_report_frequency: int = 12 * 60 * 60
-    report_check_interval: int = 5 * 60
-    budget_alert_ttl: int = 24 * 60 * 60
-    outage_alert_ttl: int = 1 * 60
-    region_outage_alert_ttl: int = 1 * 60
-    minor_outage_alert_threshold: int = 1 * 5
-    major_outage_alert_threshold: int = 1 * 10
-    max_outage_alert_list_size: int = 1 * 10
-
-
-class SlackAlertingArgs(LiteLLMBase):
-    daily_report_frequency: int = Field(
-        default=int(
-            os.getenv(
-                "SLACK_DAILY_REPORT_FREQUENCY",
-                SlackAlertingArgsEnum.daily_report_frequency.value,
-            )
-        ),
-        description="Frequency of receiving deployment latency/failure reports. Default is 12hours. Value is in seconds.",
-    )
-    report_check_interval: int = Field(
-        default=SlackAlertingArgsEnum.report_check_interval.value,
-        description="Frequency of checking cache if report should be sent. Background process. Default is once per hour. Value is in seconds.",
-    )  # 5 minutes
-    budget_alert_ttl: int = Field(
-        default=SlackAlertingArgsEnum.budget_alert_ttl.value,
-        description="Cache ttl for budgets alerts. Prevents spamming same alert, each time budget is crossed. Value is in seconds.",
-    )  # 24 hours
-    outage_alert_ttl: int = Field(
-        default=SlackAlertingArgsEnum.outage_alert_ttl.value,
-        description="Cache ttl for model outage alerts. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
-    )  # 1 minute ttl
-    region_outage_alert_ttl: int = Field(
-        default=SlackAlertingArgsEnum.region_outage_alert_ttl.value,
-        description="Cache ttl for provider-region based outage alerts. Alert sent if 2+ models in same region report errors. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
-    )  # 1 minute ttl
-    minor_outage_alert_threshold: int = Field(
-        default=SlackAlertingArgsEnum.minor_outage_alert_threshold.value,
-        description="The number of errors that count as a model/region minor outage. ('400' error code is not counted).",
-    )
-    major_outage_alert_threshold: int = Field(
-        default=SlackAlertingArgsEnum.major_outage_alert_threshold.value,
-        description="The number of errors that countas a model/region major outage. ('400' error code is not counted).",
-    )
-    max_outage_alert_list_size: int = Field(
-        default=SlackAlertingArgsEnum.max_outage_alert_list_size.value,
-        description="Maximum number of errors to store in cache. For a given model/region. Prevents memory leaks.",
-    )  # prevent memory leak
-
-
-class DeploymentMetrics(LiteLLMBase):
-    """
-    Metrics per deployment, stored in cache
-
-    Used for daily reporting
-    """
-
-    id: str
-    """id of deployment in router model list"""
-
-    failed_request: bool
-    """did it fail the request?"""
-
-    latency_per_output_token: Optional[float]
-    """latency/output token of deployment"""
-
-    updated_at: dt
-    """Current time of deployment being updated"""
-
-
-class SlackAlertingCacheKeys(Enum):
-    """
-    Enum for deployment daily metrics keys - {deployment_id}:{enum}
-    """
-
-    failed_requests_key = "failed_requests_daily_metrics"
-    latency_key = "latency_daily_metrics"
-    report_sent_key = "daily_metrics_report_sent"
-
-
-class SlackAlerting(CustomLogger):
+class SlackAlerting(CustomBatchLogger):
     """
     Class for sending Slack Alerts
     """
@@ -186,6 +72,7 @@ class SlackAlerting(CustomLogger):
         ] = None,  # if user wants to separate alerts to diff channels
         alerting_args={},
         default_webhook_url: Optional[str] = None,
+        **kwargs,
     ):
         self.alerting_threshold = alerting_threshold
         self.alerting = alerting
@@ -198,7 +85,8 @@ class SlackAlerting(CustomLogger):
         self.is_running = False
         self.alerting_args = SlackAlertingArgs(**alerting_args)
         self.default_webhook_url = default_webhook_url
-        self.llm_router: Optional[litellm.Router] = None
+        self.flush_lock = asyncio.Lock()
+        super().__init__(**kwargs, flush_lock=self.flush_lock)
 
     def update_values(
         self,
@@ -226,6 +114,8 @@ class SlackAlerting(CustomLogger):
         if llm_router is not None:
             self.llm_router = llm_router
 
+        asyncio.create_task(self.periodic_flush())
+
     async def deployment_in_cooldown(self):
         pass
 
@@ -1534,38 +1424,42 @@ Model Info:
         payload = {"text": formatted_message}
         headers = {"Content-type": "application/json"}
 
-        async def send_to_webhook(url: str):
-            return await self.async_http_handler.post(
-                url=url,
-                headers=headers,
-                data=json.dumps(payload),
-            )
-
         if isinstance(slack_webhook_url, list):
-            # Parallelize the calls if it's a list of URLs
-            responses = await asyncio.gather(
-                *[send_to_webhook(url) for url in slack_webhook_url]
+            for url in slack_webhook_url:
+                self.log_queue.append(
+                    {
+                        "url": url,
+                        "headers": headers,
+                        "payload": payload,
+                        "alert_type": alert_type,
+                    }
+                )
+        else:
+            self.log_queue.append(
+                {
+                    "url": slack_webhook_url,
+                    "headers": headers,
+                    "payload": payload,
+                    "alert_type": alert_type,
+                }
             )
-            for response, url in zip(responses, slack_webhook_url):
-                if response.status_code == 200:
-                    pass
-                else:
-                    verbose_proxy_logger.debug(
-                        "Error sending slack alert to url={}. Error={}".format(
-                            url, response.text
-                        )
-                    )
-        else:
-            # Single call if it's a single URL
-            response = await send_to_webhook(slack_webhook_url)
 
+        if len(self.log_queue) >= self.batch_size:
+            await self.flush_queue()
 
-            if response.status_code == 200:
-                pass
-            else:
-                verbose_proxy_logger.debug(
-                    "Error sending slack alert. Error={}".format(response.text)
-                )
+    async def async_send_batch(self):
+        if not self.log_queue:
+            return
+
+        squashed_queue = squash_payloads(self.log_queue)
+        tasks = [
+            send_to_webhook(
+                slackAlertingInstance=self, item=item["item"], count=item["count"]
+            )
+            for item in squashed_queue.values()
+        ]
+        await asyncio.gather(*tasks)
+        self.log_queue.clear()
 
     async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
         """Log deployment latency"""
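
Taken together, the change above turns SlackAlerting into a batch logger: send_alert() now only appends to self.log_queue, the queue is flushed immediately once batch_size entries pile up, and a periodic_flush() task (started in update_values()) drains whatever remains on an interval via async_send_batch() -> squash_payloads() -> send_to_webhook(). The snippet below is a minimal, self-contained sketch of that pattern, not litellm's implementation: TinyBatchingAlerter, its parameter values, and the print() stand-in for the HTTP POST are illustrative assumptions; the real queue, batch_size, and flush-interval defaults come from custom_batch_logger.py.

import asyncio


class TinyBatchingAlerter:
    def __init__(self, batch_size=3, flush_interval_s=10.0):
        self.log_queue = []
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_s
        self.flush_lock = asyncio.Lock()

    async def send_alert(self, text, url, alert_type):
        # like the new SlackAlerting.send_alert(): enqueue only, no direct POST
        self.log_queue.append({"url": url, "payload": {"text": text}, "alert_type": alert_type})
        if len(self.log_queue) >= self.batch_size:
            await self.flush_queue()

    async def flush_queue(self):
        async with self.flush_lock:
            if not self.log_queue:
                return
            batch, self.log_queue = self.log_queue, []
            # the real async_send_batch() squashes the batch and POSTs one message per (url, alert_type)
            print(f"flushing {len(batch)} alert(s)")

    async def periodic_flush(self):
        # background task, started from update_values() in the real class
        while True:
            await asyncio.sleep(self.flush_interval_s)
            await self.flush_queue()


async def main():
    alerter = TinyBatchingAlerter(batch_size=2, flush_interval_s=0.1)
    flusher = asyncio.create_task(alerter.periodic_flush())
    await alerter.send_alert("budget crossed", "https://hooks.slack.com/XXX", "budget_alerts")
    await alerter.send_alert("budget crossed", "https://hooks.slack.com/XXX", "budget_alerts")  # hits batch_size -> immediate flush
    await alerter.send_alert("db error", "https://hooks.slack.com/XXX", "db_exceptions")
    await asyncio.sleep(0.2)  # let the periodic flush pick up the straggler
    flusher.cancel()


asyncio.run(main())
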
diff --git a/litellm/integrations/SlackAlerting/types.py b/litellm/integrations/SlackAlerting/types.py
new file mode 100644
index 0000000000..d5519068e8
--- /dev/null
+++ b/litellm/integrations/SlackAlerting/types.py
@@ -0,0 +1,121 @@
+import os
+from datetime import datetime as dt
+from enum import Enum
+from typing import Any, Dict, List, Literal, Optional, Set, TypedDict
+
+from pydantic import BaseModel, Field
+
+
+class BaseOutageModel(TypedDict):
+    alerts: List[int]
+    minor_alert_sent: bool
+    major_alert_sent: bool
+    last_updated_at: float
+
+
+class OutageModel(BaseOutageModel):
+    model_id: str
+
+
+class ProviderRegionOutageModel(BaseOutageModel):
+    provider_region_id: str
+    deployment_ids: Set[str]
+
+
+# we use this for the email header, please send a test email if you change this. verify it looks good on email
+LITELLM_LOGO_URL = "https://litellm-listing.s3.amazonaws.com/litellm_logo.png"
+LITELLM_SUPPORT_CONTACT = "support@berri.ai"
+
+
+class LiteLLMBase(BaseModel):
+    """
+    Implements default functions, all pydantic objects should have.
+    """
+
+    def json(self, **kwargs):
+        try:
+            return self.model_dump()  # noqa
+        except:
+            # if using pydantic v1
+            return self.dict()
+
+
+class SlackAlertingArgsEnum(Enum):
+    daily_report_frequency: int = 12 * 60 * 60
+    report_check_interval: int = 5 * 60
+    budget_alert_ttl: int = 24 * 60 * 60
+    outage_alert_ttl: int = 1 * 60
+    region_outage_alert_ttl: int = 1 * 60
+    minor_outage_alert_threshold: int = 1 * 5
+    major_outage_alert_threshold: int = 1 * 10
+    max_outage_alert_list_size: int = 1 * 10
+
+
+class SlackAlertingArgs(LiteLLMBase):
+    daily_report_frequency: int = Field(
+        default=int(
+            os.getenv(
+                "SLACK_DAILY_REPORT_FREQUENCY",
+                SlackAlertingArgsEnum.daily_report_frequency.value,
+            )
+        ),
+        description="Frequency of receiving deployment latency/failure reports. Default is 12 hours. Value is in seconds.",
+    )
+    report_check_interval: int = Field(
+        default=SlackAlertingArgsEnum.report_check_interval.value,
+        description="Frequency of checking cache if report should be sent. Background process. Default is every 5 minutes. Value is in seconds.",
+    )  # 5 minutes
+    budget_alert_ttl: int = Field(
+        default=SlackAlertingArgsEnum.budget_alert_ttl.value,
+        description="Cache ttl for budget alerts. Prevents spamming same alert, each time budget is crossed. Value is in seconds.",
+    )  # 24 hours
+    outage_alert_ttl: int = Field(
+        default=SlackAlertingArgsEnum.outage_alert_ttl.value,
+        description="Cache ttl for model outage alerts. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
+    )  # 1 minute ttl
+    region_outage_alert_ttl: int = Field(
+        default=SlackAlertingArgsEnum.region_outage_alert_ttl.value,
+        description="Cache ttl for provider-region based outage alerts. Alert sent if 2+ models in same region report errors. Sets time-window for errors. Default is 1 minute. Value is in seconds.",
+    )  # 1 minute ttl
+    minor_outage_alert_threshold: int = Field(
+        default=SlackAlertingArgsEnum.minor_outage_alert_threshold.value,
+        description="The number of errors that count as a model/region minor outage. ('400' error code is not counted).",
+    )
+    major_outage_alert_threshold: int = Field(
+        default=SlackAlertingArgsEnum.major_outage_alert_threshold.value,
+        description="The number of errors that count as a model/region major outage. ('400' error code is not counted).",
+    )
+    max_outage_alert_list_size: int = Field(
+        default=SlackAlertingArgsEnum.max_outage_alert_list_size.value,
+        description="Maximum number of errors to store in cache, for a given model/region. Prevents memory leaks.",
+    )  # prevent memory leak
+
+
+class DeploymentMetrics(LiteLLMBase):
+    """
+    Metrics per deployment, stored in cache
+
+    Used for daily reporting
+    """
+
+    id: str
+    """id of deployment in router model list"""
+
+    failed_request: bool
+    """did it fail the request?"""
+
+    latency_per_output_token: Optional[float]
+    """latency/output token of deployment"""
+
+    updated_at: dt
+    """Current time of deployment being updated"""
+
+
+class SlackAlertingCacheKeys(Enum):
+    """
+    Enum for deployment daily metrics keys - {deployment_id}:{enum}
+    """
+
+    failed_requests_key = "failed_requests_daily_metrics"
+    latency_key = "latency_daily_metrics"
+    report_sent_key = "daily_metrics_report_sent"
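
The alerting_args dict passed to SlackAlerting(alerting_args=...) is validated against SlackAlertingArgs above. A small, illustrative example (the override values are arbitrary; any field left out keeps its SlackAlertingArgsEnum default):

from litellm.integrations.SlackAlerting.types import SlackAlertingArgs

# override two fields; everything else falls back to the enum defaults
args = SlackAlertingArgs(
    daily_report_frequency=6 * 60 * 60,  # report every 6 hours instead of 12
    budget_alert_ttl=60 * 60,            # re-send a crossed-budget alert at most hourly
)
print(args.daily_report_frequency)        # 21600
print(args.major_outage_alert_threshold)  # 10 (default)
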
diff --git a/litellm/proxy/proxy_config.yaml b/litellm/proxy/proxy_config.yaml
index 890aaf5310..a0d03bdf80 100644
--- a/litellm/proxy/proxy_config.yaml
+++ b/litellm/proxy/proxy_config.yaml
@@ -14,11 +14,9 @@ model_list:
 
 general_settings: 
   master_key: sk-1234
+  alerting: ["slack"]
+  alerting_threshold: 0.00001
 
 litellm_settings:
-  callbacks: ["otel"]
-  success_callback: ["langsmith", "prometheus"]
-  service_callback: ["prometheus_system"]
-
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index 341f2b5d62..731feac725 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -115,7 +115,10 @@ from litellm import (
 from litellm._logging import verbose_proxy_logger, verbose_router_logger
 from litellm.caching import DualCache, RedisCache
 from litellm.exceptions import RejectedRequestError
-from litellm.integrations.slack_alerting import SlackAlerting, SlackAlertingArgs
+from litellm.integrations.SlackAlerting.slack_alerting import (
+    SlackAlerting,
+    SlackAlertingArgs,
+)
 from litellm.litellm_core_utils.core_helpers import get_litellm_metadata_from_kwargs
 from litellm.llms.custom_httpx.httpx_handler import HTTPHandler
 from litellm.proxy._types import *
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index df3e97bf4a..9892969491 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -32,7 +32,7 @@ from litellm.caching import DualCache, RedisCache
 from litellm.exceptions import RejectedRequestError
 from litellm.integrations.custom_guardrail import CustomGuardrail
 from litellm.integrations.custom_logger import CustomLogger
-from litellm.integrations.slack_alerting import SlackAlerting
+from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
 from litellm.litellm_core_utils.core_helpers import (
     _get_parent_otel_span_from_kwargs,
     get_litellm_metadata_from_kwargs,
diff --git a/litellm/router.py b/litellm/router.py
index c187474f1a..fcfc92fd08 100644
--- a/litellm/router.py
+++ b/litellm/router.py
@@ -5682,7 +5682,7 @@ class Router:
         return allowed_fails_policy.ContentPolicyViolationErrorAllowedFails
 
     def _initialize_alerting(self):
-        from litellm.integrations.slack_alerting import SlackAlerting
+        from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
 
         router_alerting_config: AlertingConfig = self.alerting_config
 
diff --git a/litellm/tests/test_alerting.py b/litellm/tests/test_alerting.py
index 72d9f2a300..4f1cd21aba 100644
--- a/litellm/tests/test_alerting.py
+++ b/litellm/tests/test_alerting.py
@@ -27,7 +27,10 @@ from openai import APIError
 
 import litellm
 from litellm.caching import DualCache, RedisCache
-from litellm.integrations.slack_alerting import DeploymentMetrics, SlackAlerting
+from litellm.integrations.SlackAlerting.slack_alerting import (
+    DeploymentMetrics,
+    SlackAlerting,
+)
 from litellm.proxy._types import CallInfo
 from litellm.proxy.utils import ProxyLogging
 from litellm.router import AlertingConfig, Router
@@ -150,6 +153,7 @@ async def test_response_taking_too_long_hanging(slack_alerting):
         await slack_alerting.response_taking_too_long(
             type="hanging_request", request_data=request_data
         )
+        mock_send_alert.assert_awaited_once()
 
 
@@ -230,6 +234,12 @@ async def test_budget_alerts_crossed_again(slack_alerting):
 # Test for send_alert - should be called once
 @pytest.mark.asyncio
 async def test_send_alert(slack_alerting):
+    import logging
+
+    from litellm._logging import verbose_logger
+
+    asyncio.create_task(slack_alerting.periodic_flush())
+    verbose_logger.setLevel(level=logging.DEBUG)
     with patch.object(
         slack_alerting.async_http_handler, "post", new=AsyncMock()
     ) as mock_post:
@@ -237,6 +247,8 @@ async def test_send_alert(slack_alerting):
         await slack_alerting.send_alert(
             "Test message", "Low", "budget_alerts", alerting_metadata={}
         )
+
+        await asyncio.sleep(6)
         mock_post.assert_awaited_once()
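
The test changes follow directly from the batching behaviour: send_alert() no longer awaits the POST itself, so test_send_alert starts the background flusher and waits long enough for one flush before asserting on the mocked handler. Below is a hedged, self-contained sketch of that flow; it assumes the constructor accepts the alerting and default_webhook_url arguments shown in the diff, and that the default flush interval is a few seconds (the actual value lives in custom_batch_logger.py).

import asyncio
from unittest.mock import AsyncMock, patch

from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting


async def demo():
    slack_alerting = SlackAlerting(
        alerting=["slack"],
        default_webhook_url="https://hooks.slack.com/services/XXX",  # placeholder webhook
    )
    asyncio.create_task(slack_alerting.periodic_flush())  # background drain of log_queue
    with patch.object(slack_alerting.async_http_handler, "post", new=AsyncMock()) as mock_post:
        # enqueues the alert; the POST happens when the queue is flushed
        await slack_alerting.send_alert("Test message", "Low", "budget_alerts", alerting_metadata={})
        await asyncio.sleep(6)  # long enough for one periodic flush pass
        mock_post.assert_awaited_once()


asyncio.run(demo())
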