mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 10:44:24 +00:00
* define basic types * fix verbose_logger.exception statement * fix basic alerting * test pager duty alerting * test_pagerduty_alerting_high_failure_rate * PagerDutyAlerting * async_log_failure_event * use pre_call_hook * add _request_is_completed helper util * update AlertingConfig * rename PagerDutyInternalEvent * _send_alert_if_thresholds_crossed * use pagerduty as _custom_logger_compatible_callbacks_literal * fix slack alerting imports * fix imports in slack alerting * PagerDutyAlerting * fix _load_alerting_settings * test_pagerduty_hanging_request_alerting * working pager duty alerting * fix linting * doc pager duty alerting * update hanging_response_handler * fix import location * update failure_threshold * update async_pre_call_hook * docs pagerduty * test - callback_class_str_to_classType * fix linting errors * fix linting + testing error * PagerDutyAlerting * test_pagerduty_hanging_request_alerting * fix unused imports * docs pager duty * @pytest.mark.flaky(retries=6, delay=2) * test_model_info_bedrock_converse_enforcement
92 lines
3 KiB
Python
92 lines
3 KiB
Python
"""
|
|
Utils used for slack alerting
|
|
"""
|
|
|
|
import asyncio
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
|
|
|
from litellm.proxy._types import AlertType
|
|
from litellm.secret_managers.main import get_secret
|
|
|
|
# `Logging` is only used as a type annotation in this module.
# - Under static analysis (TYPE_CHECKING is True) it aliases the real litellm
#   logging class, so annotated objects are checked against that class.
# - At runtime the litellm_logging module is not imported here; `Logging`
#   is simply `Any`.
if TYPE_CHECKING:
    from litellm.litellm_core_utils.litellm_logging import Logging as _Logging

    Logging = _Logging
else:
    Logging = Any
|
|
|
|
|
|
def _resolve_webhook_url(webhook_url: str) -> str:
    """
    Resolve a single webhook url entry.

    Entries that do not contain "os.environ/" are returned unchanged.
    Entries that do are passed to `get_secret`; the looked-up value must be a `str`.

    Raises:
        ValueError: if `get_secret` returns anything other than a `str`.
    """
    if "os.environ/" not in webhook_url:
        return webhook_url

    _env_value = get_secret(secret_name=webhook_url)
    if isinstance(_env_value, str):
        return _env_value

    raise ValueError(
        f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}"
    )


def process_slack_alerting_variables(
    alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]
) -> Optional[Dict[AlertType, Union[List[str], str]]]:
    """
    Resolve secret references inside `alert_to_webhook_url`.

    - each value is either one webhook url or a list of webhook urls
    - any url containing "os.environ/" (e.g. os.environ/SLACK_WEBHOOK_URL_1) is
      replaced with the value returned by `get_secret`
    - the mapping is updated IN PLACE and the same object is returned
    - `None` in -> `None` out

    Raises:
        ValueError: if a referenced secret does not resolve to a `str`.
    """
    if alert_to_webhook_url is None:
        return None

    # Only values of existing keys are reassigned, so the dict size never
    # changes while we iterate over it.
    for alert_type, configured_urls in alert_to_webhook_url.items():
        if isinstance(configured_urls, list):
            alert_to_webhook_url[alert_type] = [
                _resolve_webhook_url(url) for url in configured_urls
            ]
        else:
            alert_to_webhook_url[alert_type] = _resolve_webhook_url(configured_urls)

    return alert_to_webhook_url
|
|
|
|
|
|
async def _add_langfuse_trace_id_to_alert(
    request_data: Optional[dict] = None,
    *,
    max_attempts: int = 3,
    retry_delay_seconds: float = 3.0,
) -> Optional[str]:
    """
    Returns langfuse trace url, or None when no url can be built.

    - reads the logging object from request_data["litellm_logging_obj"]
    - asks it for the langfuse trace id via `_get_trace_id(service_name="langfuse")`,
      retrying while the id is still None
      (NOTE(review): the previous docstring listed the lookup order
      existing_trace_id -> trace_id -> litellm_call_id; presumably that is
      implemented inside `_get_trace_id` - confirm there)
    - builds "<base_url>/trace/<trace_id>" using the `Langfuse.base_url` of the
      object returned by `_get_callback_object(service_name="langfuse")`

    Args:
        request_data: request payload; may be None or lack "litellm_logging_obj".
        max_attempts: how many times to ask for the trace id (default 3).
        retry_delay_seconds: pause between two attempts (default 3s).

    Returns:
        The trace url, or None if request_data / the logging object / the trace
        id / the langfuse callback object is missing.
    """
    if request_data is None:
        return None

    litellm_logging_obj: Optional[Logging] = request_data.get("litellm_logging_obj")
    if litellm_logging_obj is None:
        return None

    trace_id: Optional[str] = None
    for attempt in range(max_attempts):
        if attempt > 0:
            # Sleep only *between* attempts: there is nothing to wait for
            # after the final attempt has already failed.
            await asyncio.sleep(retry_delay_seconds)
        trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse")
        if trace_id is not None:
            break

    if trace_id is None:
        # Never emit a ".../trace/None" link when the id could not be resolved.
        return None

    _langfuse_object = litellm_logging_obj._get_callback_object(
        service_name="langfuse"
    )
    if _langfuse_object is None:
        return None

    base_url = _langfuse_object.Langfuse.base_url
    return f"{base_url}/trace/{trace_id}"
|