diff --git a/litellm/integrations/langfuse.py b/litellm/integrations/langfuse.py
index fa8b0c61d7..caf5437b24 100644
--- a/litellm/integrations/langfuse.py
+++ b/litellm/integrations/langfuse.py
@@ -262,7 +262,7 @@ class LangFuseLogger:
         try:
             tags = []
-            metadata = copy.deepcopy(metadata) # Avoid modifying the original metadata
+            metadata = copy.deepcopy(metadata)  # Avoid modifying the original metadata
             supports_tags = Version(langfuse.version.__version__) >= Version("2.6.3")
             supports_prompt = Version(langfuse.version.__version__) >= Version("2.7.3")
             supports_costs = Version(langfuse.version.__version__) >= Version("2.7.3")
@@ -276,7 +276,6 @@ class LangFuseLogger:
                 metadata_tags = metadata.pop("tags", [])
                 tags = metadata_tags
-
             # Clean Metadata before logging - never log raw metadata
             # the raw metadata can contain circular references which leads to infinite recursion
             # we clean out all extra litellm metadata params before logging
@@ -303,18 +302,17 @@ class LangFuseLogger:
                 else:
                     clean_metadata[key] = value
-
             session_id = clean_metadata.pop("session_id", None)
             trace_name = clean_metadata.pop("trace_name", None)
             trace_id = clean_metadata.pop("trace_id", None)
             existing_trace_id = clean_metadata.pop("existing_trace_id", None)
             update_trace_keys = clean_metadata.pop("update_trace_keys", [])
-
+
             if trace_name is None and existing_trace_id is None:
                 # just log `litellm-{call_type}` as the trace name
                 ## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces.
                 trace_name = f"litellm-{kwargs.get('call_type', 'completion')}"
-
+
             if existing_trace_id is not None:
                 trace_params = {"id": existing_trace_id}
@@ -322,15 +320,18 @@ class LangFuseLogger:
                 for metadata_param_key in update_trace_keys:
                     trace_param_key = metadata_param_key.replace("trace_", "")
                     if trace_param_key not in trace_params:
-                        updated_trace_value = clean_metadata.pop(metadata_param_key, None)
+                        updated_trace_value = clean_metadata.pop(
+                            metadata_param_key, None
+                        )
                         if updated_trace_value is not None:
                             trace_params[trace_param_key] = updated_trace_value
-
                 # Pop the trace specific keys that would have been popped if there were a new trace
-                for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
+                for key in list(
+                    filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+                ):
                     clean_metadata.pop(key, None)
-
+
                 # Special keys that are found in the function arguments and not the metadata
                 if "input" in update_trace_keys:
                     trace_params["input"] = input
@@ -342,16 +343,22 @@ class LangFuseLogger:
                     "name": trace_name,
                     "session_id": session_id,
                     "input": input,
-                    "version": clean_metadata.pop("trace_version", clean_metadata.get("version", None)),  # If provided just version, it will applied to the trace as well, if applied a trace version it will take precedence
+                    "version": clean_metadata.pop(
+                        "trace_version", clean_metadata.get("version", None)
+                    ),  # If provided just version, it will applied to the trace as well, if applied a trace version it will take precedence
                 }
-                for key in list(filter(lambda key: key.startswith("trace_"), clean_metadata.keys())):
-                    trace_params[key.replace("trace_", "")] = clean_metadata.pop(key, None)
-
+                for key in list(
+                    filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+                ):
+                    trace_params[key.replace("trace_", "")] = clean_metadata.pop(
+                        key, None
+                    )
+
             if level == "ERROR":
                 trace_params["status_message"] = output
             else:
                 trace_params["output"] = output
-
+
             cost = kwargs.get("response_cost", None)
             print_verbose(f"trace: {cost}")
@@ -454,7 +461,7 @@ class LangFuseLogger:
             )

             generation_client = trace.generation(**generation_params)
-
+
             return generation_client.trace_id, generation_id
         except Exception as e:
             verbose_logger.debug(f"Langfuse Layer Error - {traceback.format_exc()}")
diff --git a/litellm/integrations/slack_alerting.py b/litellm/integrations/slack_alerting.py
index 5546f7c337..f033d99cdc 100644
--- a/litellm/integrations/slack_alerting.py
+++ b/litellm/integrations/slack_alerting.py
@@ -149,16 +149,21 @@ class SlackAlerting(CustomLogger):
     def _add_langfuse_trace_id_to_alert(
         self,
-        request_info: str,
         request_data: Optional[dict] = None,
-        kwargs: Optional[dict] = None,
-        type: Literal["hanging_request", "slow_response"] = "hanging_request",
-        start_time: Optional[datetime.datetime] = None,
-        end_time: Optional[datetime.datetime] = None,
-    ):
+    ) -> Optional[str]:
+        """
+        Returns langfuse trace url
+        """
         # do nothing for now
