(Feat) - Add PagerDuty Alerting Integration (#7478)

* define basic types

* fix verbose_logger.exception statement

* fix basic alerting

* test pager duty alerting

* test_pagerduty_alerting_high_failure_rate

* PagerDutyAlerting

* async_log_failure_event

* use pre_call_hook

* add _request_is_completed helper util

* update AlertingConfig

* rename PagerDutyInternalEvent

* _send_alert_if_thresholds_crossed

* use pagerduty as _custom_logger_compatible_callbacks_literal

* fix slack alerting imports

* fix imports in slack alerting

* PagerDutyAlerting

* fix _load_alerting_settings

* test_pagerduty_hanging_request_alerting

* working pager duty alerting

* fix linting

* doc pager duty alerting

* update hanging_response_handler

* fix import location

* update failure_threshold

* update async_pre_call_hook

* docs pagerduty

* test - callback_class_str_to_classType

* fix linting errors

* fix linting + testing error

* PagerDutyAlerting

* test_pagerduty_hanging_request_alerting

* fix unused imports

* docs pager duty

* @pytest.mark.flaky(retries=6, delay=2)

* test_model_info_bedrock_converse_enforcement
This commit is contained in:
Ishaan Jaff 2025-01-01 07:12:51 -08:00 committed by GitHub
parent 9af6ba0a02
commit a39cac313c
15 changed files with 691 additions and 28 deletions

View file

@ -452,6 +452,7 @@ router_settings:
| OTEL_HEADERS | Headers for OpenTelemetry requests | OTEL_HEADERS | Headers for OpenTelemetry requests
| OTEL_SERVICE_NAME | Service name identifier for OpenTelemetry | OTEL_SERVICE_NAME | Service name identifier for OpenTelemetry
| OTEL_TRACER_NAME | Tracer name for OpenTelemetry tracing | OTEL_TRACER_NAME | Tracer name for OpenTelemetry tracing
| PAGERDUTY_API_KEY | API key for PagerDuty Alerting
| POD_NAME | Pod name for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) as `POD_NAME` | POD_NAME | Pod name for the server, this will be [emitted to `datadog` logs](https://docs.litellm.ai/docs/proxy/logging#datadog) as `POD_NAME`
| PREDIBASE_API_BASE | Base URL for Predibase API | PREDIBASE_API_BASE | Base URL for Predibase API
| PRESIDIO_ANALYZER_API_BASE | Base URL for Presidio Analyzer service | PRESIDIO_ANALYZER_API_BASE | Base URL for Presidio Analyzer service

View file

@ -0,0 +1,106 @@
import Image from '@theme/IdealImage';
# PagerDuty Alerting
:::info
✨ PagerDuty Alerting is a LiteLLM Enterprise feature
[Enterprise Pricing](https://www.litellm.ai/#pricing)
[Get free 7-day trial key](https://www.litellm.ai/#trial)
:::
Handles two types of alerts:
- High LLM API Failure Rate. Configure X failures in Y seconds to trigger an alert.
- High Number of Hanging LLM Requests. Configure X hanging requests in Y seconds to trigger an alert.
## Quick Start
1. Set `PAGERDUTY_API_KEY="d8bxxxxx"` in your environment variables.
```
PAGERDUTY_API_KEY="d8bxxxxx"
```
2. Set PagerDuty Alerting in your config file.
```yaml
model_list:
- model_name: "openai/*"
litellm_params:
model: "openai/*"
api_key: os.environ/OPENAI_API_KEY
general_settings:
alerting: ["pagerduty"]
alerting_args:
failure_threshold: 1 # Number of requests failing in a window
failure_threshold_window_seconds: 10 # Window in seconds
# Requests hanging threshold
hanging_threshold_seconds: 0.0000001 # Number of seconds of waiting for a response before a request is considered hanging
hanging_threshold_window_seconds: 10 # Window in seconds
```
3. Test it
Start LiteLLM Proxy
```shell
litellm --config config.yaml
```
### LLM API Failure Alert
Try sending a bad request to proxy
```shell
curl -i --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data ' {
"model": "gpt-4o",
"user": "hi",
"messages": [
{
"role": "user",
"bad_param": "i like coffee"
}
]
}
'
```
<Image img={require('../../img/pagerduty_fail.png')} />
### LLM Hanging Alert
Try sending a hanging request to proxy
Since our hanging threshold is 0.0000001 seconds, you should see an alert.
```shell
curl -i --location 'http://0.0.0.0:4000/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data ' {
"model": "gpt-4o",
"user": "hi",
"messages": [
{
"role": "user",
"content": "i like coffee"
}
]
}
'
```
<Image img={require('../../img/pagerduty_hanging.png')} />

Binary file not shown.

After

Width:  |  Height:  |  Size: 210 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 219 KiB

View file

@ -118,7 +118,13 @@ const sidebars = {
{ {
type: "category", type: "category",
label: "Logging, Alerting, Metrics", label: "Logging, Alerting, Metrics",
items: ["proxy/logging", "proxy/logging_spec", "proxy/team_logging","proxy/alerting", "proxy/prometheus"], items: [
"proxy/logging",
"proxy/logging_spec",
"proxy/team_logging",
"proxy/prometheus",
"proxy/alerting",
"proxy/pagerduty"],
}, },
{ {
type: "category", type: "category",

View file

@ -74,6 +74,7 @@ _custom_logger_compatible_callbacks_literal = Literal[
"argilla", "argilla",
"mlflow", "mlflow",
"langfuse", "langfuse",
"pagerduty",
"humanloop", "humanloop",
] ]
logged_real_time_event_types: Optional[Union[List[str], Literal["*"]]] = None logged_real_time_event_types: Optional[Union[List[str], Literal["*"]]] = None

View file

@ -6,7 +6,7 @@ import os
import random import random
import time import time
from datetime import timedelta from datetime import timedelta
from typing import Any, Dict, List, Literal, Optional, Union from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
from openai import APIError from openai import APIError
@ -25,13 +25,19 @@ from litellm.llms.custom_httpx.http_handler import (
httpxSpecialProvider, httpxSpecialProvider,
) )
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
from litellm.router import Router
from litellm.types.integrations.slack_alerting import * from litellm.types.integrations.slack_alerting import *
from ..email_templates.templates import * from ..email_templates.templates import *
from .batching_handler import send_to_webhook, squash_payloads from .batching_handler import send_to_webhook, squash_payloads
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables
if TYPE_CHECKING:
from litellm.router import Router as _Router
Router = _Router
else:
Router = Any
class SlackAlerting(CustomBatchLogger): class SlackAlerting(CustomBatchLogger):
""" """
@ -465,18 +471,10 @@ class SlackAlerting(CustomBatchLogger):
self.alerting_threshold self.alerting_threshold
) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests ) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
alerting_metadata: dict = {} alerting_metadata: dict = {}
if ( if await self._request_is_completed(request_data=request_data) is True:
request_data is not None return
and request_data.get("litellm_status", "") != "success"
and request_data.get("litellm_status", "") != "fail" if request_data is not None:
):
## CHECK IF CACHE IS UPDATED
litellm_call_id = request_data.get("litellm_call_id", "")
status: Optional[str] = await self.internal_usage_cache.async_get_cache(
key="request_status:{}".format(litellm_call_id), local_only=True
)
if status is not None and (status == "success" or status == "fail"):
return
if request_data.get("deployment", None) is not None and isinstance( if request_data.get("deployment", None) is not None and isinstance(
request_data["deployment"], dict request_data["deployment"], dict
): ):
@ -1753,3 +1751,23 @@ Model Info:
) )
return return
async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
    """
    Returns True if the request is completed - either as a success or failure

    Looks up the internal usage cache for a terminal "request_status" entry
    keyed by the request's litellm_call_id.

    Args:
        request_data: the proxy request payload; may be None.

    Returns:
        True when the cache holds a terminal status ("success"/"fail") for
        this call id; False otherwise.
    """
    if request_data is None:
        return False
    # NOTE(review): when request_data["litellm_status"] is *already*
    # "success"/"fail", the cache check is skipped and this returns False —
    # i.e. only the cached status marks the request as completed. Confirm
    # this is intended.
    if (
        request_data.get("litellm_status", "") != "success"
        and request_data.get("litellm_status", "") != "fail"
    ):
        ## CHECK IF CACHE IS UPDATED
        litellm_call_id = request_data.get("litellm_call_id", "")
        status: Optional[str] = await self.internal_usage_cache.async_get_cache(
            key="request_status:{}".format(litellm_call_id), local_only=True
        )
        if status is not None and (status == "success" or status == "fail"):
            return True
    return False

