Merge pull request #3506 from BerriAI/litellm_reintegrate_langfuse_url_slack_alert

feat(slack_alerting.py): reintegrate langfuse trace url for slack alerts
Krish Dholakia 2024-05-07 15:03:29 -07:00 committed by GitHub
commit 2aaaa5e1b4
6 changed files with 164 additions and 122 deletions
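
In short: when the incoming request's metadata carries a Langfuse trace_id, the proxy now builds the trace URL and appends it to the Slack alert text. A minimal, self-contained sketch of that flow (the fake logger class and the example trace_id are stand-ins for `litellm.utils.langFuseLogger` and real request metadata; the actual implementation is in the SlackAlerting and ProxyLogging hunks below):

```python
from typing import Optional


class _FakeLangfuseClient:
    # stand-in for the langfuse SDK client that litellm's LangFuseLogger wraps
    base_url = "https://cloud.langfuse.com"


class _FakeLangFuseLogger:
    Langfuse = _FakeLangfuseClient()


langFuseLogger: Optional[_FakeLangFuseLogger] = _FakeLangFuseLogger()


def get_langfuse_trace_url(request_data: Optional[dict] = None) -> Optional[str]:
    """Return the Langfuse trace url for a request, if a trace_id was recorded."""
    if (
        request_data is not None
        and request_data.get("metadata", {}).get("trace_id", None) is not None
    ):
        trace_id = request_data["metadata"]["trace_id"]
        if langFuseLogger is not None:
            base_url = langFuseLogger.Langfuse.base_url
            return f"{base_url}/trace/{trace_id}"
    return None


# The hanging-request alert appends the link to its message text:
request_info = "Requests are hanging - model: `gpt-3.5-turbo`"
langfuse_url = get_langfuse_trace_url({"metadata": {"trace_id": "abc-123"}})
if langfuse_url is not None:
    request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)
print(request_info)
```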


@@ -276,7 +276,6 @@ class LangFuseLogger:
         metadata_tags = metadata.pop("tags", [])
         tags = metadata_tags

         # Clean Metadata before logging - never log raw metadata
         # the raw metadata can contain circular references which leads to infinite recursion
         # we clean out all extra litellm metadata params before logging
@@ -303,7 +302,6 @@ class LangFuseLogger:
                 else:
                     clean_metadata[key] = value

         session_id = clean_metadata.pop("session_id", None)
         trace_name = clean_metadata.pop("trace_name", None)
         trace_id = clean_metadata.pop("trace_id", None)
@@ -322,13 +320,16 @@ class LangFuseLogger:
             for metadata_param_key in update_trace_keys:
                 trace_param_key = metadata_param_key.replace("trace_", "")
                 if trace_param_key not in trace_params:
-                    updated_trace_value = clean_metadata.pop(metadata_param_key, None)
+                    updated_trace_value = clean_metadata.pop(
+                        metadata_param_key, None
+                    )
                     if updated_trace_value is not None:
                         trace_params[trace_param_key] = updated_trace_value

             # Pop the trace specific keys that would have been popped if there were a new trace
-            for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
+            for key in list(
+                filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+            ):
                 clean_metadata.pop(key, None)

             # Special keys that are found in the function arguments and not the metadata
@@ -342,10 +343,16 @@ class LangFuseLogger:
                 "name": trace_name,
                 "session_id": session_id,
                 "input": input,
-                "version": clean_metadata.pop("trace_version", clean_metadata.get("version", None)),  # If provided just version, it will applied to the trace as well, if applied a trace version it will take precedence
+                "version": clean_metadata.pop(
+                    "trace_version", clean_metadata.get("version", None)
+                ),  # If provided just version, it will applied to the trace as well, if applied a trace version it will take precedence
             }
-            for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
-                trace_params[key.replace("trace_", "")] = clean_metadata.pop(key, None)
+            for key in list(
+                filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+            ):
+                trace_params[key.replace("trace_", "")] = clean_metadata.pop(
+                    key, None
+                )

             if level == "ERROR":
                 trace_params["status_message"] = output


@@ -149,16 +149,21 @@ class SlackAlerting(CustomLogger):
     def _add_langfuse_trace_id_to_alert(
         self,
-        request_info: str,
         request_data: Optional[dict] = None,
-        kwargs: Optional[dict] = None,
-        type: Literal["hanging_request", "slow_response"] = "hanging_request",
-        start_time: Optional[datetime.datetime] = None,
-        end_time: Optional[datetime.datetime] = None,
-    ):
+    ) -> Optional[str]:
+        """
+        Returns langfuse trace url
+        """
         # do nothing for now
-        pass
-        return request_info
+        if (
+            request_data is not None
+            and request_data.get("metadata", {}).get("trace_id", None) is not None
+        ):
+            trace_id = request_data["metadata"]["trace_id"]
+            if litellm.utils.langFuseLogger is not None:
+                base_url = litellm.utils.langFuseLogger.Langfuse.base_url
+                return f"{base_url}/trace/{trace_id}"
+        return None

     def _response_taking_too_long_callback_helper(
         self,
@@ -501,14 +506,13 @@ class SlackAlerting(CustomLogger):
             )

             if "langfuse" in litellm.success_callback:
-                request_info = self._add_langfuse_trace_id_to_alert(
-                    request_info=request_info,
+                langfuse_url = self._add_langfuse_trace_id_to_alert(
                     request_data=request_data,
-                    type="hanging_request",
-                    start_time=start_time,
-                    end_time=end_time,
                 )
+                if langfuse_url is not None:
+                    request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)

             # add deployment latencies to alert
             _deployment_latency_map = self._get_deployment_latencies_to_alert(
                 metadata=request_data.get("metadata", {})
@@ -701,6 +705,7 @@ Model Info:
             "daily_reports",
             "new_model_added",
         ],
+        **kwargs,
     ):
         """
         Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
@@ -731,6 +736,10 @@ Model Info:
         formatted_message = (
             f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
         )
+
+        if kwargs:
+            for key, value in kwargs.items():
+                formatted_message += f"\n\n{key}: `{value}`\n\n"

         if _proxy_base_url is not None:
             formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"


@@ -14,6 +14,9 @@ model_list:
       api_key: my-fake-key-3
       model: openai/my-fake-model-3
     model_name: fake-openai-endpoint
+  - model_name: gpt-4
+    litellm_params:
+      model: gpt-3.5-turbo

 router_settings:
   num_retries: 0
   enable_pre_call_checks: true
@@ -25,7 +28,7 @@ router_settings:
   routing_strategy: "latency-based-routing"

 litellm_settings:
-  success_callback: ["openmeter"]
+  success_callback: ["langfuse"]

 general_settings:
   alerting: ["slack"]


@@ -3164,7 +3164,9 @@ def data_generator(response):
         yield f"data: {json.dumps(chunk)}\n\n"


-async def async_data_generator(response, user_api_key_dict):
+async def async_data_generator(
+    response, user_api_key_dict: UserAPIKeyAuth, request_data: dict
+):
     verbose_proxy_logger.debug("inside generator")
     try:
         start_time = time.time()
@@ -3181,7 +3183,9 @@ async def async_data_generator(response, user_api_key_dict):
     except Exception as e:
         traceback.print_exc()
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict,
+            original_exception=e,
+            request_data=request_data,
         )
         verbose_proxy_logger.debug(
             f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
@@ -3206,8 +3210,14 @@ async def async_data_generator(response, user_api_key_dict):
         yield f"data: {error_returned}\n\n"


-def select_data_generator(response, user_api_key_dict):
-    return async_data_generator(response=response, user_api_key_dict=user_api_key_dict)
+def select_data_generator(
+    response, user_api_key_dict: UserAPIKeyAuth, request_data: dict
+):
+    return async_data_generator(
+        response=response,
+        user_api_key_dict=user_api_key_dict,
+        request_data=request_data,
+    )


 def get_litellm_model_info(model: dict = {}):
@@ -3502,9 +3512,8 @@ async def chat_completion(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global general_settings, user_debug, proxy_logging_obj, llm_model_list
-    try:
-        # async with llm_router.sem
-        data = {}
+    data = {}
+    try:
         body = await request.body()
         body_str = body.decode()
         try:
@@ -3695,7 +3704,9 @@ async def chat_completion(
                 "x-litellm-model-api-base": api_base,
             }
             selected_data_generator = select_data_generator(
-                response=response, user_api_key_dict=user_api_key_dict
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                request_data=data,
             )
             return StreamingResponse(
                 selected_data_generator,
@@ -3717,7 +3728,7 @@ async def chat_completion(
         data["litellm_status"] = "fail"  # used for alerting
         traceback.print_exc()
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         verbose_proxy_logger.debug(
             f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
@@ -3879,7 +3890,9 @@ async def completion(
                 "x-litellm-model-id": model_id,
             }
             selected_data_generator = select_data_generator(
-                response=response, user_api_key_dict=user_api_key_dict
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                request_data=data,
             )

             return StreamingResponse(
@@ -3932,6 +3945,7 @@ async def embeddings(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global proxy_logging_obj
+    data: Any = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4077,7 +4091,7 @@ async def embeddings(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4114,6 +4128,7 @@ async def image_generation(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global proxy_logging_obj
+    data = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4233,7 +4248,7 @@ async def image_generation(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4274,10 +4289,11 @@ async def audio_transcriptions(
     https://platform.openai.com/docs/api-reference/audio/createTranscription?lang=curl
     """
     global proxy_logging_obj
+    data: Dict = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         form_data = await request.form()
-        data: Dict = {key: value for key, value in form_data.items() if key != "file"}
+        data = {key: value for key, value in form_data.items() if key != "file"}

         # Include original request and headers in the data
         data["proxy_server_request"] = {  # type: ignore
@@ -4412,7 +4428,7 @@ async def audio_transcriptions(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4461,6 +4477,7 @@ async def moderations(
     ```
     """
     global proxy_logging_obj
+    data: Dict = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4574,7 +4591,7 @@ async def moderations(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -8006,8 +8023,8 @@ async def async_queue_request(
     Now using a FastAPI background task + /chat/completions compatible endpoint
     """
-    try:
-        data = {}
+    data = {}
+    try:
         data = await request.json()  # type: ignore

         # Include original request and headers in the data
@@ -8072,7 +8089,9 @@ async def async_queue_request(
         ):  # use generate_responses to stream responses
             return StreamingResponse(
                 async_data_generator(
-                    user_api_key_dict=user_api_key_dict, response=response
+                    user_api_key_dict=user_api_key_dict,
+                    response=response,
+                    request_data=data,
                 ),
                 media_type="text/event-stream",
             )
@@ -8080,7 +8099,7 @@ async def async_queue_request(
         return response
     except Exception as e:
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         if isinstance(e, HTTPException):
             raise ProxyException(
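
The proxy endpoint changes above all follow one pattern: `data` is initialized before the `try` block so the exception handler can always pass `request_data=data` to `post_call_failure_hook`, even when body parsing fails. A condensed sketch of that pattern with the failure hook stubbed out (the real hook lives on `ProxyLogging`, shown in the next section):

```python
import asyncio
import json
from typing import Any


async def post_call_failure_hook(original_exception: Exception, request_data: dict) -> None:
    # stand-in for ProxyLogging.post_call_failure_hook
    print(f"alert: {original_exception!r} (request_data={request_data!r})")


async def chat_completion(raw_body: bytes) -> None:
    data: Any = {}  # defined before `try` so the except block can always reference it
    try:
        data = json.loads(raw_body)
        raise RuntimeError("simulated downstream LLM failure")
    except Exception as e:
        data["litellm_status"] = "fail"  # used for alerting
        await post_call_failure_hook(original_exception=e, request_data=data)


asyncio.run(chat_completion(b'{"model": "gpt-3.5-turbo"}'))
asyncio.run(chat_completion(b"not json"))  # parsing fails, but the hook still gets a dict
```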


@@ -302,6 +302,7 @@ class ProxyLogging:
             "budget_alerts",
             "db_exceptions",
         ],
+        request_data: Optional[dict] = None,
     ):
         """
         Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
@@ -331,10 +332,19 @@ class ProxyLogging:
         if _proxy_base_url is not None:
             formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"

+        extra_kwargs = {}
+        if request_data is not None:
+            _url = self.slack_alerting_instance._add_langfuse_trace_id_to_alert(
+                request_data=request_data
+            )
+            if _url is not None:
+                extra_kwargs["🪢 Langfuse Trace"] = _url
+                formatted_message += "\n\n🪢 Langfuse Trace: {}".format(_url)
+
         for client in self.alerting:
             if client == "slack":
                 await self.slack_alerting_instance.send_alert(
-                    message=message, level=level, alert_type=alert_type
+                    message=message, level=level, alert_type=alert_type, **extra_kwargs
                 )
             elif client == "sentry":
                 if litellm.utils.sentry_sdk_instance is not None:
@@ -369,6 +379,7 @@ class ProxyLogging:
                     message=f"DB read/write call failed: {error_message}",
                     level="High",
                     alert_type="db_exceptions",
+                    request_data={},
                 )
             )
@@ -384,7 +395,10 @@ class ProxyLogging:
                 litellm.utils.capture_exception(error=original_exception)

     async def post_call_failure_hook(
-        self, original_exception: Exception, user_api_key_dict: UserAPIKeyAuth
+        self,
+        original_exception: Exception,
+        user_api_key_dict: UserAPIKeyAuth,
+        request_data: dict,
     ):
         """
         Allows users to raise custom exceptions/log when a call fails, without having to deal with parsing Request body.
@@ -409,6 +423,7 @@ class ProxyLogging:
                     message=f"LLM API call failed: {str(original_exception)}",
                     level="High",
                     alert_type="llm_exceptions",
+                    request_data=request_data,
                 )
             )
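
On the `ProxyLogging` side, `alerting_handler` now threads the trace URL through to Slack as an extra keyword argument, and `SlackAlerting.send_alert` (earlier section) appends any such extras to the message body. A rough sketch of just the formatting step, with `format_alert` standing in for `send_alert` (CPython accepts the non-identifier string key here because it lands in `**kwargs`):

```python
def format_alert(message: str, level: str, current_time: str, **kwargs) -> str:
    formatted_message = (
        f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
    )
    # Mirrors the new `if kwargs:` block added to send_alert in this PR.
    if kwargs:
        for key, value in kwargs.items():
            formatted_message += f"\n\n{key}: `{value}`\n\n"
    return formatted_message


extra_kwargs = {}
langfuse_url = "https://cloud.langfuse.com/trace/abc-123"  # e.g. from _add_langfuse_trace_id_to_alert
if langfuse_url is not None:
    extra_kwargs["🪢 Langfuse Trace"] = langfuse_url

print(
    format_alert(
        "LLM API call failed: Timeout", "High", "2024-05-07 15:03:29", **extra_kwargs
    )
)
```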

File diff suppressed because one or more lines are too long