-        pass
-        return request_info
+        if (
+            request_data is not None
+            and request_data.get("metadata", {}).get("trace_id", None) is not None
+        ):
+            trace_id = request_data["metadata"]["trace_id"]
+            if litellm.utils.langFuseLogger is not None:
+                base_url = litellm.utils.langFuseLogger.Langfuse.base_url
+                return f"{base_url}/trace/{trace_id}"
+        return None

     def _response_taking_too_long_callback_helper(
         self,
@@ -501,14 +506,13 @@ class SlackAlerting(CustomLogger):
                 )
                 if "langfuse" in litellm.success_callback:
-                    request_info = self._add_langfuse_trace_id_to_alert(
-                        request_info=request_info,
+                    langfuse_url = self._add_langfuse_trace_id_to_alert(
                         request_data=request_data,
-                        type="hanging_request",
-                        start_time=start_time,
-                        end_time=end_time,
                     )

+                    if langfuse_url is not None:
+                        request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)
+
                 # add deployment latencies to alert
                 _deployment_latency_map = self._get_deployment_latencies_to_alert(
                     metadata=request_data.get("metadata", {})
                 )
@@ -701,6 +705,7 @@ Model Info:
             "daily_reports",
             "new_model_added",
         ],
+        **kwargs,
     ):
         """
         Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
@@ -731,6 +736,10 @@ Model Info:
         formatted_message = (
             f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
         )
+
+        if kwargs:
+            for key, value in kwargs.items():
+                formatted_message += f"\n\n{key}: `{value}`\n\n"

         if _proxy_base_url is not None:
             formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"
diff --git a/litellm/proxy/_super_secret_config.yaml b/litellm/proxy/_super_secret_config.yaml
index 0475508e31..b7293a17f4 100644
--- a/litellm/proxy/_super_secret_config.yaml
+++ b/litellm/proxy/_super_secret_config.yaml
@@ -14,6 +14,9 @@ model_list:
     api_key: my-fake-key-3
     model: openai/my-fake-model-3
   model_name: fake-openai-endpoint
+- model_name: gpt-4
+  litellm_params:
+    model: gpt-3.5-turbo
 router_settings:
   num_retries: 0
   enable_pre_call_checks: true
@@ -25,7 +28,7 @@ router_settings:
   routing_strategy: "latency-based-routing"

 litellm_settings:
-  success_callback: ["openmeter"]
+  success_callback: ["langfuse"]

 general_settings:
   alerting: ["slack"]
diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py
index c22b381e2c..d6d27fc4c7 100644
--- a/litellm/proxy/proxy_server.py
+++ b/litellm/proxy/proxy_server.py
@@ -3160,7 +3160,9 @@ def data_generator(response):
             yield f"data: {json.dumps(chunk)}\n\n"


-async def async_data_generator(response, user_api_key_dict):
+async def async_data_generator(
+    response, user_api_key_dict: UserAPIKeyAuth, request_data: dict
+):
     verbose_proxy_logger.debug("inside generator")
     try:
         start_time = time.time()
@@ -3177,7 +3179,9 @@ async def async_data_generator(response, user_api_key_dict):
     except Exception as e:
         traceback.print_exc()
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict,
+            original_exception=e,
+            request_data=request_data,
         )
         verbose_proxy_logger.debug(
             f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
@@ -3202,8 +3206,14 @@ async def async_data_generator(response, user_api_key_dict):
         yield f"data: {error_returned}\n\n"


-def select_data_generator(response, user_api_key_dict):
-    return async_data_generator(response=response, user_api_key_dict=user_api_key_dict)
+def select_data_generator(
+    response, user_api_key_dict: UserAPIKeyAuth, request_data: dict
+):
+    return async_data_generator(
+        response=response,
+        user_api_key_dict=user_api_key_dict,
+        request_data=request_data,
+    )


 def get_litellm_model_info(model: dict = {}):
@@ -3496,9 +3506,8 @@ async def chat_completion(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global general_settings, user_debug, proxy_logging_obj, llm_model_list
+    data = {}
     try:
-        # async with llm_router.sem
-        data = {}
         body = await request.body()
         body_str = body.decode()
         try:
@@ -3689,7 +3698,9 @@ async def chat_completion(
                 "x-litellm-model-api-base": api_base,
             }
             selected_data_generator = select_data_generator(
-                response=response, user_api_key_dict=user_api_key_dict
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                request_data=data,
             )
             return StreamingResponse(
                 selected_data_generator,
@@ -3711,7 +3722,7 @@ async def chat_completion(
         data["litellm_status"] = "fail"  # used for alerting
         traceback.print_exc()
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         verbose_proxy_logger.debug(
             f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
@@ -3873,7 +3884,9 @@ async def completion(
                 "x-litellm-model-id": model_id,
             }
             selected_data_generator = select_data_generator(
-                response=response, user_api_key_dict=user_api_key_dict
+                response=response,
+                user_api_key_dict=user_api_key_dict,
+                request_data=data,
             )

             return StreamingResponse(
@@ -3926,6 +3939,7 @@ async def embeddings(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global proxy_logging_obj
+    data: Any = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4071,7 +4085,7 @@ async def embeddings(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4108,6 +4122,7 @@ async def image_generation(
     user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
 ):
     global proxy_logging_obj
+    data = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4227,7 +4242,7 @@ async def image_generation(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4268,10 +4283,11 @@ async def audio_transcriptions(
     https://platform.openai.com/docs/api-reference/audio/createTranscription?lang=curl
     """
     global proxy_logging_obj