View file

@ -3,12 +3,18 @@ Utils used for slack alerting
""" """
import asyncio import asyncio
from typing import Dict, List, Optional, Union from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.proxy._types import AlertType from litellm.proxy._types import AlertType
from litellm.secret_managers.main import get_secret from litellm.secret_managers.main import get_secret
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as _Logging
Logging = _Logging
else:
Logging = Any
def process_slack_alerting_variables( def process_slack_alerting_variables(
alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]

View file

@ -0,0 +1,303 @@
"""
PagerDuty Alerting Integration
Handles two types of alerts:
- High LLM API Failure Rate. Configure X fails in Y seconds to trigger an alert.
- High Number of Hanging LLM Requests. Configure X hangs in Y seconds to trigger an alert.
"""
import asyncio
import os
from datetime import datetime, timedelta, timezone
from typing import List, Literal, Optional, Union
from litellm._logging import verbose_logger
from litellm.caching import DualCache
from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
get_async_httpx_client,
httpxSpecialProvider,
)
from litellm.proxy._types import UserAPIKeyAuth
from litellm.types.integrations.pagerduty import (
AlertingConfig,
PagerDutyInternalEvent,
PagerDutyPayload,
PagerDutyRequestBody,
)
from litellm.types.utils import (
StandardLoggingPayload,
StandardLoggingPayloadErrorInformation,
)
# Default alerting thresholds; each can be overridden via the
# `alerting_args` passed to PagerDutyAlerting.
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD = 60  # failures per window before alerting
PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS = 60  # failure window size (s)
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS = 60  # wait before a request counts as hanging
PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS = 600  # hanging window size (s)
class PagerDutyAlerting(SlackAlerting):
    """
    Tracks failed requests and hanging requests separately.
    If threshold is crossed for either type, triggers a PagerDuty alert.

    Enterprise-only: __init__ raises for non-premium users.
    """

    def __init__(
        self, alerting_args: Optional[Union[AlertingConfig, dict]] = None, **kwargs
    ):
        """
        Args:
            alerting_args: optional threshold overrides; any missing key falls
                back to the PAGERDUTY_DEFAULT_* module constants.

        Raises:
            ValueError: if PAGERDUTY_API_KEY is unset, or the user is not a
                LiteLLM Enterprise (premium) user.
        """
        # local import avoids a circular import with the proxy server
        from litellm.proxy.proxy_server import CommonProxyErrors, premium_user

        super().__init__()
        _api_key = os.getenv("PAGERDUTY_API_KEY")
        if not _api_key:
            raise ValueError("PAGERDUTY_API_KEY is not set")

        self.api_key: str = _api_key
        alerting_args = alerting_args or {}
        self.alerting_args: AlertingConfig = AlertingConfig(
            failure_threshold=alerting_args.get(
                "failure_threshold", PAGERDUTY_DEFAULT_FAILURE_THRESHOLD
            ),
            failure_threshold_window_seconds=alerting_args.get(
                "failure_threshold_window_seconds",
                PAGERDUTY_DEFAULT_FAILURE_THRESHOLD_WINDOW_SECONDS,
            ),
            hanging_threshold_seconds=alerting_args.get(
                "hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
            ),
            hanging_threshold_window_seconds=alerting_args.get(
                "hanging_threshold_window_seconds",
                PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
            ),
        )

        # Separate storage for failures vs. hangs
        self._failure_events: List[PagerDutyInternalEvent] = []
        self._hanging_events: List[PagerDutyInternalEvent] = []

        # premium user check
        if premium_user is not True:
            raise ValueError(
                f"PagerDutyAlerting is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}"
            )

    # ------------------ MAIN LOGIC ------------------ #

    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
        """
        Record a failure event. Only send an alert to PagerDuty if the
        configured *failure* threshold is exceeded in the specified window.

        Raises:
            ValueError: if kwargs carries no "standard_logging_object".
        """
        now = datetime.now(timezone.utc)
        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
            "standard_logging_object"
        )
        if not standard_logging_payload:
            raise ValueError(
                "standard_logging_object is required for PagerDutyAlerting"
            )

        # Extract error details
        error_info: Optional[StandardLoggingPayloadErrorInformation] = (
            standard_logging_payload.get("error_information") or {}
        )
        _meta = standard_logging_payload.get("metadata") or {}

        self._failure_events.append(
            PagerDutyInternalEvent(
                failure_event_type="failed_response",
                timestamp=now,
                error_class=error_info.get("error_class"),
                error_code=error_info.get("error_code"),
                error_llm_provider=error_info.get("llm_provider"),
                user_api_key_hash=_meta.get("user_api_key_hash"),
                user_api_key_alias=_meta.get("user_api_key_alias"),
                user_api_key_org_id=_meta.get("user_api_key_org_id"),
                user_api_key_team_id=_meta.get("user_api_key_team_id"),
                user_api_key_user_id=_meta.get("user_api_key_user_id"),
                user_api_key_team_alias=_meta.get("user_api_key_team_alias"),
                user_api_key_end_user_id=_meta.get("user_api_key_end_user_id"),
            )
        )

        # Prune + Possibly alert
        window_seconds = self.alerting_args.get("failure_threshold_window_seconds", 60)
        threshold = self.alerting_args.get("failure_threshold", 1)

        # If threshold is crossed, send PD alert for failures
        await self._send_alert_if_thresholds_crossed(
            events=self._failure_events,
            window_seconds=window_seconds,
            threshold=threshold,
            alert_prefix="High LLM API Failure Rate",
        )

    async def async_pre_call_hook(
        self,
        user_api_key_dict: UserAPIKeyAuth,
        cache: DualCache,
        data: dict,
        call_type: Literal[
            "completion",
            "text_completion",
            "embeddings",
            "image_generation",
            "moderation",
            "audio_transcription",
            "pass_through_endpoint",
            "rerank",
        ],
    ) -> Optional[Union[Exception, str, dict]]:
        """
        Example of detecting hanging requests by waiting a given threshold.
        If the request didn't finish by then, we treat it as 'hanging'.

        Schedules hanging_response_handler as a background task and returns
        immediately without modifying the request.
        """
        verbose_logger.info("Inside Proxy Logging Pre-call hook!")
        asyncio.create_task(
            self.hanging_response_handler(
                request_data=data, user_api_key_dict=user_api_key_dict
            )
        )
        return None

    async def hanging_response_handler(
        self, request_data: Optional[dict], user_api_key_dict: UserAPIKeyAuth
    ):
        """
        Checks if request completed by the time 'hanging_threshold_seconds' elapses.
        If not, we classify it as a hanging request.
        """
        verbose_logger.debug(
            f"Inside Hanging Response Handler!..sleeping for {self.alerting_args.get('hanging_threshold_seconds', PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS)} seconds"
        )
        await asyncio.sleep(
            self.alerting_args.get(
                "hanging_threshold_seconds", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
            )
        )

        if await self._request_is_completed(request_data=request_data):
            return  # It's not hanging if completed

        # Otherwise, record it as hanging
        self._hanging_events.append(
            PagerDutyInternalEvent(
                failure_event_type="hanging_response",
                timestamp=datetime.now(timezone.utc),
                error_class="HangingRequest",
                error_code="HangingRequest",
                error_llm_provider="HangingRequest",
                user_api_key_hash=user_api_key_dict.api_key,
                user_api_key_alias=user_api_key_dict.key_alias,
                user_api_key_org_id=user_api_key_dict.org_id,
                user_api_key_team_id=user_api_key_dict.team_id,
                user_api_key_user_id=user_api_key_dict.user_id,
                user_api_key_team_alias=user_api_key_dict.team_alias,
                user_api_key_end_user_id=user_api_key_dict.end_user_id,
            )
        )

        # Prune + Possibly alert
        window_seconds = self.alerting_args.get(
            "hanging_threshold_window_seconds",
            PAGERDUTY_DEFAULT_HANGING_THRESHOLD_WINDOW_SECONDS,
        )
        # NOTE(review): "hanging_threshold_fails" is never populated in
        # __init__, so this always falls back to the default below — which
        # reuses the *seconds* constant (60) as a request count. Confirm the
        # intended default hang-count.
        threshold: int = self.alerting_args.get(
            "hanging_threshold_fails", PAGERDUTY_DEFAULT_HANGING_THRESHOLD_SECONDS
        )

        # If threshold is crossed, send PD alert for hangs
        await self._send_alert_if_thresholds_crossed(
            events=self._hanging_events,
            window_seconds=window_seconds,
            threshold=threshold,
            alert_prefix="High Number of Hanging LLM Requests",
        )

    # ------------------ HELPERS ------------------ #

    async def _send_alert_if_thresholds_crossed(
        self,
        events: List[PagerDutyInternalEvent],
        window_seconds: int,
        threshold: int,
        alert_prefix: str,
    ):
        """
        1. Prune old events
        2. If threshold is reached, build alert, send to PagerDuty
        3. Clear those events
        """
        cutoff = datetime.now(timezone.utc) - timedelta(seconds=window_seconds)
        # Use an aware minimum as the fallback: a naive datetime.min compared
        # against the aware `cutoff` would raise TypeError.
        _aware_min = datetime.min.replace(tzinfo=timezone.utc)
        pruned = [e for e in events if e.get("timestamp", _aware_min) > cutoff]

        # Update the reference list in place so callers' state stays current
        events.clear()
        events.extend(pruned)

        # Check threshold
        verbose_logger.debug(
            f"Have {len(events)} events in the last {window_seconds} seconds. Threshold is {threshold}"
        )
        if len(events) >= threshold:
            # Build short summary of last N events
            error_summaries = self._build_error_summaries(events, max_errors=5)
            alert_message = (
                f"{alert_prefix}: {len(events)} in the last {window_seconds} seconds."
            )
            custom_details = {"recent_errors": error_summaries}

            await self.send_alert_to_pagerduty(
                alert_message=alert_message,
                custom_details=custom_details,
            )

            # Clear them after sending an alert, so we don't spam
            events.clear()

    def _build_error_summaries(
        self, events: List[PagerDutyInternalEvent], max_errors: int = 5
    ) -> List[PagerDutyInternalEvent]:
        """
        Build short summaries for the last `max_errors` events with the
        `timestamp` field stripped (not useful in the PagerDuty
        custom_details payload).

        Returns copies — the stored event dicts are left untouched. The
        previous implementation used `fe.pop("timestamp")`, which mutated
        the shared event dicts and raised KeyError when the key was absent.
        """
        summaries: List[PagerDutyInternalEvent] = []
        for fe in events[-max_errors:]:
            summaries.append(
                {k: v for k, v in fe.items() if k != "timestamp"}  # type: ignore[misc]
            )
        return summaries

    async def send_alert_to_pagerduty(self, alert_message: str, custom_details: dict):
        """
        Send [critical] Alert to PagerDuty

        https://developer.pagerduty.com/api-reference/YXBpOjI3NDgyNjU-pager-duty-v2-events-api

        Best-effort: any exception is logged, never raised to the caller.
        """
        try:
            verbose_logger.debug(f"Sending alert to PagerDuty: {alert_message}")
            async_client: AsyncHTTPHandler = get_async_httpx_client(
                llm_provider=httpxSpecialProvider.LoggingCallback
            )
            payload: PagerDutyRequestBody = PagerDutyRequestBody(
                payload=PagerDutyPayload(
                    summary=alert_message,
                    severity="critical",
                    source="LiteLLM Alert",
                    component="LiteLLM",
                    custom_details=custom_details,
                ),
                routing_key=self.api_key,
                event_action="trigger",
            )

            return await async_client.post(
                url="https://events.pagerduty.com/v2/enqueue",
                json=dict(payload),
                headers={"Content-Type": "application/json"},
            )
        except Exception as e:
            verbose_logger.exception(f"Error sending alert to PagerDuty: {e}")

