litellm-mirror/litellm/integrations/SlackAlerting/utils.py
Ishaan Jaff 03b1db5a7d
(Feat) - Add PagerDuty Alerting Integration (#7478)
* define basic types

* fix verbose_logger.exception statement

* fix basic alerting

* test pager duty alerting

* test_pagerduty_alerting_high_failure_rate

* PagerDutyAlerting

* async_log_failure_event

* use pre_call_hook

* add _request_is_completed helper util

* update AlertingConfig

* rename PagerDutyInternalEvent

* _send_alert_if_thresholds_crossed

* use pagerduty as _custom_logger_compatible_callbacks_literal

* fix slack alerting imports

* fix imports in slack alerting

* PagerDutyAlerting

* fix _load_alerting_settings

* test_pagerduty_hanging_request_alerting

* working pager duty alerting

* fix linting

* doc pager duty alerting

* update hanging_response_handler

* fix import location

* update failure_threshold

* update async_pre_call_hook

* docs pagerduty

* test - callback_class_str_to_classType

* fix linting errors

* fix linting + testing error

* PagerDutyAlerting

* test_pagerduty_hanging_request_alerting

* fix unused imports

* docs pager duty

* @pytest.mark.flaky(retries=6, delay=2)

* test_model_info_bedrock_converse_enforcement
2025-01-01 07:12:51 -08:00

92 lines
3 KiB
Python

"""
Utils used for slack alerting
"""
import asyncio
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from litellm.proxy._types import AlertType
from litellm.secret_managers.main import get_secret
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as _Logging
Logging = _Logging
else:
Logging = Any
def process_slack_alerting_variables(
alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]
) -> Optional[Dict[AlertType, Union[List[str], str]]]:
"""
process alert_to_webhook_url
- check if any urls are set as os.environ/SLACK_WEBHOOK_URL_1 read env var and set the correct value
"""
if alert_to_webhook_url is None:
return None
for alert_type, webhook_urls in alert_to_webhook_url.items():
if isinstance(webhook_urls, list):
_webhook_values: List[str] = []
for webhook_url in webhook_urls:
if "os.environ/" in webhook_url:
_env_value = get_secret(secret_name=webhook_url)
if not isinstance(_env_value, str):
raise ValueError(
f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}"
)
_webhook_values.append(_env_value)
else:
_webhook_values.append(webhook_url)
alert_to_webhook_url[alert_type] = _webhook_values
else:
_webhook_value_str: str = webhook_urls
if "os.environ/" in webhook_urls:
_env_value = get_secret(secret_name=webhook_urls)
if not isinstance(_env_value, str):
raise ValueError(
f"Invalid webhook url value for: {webhook_urls}. Got type={type(_env_value)}"
)
_webhook_value_str = _env_value
else:
_webhook_value_str = webhook_urls
alert_to_webhook_url[alert_type] = _webhook_value_str
return alert_to_webhook_url
async def _add_langfuse_trace_id_to_alert(
request_data: Optional[dict] = None,
) -> Optional[str]:
"""
Returns langfuse trace url
- check:
-> existing_trace_id
-> trace_id
-> litellm_call_id
"""
# do nothing for now
if (
request_data is not None
and request_data.get("litellm_logging_obj", None) is not None
):
trace_id: Optional[str] = None
litellm_logging_obj: Logging = request_data["litellm_logging_obj"]
for _ in range(3):
trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse")
if trace_id is not None:
break
await asyncio.sleep(3) # wait 3s before retrying for trace id
_langfuse_object = litellm_logging_obj._get_callback_object(
service_name="langfuse"
)
if _langfuse_object is not None:
base_url = _langfuse_object.Langfuse.base_url
return f"{base_url}/trace/{trace_id}"
return None