+    data: Dict = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         form_data = await request.form()
-        data: Dict = {key: value for key, value in form_data.items() if key != "file"}
+        data = {key: value for key, value in form_data.items() if key != "file"}

         # Include original request and headers in the data
         data["proxy_server_request"] = {  # type: ignore
@@ -4406,7 +4422,7 @@ async def audio_transcriptions(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -4455,6 +4471,7 @@ async def moderations(
     ```
     """
     global proxy_logging_obj
+    data: Dict = {}
     try:
         # Use orjson to parse JSON data, orjson speeds up requests significantly
         body = await request.body()
@@ -4568,7 +4585,7 @@ async def moderations(
     except Exception as e:
         data["litellm_status"] = "fail"  # used for alerting
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
        )
         traceback.print_exc()
         if isinstance(e, HTTPException):
@@ -7999,8 +8016,8 @@ async def async_queue_request(

     Now using a FastAPI background task + /chat/completions compatible endpoint
     """
+    data = {}
     try:
-        data = {}
         data = await request.json()  # type: ignore

         # Include original request and headers in the data
@@ -8065,7 +8082,9 @@ async def async_queue_request(
         ):  # use generate_responses to stream responses
             return StreamingResponse(
                 async_data_generator(
-                    user_api_key_dict=user_api_key_dict, response=response
+                    user_api_key_dict=user_api_key_dict,
+                    response=response,
+                    request_data=data,
                 ),
                 media_type="text/event-stream",
             )
@@ -8073,7 +8092,7 @@ async def async_queue_request(
         return response
     except Exception as e:
         await proxy_logging_obj.post_call_failure_hook(
-            user_api_key_dict=user_api_key_dict, original_exception=e
+            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
         )
         if isinstance(e, HTTPException):
             raise ProxyException(
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index 9734806dfb..0379d51528 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -302,6 +302,7 @@ class ProxyLogging:
             "budget_alerts",
             "db_exceptions",
         ],
+        request_data: Optional[dict] = None,
     ):
         """
         Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
@@ -331,10 +332,19 @@ class ProxyLogging:
         if _proxy_base_url is not None:
             formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"

+        extra_kwargs = {}
+        if request_data is not None:
+            _url = self.slack_alerting_instance._add_langfuse_trace_id_to_alert(
+                request_data=request_data
+            )
+            if _url is not None:
+                extra_kwargs["🪢 Langfuse Trace"] = _url
+                formatted_message += "\n\n🪢 Langfuse Trace: {}".format(_url)
+
         for client in self.alerting:
             if client == "slack":
                 await self.slack_alerting_instance.send_alert(
-                    message=message, level=level, alert_type=alert_type
+                    message=message, level=level, alert_type=alert_type, **extra_kwargs
                 )
             elif client == "sentry":
                 if litellm.utils.sentry_sdk_instance is not None:
@@ -369,6 +379,7 @@ class ProxyLogging:
                     message=f"DB read/write call failed: {error_message}",
                     level="High",
                     alert_type="db_exceptions",
+                    request_data={},
                 )
             )

@@ -384,7 +395,10 @@ class ProxyLogging:
                 litellm.utils.capture_exception(error=original_exception)

     async def post_call_failure_hook(
-        self, original_exception: Exception, user_api_key_dict: UserAPIKeyAuth
+        self,
+        original_exception: Exception,
+        user_api_key_dict: UserAPIKeyAuth,
+        request_data: dict,
     ):
         """
         Allows users to raise custom exceptions/log when a call fails, without having to deal with parsing Request body.
@@ -409,6 +423,7 @@ class ProxyLogging:
                     message=f"LLM API call failed: {str(original_exception)}",
                     level="High",
                     alert_type="llm_exceptions",
+                    request_data=request_data,
                 )
             )