View file

@ -30,6 +30,7 @@ from litellm.cost_calculator import _select_model_name_for_cost_calc
from litellm.integrations.custom_guardrail import CustomGuardrail from litellm.integrations.custom_guardrail import CustomGuardrail
from litellm.integrations.custom_logger import CustomLogger from litellm.integrations.custom_logger import CustomLogger
from litellm.integrations.mlflow import MlflowLogger from litellm.integrations.mlflow import MlflowLogger
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting
from litellm.litellm_core_utils.redact_messages import ( from litellm.litellm_core_utils.redact_messages import (
redact_message_input_output_from_custom_logger, redact_message_input_output_from_custom_logger,
redact_message_input_output_from_logging, redact_message_input_output_from_logging,
@ -1992,7 +1993,7 @@ class Logging(LiteLLMLoggingBaseClass):
) )
except Exception as e: except Exception as e:
verbose_logger.exception( verbose_logger.exception(
"LiteLLM.LoggingError: [Non-Blocking] Exception occurred while success \ "LiteLLM.LoggingError: [Non-Blocking] Exception occurred while failure \
logging {}\nCallback={}".format( logging {}\nCallback={}".format(
str(e), callback str(e), callback
) )
@ -2163,7 +2164,12 @@ def _init_custom_logger_compatible_class( # noqa: PLR0915
llm_router: Optional[ llm_router: Optional[
Any Any
], # expect litellm.Router, but typing errors due to circular import ], # expect litellm.Router, but typing errors due to circular import
custom_logger_init_args: Optional[dict] = {},
) -> Optional[CustomLogger]: ) -> Optional[CustomLogger]:
"""
Initialize a custom logger compatible class
"""
custom_logger_init_args = custom_logger_init_args or {}
if logging_integration == "lago": if logging_integration == "lago":
for callback in _in_memory_loggers: for callback in _in_memory_loggers:
if isinstance(callback, LagoLogger): if isinstance(callback, LagoLogger):
@ -2386,6 +2392,13 @@ def _init_custom_logger_compatible_class( # noqa: PLR0915
langfuse_logger = LangfusePromptManagement() langfuse_logger = LangfusePromptManagement()
_in_memory_loggers.append(langfuse_logger) _in_memory_loggers.append(langfuse_logger)
return langfuse_logger # type: ignore return langfuse_logger # type: ignore
elif logging_integration == "pagerduty":
for callback in _in_memory_loggers:
if isinstance(callback, PagerDutyAlerting):
return callback
pagerduty_logger = PagerDutyAlerting(**custom_logger_init_args)
_in_memory_loggers.append(pagerduty_logger)
return pagerduty_logger # type: ignore
elif logging_integration == "humanloop": elif logging_integration == "humanloop":
for callback in _in_memory_loggers: for callback in _in_memory_loggers:
if isinstance(callback, HumanloopLogger): if isinstance(callback, HumanloopLogger):
@ -2509,6 +2522,10 @@ def get_custom_logger_compatible_class( # noqa: PLR0915
for callback in _in_memory_loggers: for callback in _in_memory_loggers:
if isinstance(callback, MlflowLogger): if isinstance(callback, MlflowLogger):
return callback return callback
elif logging_integration == "pagerduty":
for callback in _in_memory_loggers:
if isinstance(callback, PagerDutyAlerting):
return callback
return None return None

View file

@ -11,9 +11,21 @@ litellm_settings:
callbacks: ["datadog"] callbacks: ["datadog"]
general_settings:
alerting: ["pagerduty"]
alerting_args:
failure_threshold: 4 # Number of requests failing in a window
failure_threshold_window_seconds: 10 # Window in seconds
# Requests hanging threshold
hanging_threshold_seconds: 0.0000001 # Number of seconds of waiting for a response before a request is considered hanging
hanging_threshold_window_seconds: 10 # Window in seconds
# For /fine_tuning/jobs endpoints # For /fine_tuning/jobs endpoints
finetune_settings: finetune_settings:
- custom_llm_provider: "vertex_ai" - custom_llm_provider: "vertex_ai"
vertex_project: "adroit-crow-413218" vertex_project: "adroit-crow-413218"
vertex_location: "us-central1" vertex_location: "us-central1"
vertex_credentials: "/Users/ishaanjaffer/Downloads/adroit-crow-413218-a956eef1a2a8.json" vertex_credentials: "/Users/ishaanjaffer/Downloads/adroit-crow-413218-a956eef1a2a8.json"

View file

@ -1939,15 +1939,7 @@ class ProxyConfig:
use_azure_key_vault = general_settings.get("use_azure_key_vault", False) use_azure_key_vault = general_settings.get("use_azure_key_vault", False)
load_from_azure_key_vault(use_azure_key_vault=use_azure_key_vault) load_from_azure_key_vault(use_azure_key_vault=use_azure_key_vault)
### ALERTING ### ### ALERTING ###
self._load_alerting_settings(general_settings=general_settings)
proxy_logging_obj.update_values(
alerting=general_settings.get("alerting", None),
alerting_threshold=general_settings.get("alerting_threshold", 600),
alert_types=general_settings.get("alert_types", None),
alert_to_webhook_url=general_settings.get("alert_to_webhook_url", None),
alerting_args=general_settings.get("alerting_args", None),
redis_cache=redis_usage_cache,
)
### CONNECT TO DATABASE ### ### CONNECT TO DATABASE ###
database_url = general_settings.get("database_url", None) database_url = general_settings.get("database_url", None)
if database_url and database_url.startswith("os.environ/"): if database_url and database_url.startswith("os.environ/"):
@ -2135,6 +2127,46 @@ class ProxyConfig:
) )
return router, router.get_model_list(), general_settings return router, router.get_model_list(), general_settings
def _load_alerting_settings(self, general_settings: dict):
    """
    Initialize alerting callbacks declared in general_settings["alerting"].

    "slack" keeps the legacy (v0) path via proxy_logging_obj.update_values;
    any other known custom-logger-compatible callback (e.g. "pagerduty") is
    initialized as a custom logger and appended to litellm.callbacks.
    """
    from litellm.litellm_core_utils.litellm_logging import (
        _init_custom_logger_compatible_class,
    )

    _alerting_callbacks = general_settings.get("alerting", None)
    # log the resolved callback list, not the entire general_settings dict
    verbose_proxy_logger.debug(f"_alerting_callbacks: {_alerting_callbacks}")
    if _alerting_callbacks is None:
        return
    for _alert in _alerting_callbacks:
        if _alert == "slack":
            # [OLD] v0 implementation
            proxy_logging_obj.update_values(
                alerting=general_settings.get("alerting", None),
                alerting_threshold=general_settings.get("alerting_threshold", 600),
                alert_types=general_settings.get("alert_types", None),
                alert_to_webhook_url=general_settings.get(
                    "alert_to_webhook_url", None
                ),
                alerting_args=general_settings.get("alerting_args", None),
                redis_cache=redis_usage_cache,
            )
        elif _alert in litellm._known_custom_logger_compatible_callbacks:
            # [NEW] v1 implementation - init as a custom logger
            _logger = _init_custom_logger_compatible_class(
                logging_integration=_alert,
                internal_usage_cache=None,
                llm_router=None,
                custom_logger_init_args={
                    "alerting_args": general_settings.get("alerting_args", None)
                },
            )
            if _logger is not None:
                litellm.callbacks.append(_logger)
def get_model_info_with_id(self, model, db_model=False) -> RouterModelInfo: def get_model_info_with_id(self, model, db_model=False) -> RouterModelInfo:
""" """
Common logic across add + delete router models Common logic across add + delete router models

View file

@ -0,0 +1,62 @@
from datetime import datetime
from typing import List, Literal, Optional, TypedDict, Union
from litellm.types.utils import StandardLoggingUserAPIKeyMetadata
class LinkDict(TypedDict, total=False):
    """A hyperlink attached to a PagerDuty event (`links` entry)."""

    href: str
    text: Optional[str]  # display text for the link
class ImageDict(TypedDict, total=False):
    """An image attached to a PagerDuty event (`images` entry)."""

    src: str  # image URL
    href: Optional[str]  # optional link target when the image is clicked
    alt: Optional[str]  # alternate text
class PagerDutyPayload(TypedDict, total=False):
    """The `payload` object of a PagerDuty Events API v2 request."""

    summary: str  # short human-readable description of the alert
    timestamp: Optional[str]  # ISO 8601 date-time format
    severity: Literal["critical", "warning", "error", "info"]
    source: str  # unique location of the affected system
    component: Optional[str]  # component of the source responsible
    group: Optional[str]  # logical grouping of components
    class_: Optional[str]  # Using class_ since 'class' is a reserved keyword
    custom_details: Optional[dict]  # free-form additional details
class PagerDutyRequestBody(TypedDict, total=False):
    """Top-level body for a PagerDuty Events API v2 enqueue request."""

    payload: PagerDutyPayload
    routing_key: str  # PagerDuty integration/routing key
    event_action: Literal["trigger", "acknowledge", "resolve"]
    dedup_key: Optional[str]  # correlates trigger/acknowledge/resolve events
    client: Optional[str]  # name of the submitting client
    client_url: Optional[str]
    links: Optional[List[LinkDict]]
    images: Optional[List[ImageDict]]
class AlertingConfig(TypedDict, total=False):
    """
    Config for alerting thresholds

    All keys are optional; consumers (e.g. PagerDutyAlerting) supply
    defaults for any missing key.
    """

    # Requests failing threshold
    failure_threshold: int  # Number of requests failing in a window
    failure_threshold_window_seconds: int  # Window in seconds

    # Requests hanging threshold
    hanging_threshold_seconds: float  # Number of seconds of waiting for a response before a request is considered hanging
    hanging_threshold_fails: int  # Number of requests hanging in a window
    hanging_threshold_window_seconds: int  # Window in seconds
class PagerDutyInternalEvent(StandardLoggingUserAPIKeyMetadata, total=False):
    """Simple structure to hold timestamp and error info.

    Inherits the user-API-key metadata fields; used as the in-memory
    record for both failure and hanging events.
    """

    failure_event_type: Literal["failed_response", "hanging_response"]
    timestamp: datetime  # when the event was recorded (UTC)
    error_class: Optional[str]
    error_code: Optional[str]
    error_llm_provider: Optional[str]

View file

@ -0,0 +1,96 @@
import asyncio
import os
import random
import sys
from datetime import datetime, timedelta
from typing import Optional
sys.path.insert(0, os.path.abspath("../.."))
import pytest
import litellm
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting, AlertingConfig
from litellm.proxy._types import UserAPIKeyAuth
@pytest.mark.asyncio
async def test_pagerduty_alerting():
    """Smoke test: one mocked failure should trip a threshold of 1 in 10s."""
    pd_alerter = PagerDutyAlerting(
        alerting_args=AlertingConfig(
            failure_threshold=1,
            failure_threshold_window_seconds=10,
        )
    )
    litellm.callbacks = [pd_alerter]

    try:
        await litellm.acompletion(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "hi"}],
            mock_response="litellm.RateLimitError",
        )
    except litellm.RateLimitError:
        # expected: the mocked failure only exists to feed the logger
        pass

    # give the async failure logger time to run
    await asyncio.sleep(2)
