diff --git a/litellm/integrations/opentelemetry.py b/litellm/integrations/opentelemetry.py index e18cadce23..2226397580 100644 --- a/litellm/integrations/opentelemetry.py +++ b/litellm/integrations/opentelemetry.py @@ -6,17 +6,23 @@ import litellm from litellm.integrations.custom_logger import CustomLogger from litellm._logging import verbose_logger from litellm.types.services import ServiceLoggerPayload +from functools import wraps from typing import Union, Optional, TYPE_CHECKING, Any if TYPE_CHECKING: from opentelemetry.trace import Span as _Span from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth + from litellm.proxy._types import ( + ManagementEndpointLoggingPayload as _ManagementEndpointLoggingPayload, + ) Span = _Span UserAPIKeyAuth = _UserAPIKeyAuth + ManagementEndpointLoggingPayload = _ManagementEndpointLoggingPayload else: Span = Any UserAPIKeyAuth = Any + ManagementEndpointLoggingPayload = Any LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm") @@ -562,3 +568,48 @@ class OpenTelemetry(CustomLogger): self.OTEL_EXPORTER, ) return BatchSpanProcessor(ConsoleSpanExporter()) + + async def async_management_endpoint_success_hook( + self, + logging_payload: ManagementEndpointLoggingPayload, + parent_otel_span: Optional[Span] = None, + ): + from opentelemetry import trace + from datetime import datetime + from opentelemetry.trace import Status, StatusCode + + _start_time_ns = logging_payload.start_time + _end_time_ns = logging_payload.end_time + + start_time = logging_payload.start_time + end_time = logging_payload.end_time + + if isinstance(start_time, float): + _start_time_ns = int(int(start_time) * 1e9) + else: + _start_time_ns = self._to_ns(start_time) + + if isinstance(end_time, float): + _end_time_ns = int(int(end_time) * 1e9) + else: + _end_time_ns = self._to_ns(end_time) + + if parent_otel_span is not None: + _span_name = logging_payload.route + management_endpoint_span = self.tracer.start_span( + name=_span_name, + context=trace.set_span_in_context(parent_otel_span), + start_time=_start_time_ns, + ) + + _request_data = logging_payload.request_data + if _request_data is not None: + for key, value in _request_data.items(): + management_endpoint_span.set_attribute(f"request.{key}", value) + + _response = logging_payload.response + if _response is not None: + for key, value in _response.items(): + management_endpoint_span.set_attribute(f"response.{key}", value) + management_endpoint_span.set_status(Status(StatusCode.OK)) + management_endpoint_span.end(end_time=_end_time_ns) diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index 0d29f794f6..72065e5b97 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -1561,3 +1561,12 @@ class SpanAttributes(str, enum.Enum): LLM_OPENAI_API_BASE = "gen_ai.openai.api_base" LLM_OPENAI_API_VERSION = "gen_ai.openai.api_version" LLM_OPENAI_API_TYPE = "gen_ai.openai.api_type" + + +class ManagementEndpointLoggingPayload(LiteLLMBase): + route: str + request_data: dict + response: Optional[dict] = None + exception: Optional[Any] = None + start_time: Optional[datetime] = None + end_time: Optional[datetime] = None diff --git a/litellm/proxy/management_endpoint_utils.py b/litellm/proxy/management_endpoint_utils.py new file mode 100644 index 0000000000..aabf6e5d3b --- /dev/null +++ b/litellm/proxy/management_endpoint_utils.py @@ -0,0 +1,56 @@ +from datetime import datetime +from functools import wraps +from litellm.proxy._types import UserAPIKeyAuth, ManagementEndpointLoggingPayload +from litellm.proxy.utils import _read_request_body +from fastapi import Request + + +def management_endpoint_wrapper(func): + """ + This wrapper does the following: + + 1. Log I/O, Exceptions to OTEL + 2. Create an Audit log for success calls + """ + + @wraps(func) + async def wrapper(*args, **kwargs): + start_time = datetime.now() + + try: + result = await func(*args, **kwargs) + end_time = datetime.now() + user_api_key_dict: UserAPIKeyAuth = kwargs["user_api_key_dict"] + + parent_otel_span = user_api_key_dict.parent_otel_span + if parent_otel_span is not None: + from litellm.proxy.proxy_server import open_telemetry_logger + + if open_telemetry_logger is not None: + _http_request: Request = kwargs.get("http_request") + + _route = _http_request.url.path + _request_body: dict = await _read_request_body( + request=_http_request + ) + _response = dict(result) if result is not None else None + + logging_payload = ManagementEndpointLoggingPayload( + route=_route, + request_data=_request_body, + response=_response, + start_time=start_time, + end_time=end_time, + ) + + await open_telemetry_logger.async_management_endpoint_success_hook( + logging_payload=logging_payload, + parent_otel_span=parent_otel_span, + ) + + return result + except Exception as e: + end_time = datetime.now() + raise e + + return wrapper diff --git a/litellm/proxy/proxy_server.py b/litellm/proxy/proxy_server.py index f76bf22579..247751f0f5 100644 --- a/litellm/proxy/proxy_server.py +++ b/litellm/proxy/proxy_server.py @@ -163,6 +163,7 @@ from litellm.proxy.auth.auth_checks import ( get_actual_routes, log_to_opentelemetry, ) +from litellm.proxy.management_endpoint_utils import management_endpoint_wrapper from litellm.llms.custom_httpx.httpx_handler import HTTPHandler from litellm.exceptions import RejectedRequestError from litellm.integrations.slack_alerting import SlackAlertingArgs, SlackAlerting @@ -9912,8 +9913,10 @@ async def delete_end_user( dependencies=[Depends(user_api_key_auth)], response_model=LiteLLM_TeamTable, ) +@management_endpoint_wrapper async def new_team( data: NewTeamRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), litellm_changed_by: Optional[str] = Header( None, @@ -10147,6 +10150,7 @@ async def create_audit_log_for_update(request_data: LiteLLM_AuditLogs): @router.post( "/team/update", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def update_team( data: UpdateTeamRequest, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), @@ -10252,8 +10256,10 @@ async def update_team( tags=["team management"], dependencies=[Depends(user_api_key_auth)], ) +@management_endpoint_wrapper async def team_member_add( data: TeamMemberAddRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), ): """ @@ -10345,8 +10351,10 @@ async def team_member_add( tags=["team management"], dependencies=[Depends(user_api_key_auth)], ) +@management_endpoint_wrapper async def team_member_delete( data: TeamMemberDeleteRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), ): """ @@ -10450,8 +10458,10 @@ async def team_member_delete( @router.post( "/team/delete", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def delete_team( data: DeleteTeamRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), litellm_changed_by: Optional[str] = Header( None, @@ -10535,10 +10545,12 @@ async def delete_team( @router.get( "/team/info", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def team_info( + http_request: Request, team_id: str = fastapi.Query( default=None, description="Team ID in the request parameters" - ) + ), ): """ get info on team + related keys @@ -10622,8 +10634,10 @@ async def team_info( @router.post( "/team/block", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def block_team( data: BlockTeamRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), ): """ @@ -10644,8 +10658,10 @@ async def block_team( @router.post( "/team/unblock", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def unblock_team( data: BlockTeamRequest, + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), ): """ @@ -10666,7 +10682,9 @@ async def unblock_team( @router.get( "/team/list", tags=["team management"], dependencies=[Depends(user_api_key_auth)] ) +@management_endpoint_wrapper async def list_team( + http_request: Request, user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), ): """ diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 6647881e29..3530b93347 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -48,6 +48,7 @@ from datetime import datetime, timedelta from litellm.integrations.slack_alerting import SlackAlerting from typing_extensions import overload from functools import wraps +from fastapi import Request if TYPE_CHECKING: from opentelemetry.trace import Span as _Span @@ -2595,7 +2596,7 @@ async def update_spend( raise e -async def _read_request_body(request): +async def _read_request_body(request: Optional[Request]) -> dict: """ Asynchronous function to read the request body and parse it as JSON or literal data. @@ -2608,7 +2609,7 @@ async def _read_request_body(request): import ast, json try: - request_data = {} + request_data: dict = {} if request is None: return request_data body = await request.body()