@pytest.mark.asyncio
async def test_pagerduty_alerting_high_failure_rate():
    """Trip a 3-failure/600s threshold: one mocked failure, then three more."""
    pd_alerter = PagerDutyAlerting(
        alerting_args=AlertingConfig(
            failure_threshold=3,
            failure_threshold_window_seconds=600,
        )
    )
    litellm.callbacks = [pd_alerter]

    async def _fire_mock_failure():
        # acompletion raises the mocked RateLimitError; swallow it so only
        # the failure-logging path is exercised.
        try:
            await litellm.acompletion(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": "hi"}],
                mock_response="litellm.RateLimitError",
            )
        except litellm.RateLimitError:
            pass

    await _fire_mock_failure()
    await asyncio.sleep(2)

    # make 3 more fails
    for _ in range(3):
        await _fire_mock_failure()
    await asyncio.sleep(2)
@pytest.mark.asyncio
async def test_pagerduty_hanging_request_alerting():
    """A near-zero hanging threshold should classify any request as hanging."""
    pd_alerter = PagerDutyAlerting(
        alerting_args=AlertingConfig(hanging_threshold_seconds=0.0000001)
    )
    litellm.callbacks = [pd_alerter]

    mock_user = UserAPIKeyAuth(
        api_key="test",
        key_alias="test-pagerduty",
        team_alias="test-team",
        org_id="test-org",
        user_id="test-user",
        end_user_id="test-end-user",
    )
    request_payload = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "hi"}],
    }

    # schedules hanging_response_handler as a background task
    await pd_alerter.async_pre_call_hook(
        cache=None,
        user_api_key_dict=mock_user,
        data=request_payload,
        call_type="completion",
    )

    await litellm.acompletion(
        model="gpt-4o",
        messages=[{"role": "user", "content": "hi"}],
    )

    # let the hanging handler fire before the event loop closes
    await asyncio.sleep(1)

View file

@ -20,6 +20,7 @@ from prometheus_client import REGISTRY, CollectorRegistry
from litellm.integrations.lago import LagoLogger from litellm.integrations.lago import LagoLogger
from litellm.integrations.openmeter import OpenMeterLogger from litellm.integrations.openmeter import OpenMeterLogger
from litellm.integrations.braintrust_logging import BraintrustLogger from litellm.integrations.braintrust_logging import BraintrustLogger
from litellm.integrations.pagerduty.pagerduty import PagerDutyAlerting
from litellm.integrations.galileo import GalileoObserve from litellm.integrations.galileo import GalileoObserve
from litellm.integrations.langsmith import LangsmithLogger from litellm.integrations.langsmith import LangsmithLogger
from litellm.integrations.literal_ai import LiteralAILogger from litellm.integrations.literal_ai import LiteralAILogger
@ -68,6 +69,7 @@ callback_class_str_to_classType = {
"mlflow": MlflowLogger, "mlflow": MlflowLogger,
"langfuse": LangfusePromptManagement, "langfuse": LangfusePromptManagement,
"otel": OpenTelemetry, "otel": OpenTelemetry,
"pagerduty": PagerDutyAlerting,
} }
expected_env_vars = { expected_env_vars = {
@ -87,6 +89,7 @@ expected_env_vars = {
"ARIZE_SPACE_KEY": "arize_space_key", "ARIZE_SPACE_KEY": "arize_space_key",
"ARIZE_API_KEY": "arize_api_key", "ARIZE_API_KEY": "arize_api_key",
"ARGILLA_API_KEY": "argilla_api_key", "ARGILLA_API_KEY": "argilla_api_key",
"PAGERDUTY_API_KEY": "pagerduty_api_key",
} }