Merge pull request #4138 from BerriAI/litellm_log_management_endpoint_logs_otel

[FEAT]  log management endpoint logs to otel
This commit is contained in:
Ishaan Jaff 2024-06-11 19:00:13 -07:00 committed by GitHub
commit 206791f166
7 changed files with 265 additions and 35 deletions

View file

@ -6,17 +6,23 @@ import litellm
from litellm.integrations.custom_logger import CustomLogger from litellm.integrations.custom_logger import CustomLogger
from litellm._logging import verbose_logger from litellm._logging import verbose_logger
from litellm.types.services import ServiceLoggerPayload from litellm.types.services import ServiceLoggerPayload
from functools import wraps
from typing import Union, Optional, TYPE_CHECKING, Any from typing import Union, Optional, TYPE_CHECKING, Any
if TYPE_CHECKING: if TYPE_CHECKING:
from opentelemetry.trace import Span as _Span from opentelemetry.trace import Span as _Span
from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth
from litellm.proxy._types import (
ManagementEndpointLoggingPayload as _ManagementEndpointLoggingPayload,
)
Span = _Span Span = _Span
UserAPIKeyAuth = _UserAPIKeyAuth UserAPIKeyAuth = _UserAPIKeyAuth
ManagementEndpointLoggingPayload = _ManagementEndpointLoggingPayload
else: else:
Span = Any Span = Any
UserAPIKeyAuth = Any UserAPIKeyAuth = Any
ManagementEndpointLoggingPayload = Any
LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm") LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm")
@ -562,3 +568,91 @@ class OpenTelemetry(CustomLogger):
self.OTEL_EXPORTER, self.OTEL_EXPORTER,
) )
return BatchSpanProcessor(ConsoleSpanExporter()) return BatchSpanProcessor(ConsoleSpanExporter())
async def async_management_endpoint_success_hook(
self,
logging_payload: ManagementEndpointLoggingPayload,
parent_otel_span: Optional[Span] = None,
):
from opentelemetry import trace
from datetime import datetime
from opentelemetry.trace import Status, StatusCode
_start_time_ns = logging_payload.start_time
_end_time_ns = logging_payload.end_time
start_time = logging_payload.start_time
end_time = logging_payload.end_time
if isinstance(start_time, float):
_start_time_ns = int(int(start_time) * 1e9)
else:
_start_time_ns = self._to_ns(start_time)
if isinstance(end_time, float):
_end_time_ns = int(int(end_time) * 1e9)
else:
_end_time_ns = self._to_ns(end_time)
if parent_otel_span is not None:
_span_name = logging_payload.route
management_endpoint_span = self.tracer.start_span(
name=_span_name,
context=trace.set_span_in_context(parent_otel_span),
start_time=_start_time_ns,
)
_request_data = logging_payload.request_data
if _request_data is not None:
for key, value in _request_data.items():
management_endpoint_span.set_attribute(f"request.{key}", value)
_response = logging_payload.response
if _response is not None:
for key, value in _response.items():
management_endpoint_span.set_attribute(f"response.{key}", value)
management_endpoint_span.set_status(Status(StatusCode.OK))
management_endpoint_span.end(end_time=_end_time_ns)
async def async_management_endpoint_failure_hook(
self,
logging_payload: ManagementEndpointLoggingPayload,
parent_otel_span: Optional[Span] = None,
):
from opentelemetry import trace
from datetime import datetime
from opentelemetry.trace import Status, StatusCode
_start_time_ns = logging_payload.start_time
_end_time_ns = logging_payload.end_time
start_time = logging_payload.start_time
end_time = logging_payload.end_time
if isinstance(start_time, float):
_start_time_ns = int(int(start_time) * 1e9)
else:
_start_time_ns = self._to_ns(start_time)
if isinstance(end_time, float):
_end_time_ns = int(int(end_time) * 1e9)
else:
_end_time_ns = self._to_ns(end_time)
if parent_otel_span is not None:
_span_name = logging_payload.route
management_endpoint_span = self.tracer.start_span(
name=_span_name,
context=trace.set_span_in_context(parent_otel_span),
start_time=_start_time_ns,
)
_request_data = logging_payload.request_data
if _request_data is not None:
for key, value in _request_data.items():
management_endpoint_span.set_attribute(f"request.{key}", value)
_exception = logging_payload.exception
management_endpoint_span.set_attribute(f"exception", str(_exception))
management_endpoint_span.set_status(Status(StatusCode.ERROR))
management_endpoint_span.end(end_time=_end_time_ns)

View file

@ -1561,3 +1561,12 @@ class SpanAttributes(str, enum.Enum):
LLM_OPENAI_API_BASE = "gen_ai.openai.api_base" LLM_OPENAI_API_BASE = "gen_ai.openai.api_base"
LLM_OPENAI_API_VERSION = "gen_ai.openai.api_version" LLM_OPENAI_API_VERSION = "gen_ai.openai.api_version"
LLM_OPENAI_API_TYPE = "gen_ai.openai.api_type" LLM_OPENAI_API_TYPE = "gen_ai.openai.api_type"
class ManagementEndpointLoggingPayload(LiteLLMBase):
route: str
request_data: dict
response: Optional[dict] = None
exception: Optional[Any] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

View file

@ -0,0 +1,31 @@
from typing import Optional
from fastapi import Request
import ast, json
async def _read_request_body(request: Optional[Request]) -> dict:
"""
Asynchronous function to read the request body and parse it as JSON or literal data.
Parameters:
- request: The request object to read the body from
Returns:
- dict: Parsed request data as a dictionary
"""
try:
request_data: dict = {}
if request is None:
return request_data
body = await request.body()
if body == b"" or body is None:
return request_data
body_str = body.decode()
try:
request_data = ast.literal_eval(body_str)
except:
request_data = json.loads(body_str)
return request_data
except:
return {}

View file

@ -0,0 +1,90 @@
from datetime import datetime
from functools import wraps
from litellm.proxy._types import UserAPIKeyAuth, ManagementEndpointLoggingPayload
from litellm.proxy.common_utils.http_parsing_utils import _read_request_body
from fastapi import Request
def management_endpoint_wrapper(func):
"""
This wrapper does the following:
1. Log I/O, Exceptions to OTEL
2. Create an Audit log for success calls
"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = datetime.now()
try:
result = await func(*args, **kwargs)
end_time = datetime.now()
if kwargs is None:
kwargs = {}
user_api_key_dict: UserAPIKeyAuth = (
kwargs.get("user_api_key_dict") or UserAPIKeyAuth()
)
parent_otel_span = user_api_key_dict.parent_otel_span
if parent_otel_span is not None:
from litellm.proxy.proxy_server import open_telemetry_logger
if open_telemetry_logger is not None:
_http_request: Request = kwargs.get("http_request")
_route = _http_request.url.path
_request_body: dict = await _read_request_body(
request=_http_request
)
_response = dict(result) if result is not None else None
logging_payload = ManagementEndpointLoggingPayload(
route=_route,
request_data=_request_body,
response=_response,
start_time=start_time,
end_time=end_time,
)
await open_telemetry_logger.async_management_endpoint_success_hook(
logging_payload=logging_payload,
parent_otel_span=parent_otel_span,
)
return result
except Exception as e:
end_time = datetime.now()
if kwargs is None:
kwargs = {}
user_api_key_dict: UserAPIKeyAuth = (
kwargs.get("user_api_key_dict") or UserAPIKeyAuth()
)
parent_otel_span = user_api_key_dict.parent_otel_span
if parent_otel_span is not None:
from litellm.proxy.proxy_server import open_telemetry_logger
if open_telemetry_logger is not None:
_http_request: Request = kwargs.get("http_request")
_route = _http_request.url.path
_request_body: dict = await _read_request_body(
request=_http_request
)
logging_payload = ManagementEndpointLoggingPayload(
route=_route,
request_data=_request_body,
response=None,
start_time=start_time,
end_time=end_time,
exception=e,
)
await open_telemetry_logger.async_management_endpoint_failure_hook(
logging_payload=logging_payload,
parent_otel_span=parent_otel_span,
)
raise e
return wrapper

View file

@ -103,7 +103,6 @@ from litellm.proxy.utils import (
hash_token, hash_token,
html_form, html_form,
missing_keys_html_form, missing_keys_html_form,
_read_request_body,
_is_valid_team_configs, _is_valid_team_configs,
_is_user_proxy_admin, _is_user_proxy_admin,
_get_user_role, _get_user_role,
@ -115,6 +114,8 @@ from litellm.proxy.utils import (
_to_ns, _to_ns,
get_error_message_str, get_error_message_str,
) )
from litellm.proxy.common_utils.http_parsing_utils import _read_request_body
from litellm import ( from litellm import (
CreateBatchRequest, CreateBatchRequest,
RetrieveBatchRequest, RetrieveBatchRequest,
@ -163,6 +164,9 @@ from litellm.proxy.auth.auth_checks import (
get_actual_routes, get_actual_routes,
log_to_opentelemetry, log_to_opentelemetry,
) )
from litellm.proxy.common_utils.management_endpoint_utils import (
management_endpoint_wrapper,
)
from litellm.llms.custom_httpx.httpx_handler import HTTPHandler from litellm.llms.custom_httpx.httpx_handler import HTTPHandler
from litellm.exceptions import RejectedRequestError from litellm.exceptions import RejectedRequestError
from litellm.integrations.slack_alerting import SlackAlertingArgs, SlackAlerting from litellm.integrations.slack_alerting import SlackAlertingArgs, SlackAlerting
@ -8193,7 +8197,9 @@ async def _get_spend_report_for_time_range(
return response, spend_per_tag return response, spend_per_tag
except Exception as e: except Exception as e:
verbose_proxy_logger.error("Exception in _get_daily_spend_reports", e) # noqa verbose_proxy_logger.error(
"Exception in _get_daily_spend_reports {}".format(str(e))
) # noqa
@router.post( @router.post(
@ -8886,7 +8892,10 @@ async def new_user(data: NewUserRequest):
role="user", role="user",
user_email=data_json.get("user_email", None), user_email=data_json.get("user_email", None),
), ),
) ),
http_request=Request(
scope={"type": "http"},
),
) )
if data.send_invite_email is True: if data.send_invite_email is True:
@ -9919,8 +9928,10 @@ async def delete_end_user(
dependencies=[Depends(user_api_key_auth)], dependencies=[Depends(user_api_key_auth)],
response_model=LiteLLM_TeamTable, response_model=LiteLLM_TeamTable,
) )
@management_endpoint_wrapper
async def new_team( async def new_team(
data: NewTeamRequest, data: NewTeamRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
litellm_changed_by: Optional[str] = Header( litellm_changed_by: Optional[str] = Header(
None, None,
@ -10154,6 +10165,7 @@ async def create_audit_log_for_update(request_data: LiteLLM_AuditLogs):
@router.post( @router.post(
"/team/update", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/update", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def update_team( async def update_team(
data: UpdateTeamRequest, data: UpdateTeamRequest,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
@ -10259,8 +10271,10 @@ async def update_team(
tags=["team management"], tags=["team management"],
dependencies=[Depends(user_api_key_auth)], dependencies=[Depends(user_api_key_auth)],
) )
@management_endpoint_wrapper
async def team_member_add( async def team_member_add(
data: TeamMemberAddRequest, data: TeamMemberAddRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
): ):
""" """
@ -10352,8 +10366,10 @@ async def team_member_add(
tags=["team management"], tags=["team management"],
dependencies=[Depends(user_api_key_auth)], dependencies=[Depends(user_api_key_auth)],
) )
@management_endpoint_wrapper
async def team_member_delete( async def team_member_delete(
data: TeamMemberDeleteRequest, data: TeamMemberDeleteRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
): ):
""" """
@ -10457,8 +10473,10 @@ async def team_member_delete(
@router.post( @router.post(
"/team/delete", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/delete", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def delete_team( async def delete_team(
data: DeleteTeamRequest, data: DeleteTeamRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
litellm_changed_by: Optional[str] = Header( litellm_changed_by: Optional[str] = Header(
None, None,
@ -10542,10 +10560,12 @@ async def delete_team(
@router.get( @router.get(
"/team/info", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/info", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def team_info( async def team_info(
http_request: Request,
team_id: str = fastapi.Query( team_id: str = fastapi.Query(
default=None, description="Team ID in the request parameters" default=None, description="Team ID in the request parameters"
) ),
): ):
""" """
get info on team + related keys get info on team + related keys
@ -10629,8 +10649,10 @@ async def team_info(
@router.post( @router.post(
"/team/block", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/block", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def block_team( async def block_team(
data: BlockTeamRequest, data: BlockTeamRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
): ):
""" """
@ -10651,8 +10673,10 @@ async def block_team(
@router.post( @router.post(
"/team/unblock", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/unblock", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def unblock_team( async def unblock_team(
data: BlockTeamRequest, data: BlockTeamRequest,
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
): ):
""" """
@ -10673,7 +10697,9 @@ async def unblock_team(
@router.get( @router.get(
"/team/list", tags=["team management"], dependencies=[Depends(user_api_key_auth)] "/team/list", tags=["team management"], dependencies=[Depends(user_api_key_auth)]
) )
@management_endpoint_wrapper
async def list_team( async def list_team(
http_request: Request,
user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
): ):
""" """

View file

@ -48,6 +48,7 @@ from datetime import datetime, timedelta
from litellm.integrations.slack_alerting import SlackAlerting from litellm.integrations.slack_alerting import SlackAlerting
from typing_extensions import overload from typing_extensions import overload
from functools import wraps from functools import wraps
from fastapi import Request
if TYPE_CHECKING: if TYPE_CHECKING:
from opentelemetry.trace import Span as _Span from opentelemetry.trace import Span as _Span
@ -2595,36 +2596,6 @@ async def update_spend(
raise e raise e
async def _read_request_body(request):
"""
Asynchronous function to read the request body and parse it as JSON or literal data.
Parameters:
- request: The request object to read the body from
Returns:
- dict: Parsed request data as a dictionary
"""
import ast, json
try:
request_data = {}
if request is None:
return request_data
body = await request.body()
if body == b"" or body is None:
return request_data
body_str = body.decode()
try:
request_data = ast.literal_eval(body_str)
except:
request_data = json.loads(body_str)
return request_data
except:
return {}
def _is_projected_spend_over_limit( def _is_projected_spend_over_limit(
current_spend: float, soft_budget_limit: Optional[float] current_spend: float, soft_budget_limit: Optional[float]
): ):

View file

@ -137,6 +137,7 @@ async def test_new_user_response(prisma_client):
NewTeamRequest( NewTeamRequest(
team_id=_team_id, team_id=_team_id,
), ),
http_request=Request(scope={"type": "http"}),
user_api_key_dict=UserAPIKeyAuth( user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN, user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234", api_key="sk-1234",
@ -368,6 +369,7 @@ async def test_call_with_valid_model_using_all_models(prisma_client):
new_team_response = await new_team( new_team_response = await new_team(
data=team_request, data=team_request,
user_api_key_dict=UserAPIKeyAuth(user_role=LitellmUserRoles.PROXY_ADMIN), user_api_key_dict=UserAPIKeyAuth(user_role=LitellmUserRoles.PROXY_ADMIN),
http_request=Request(scope={"type": "http"}),
) )
print("new_team_response", new_team_response) print("new_team_response", new_team_response)
created_team_id = new_team_response["team_id"] created_team_id = new_team_response["team_id"]
@ -1086,6 +1088,7 @@ def test_generate_and_update_key(prisma_client):
api_key="sk-1234", api_key="sk-1234",
user_id="1234", user_id="1234",
), ),
http_request=Request(scope={"type": "http"}),
) )
_team_2 = "ishaan-special-team_{}".format(uuid.uuid4()) _team_2 = "ishaan-special-team_{}".format(uuid.uuid4())
@ -1098,6 +1101,7 @@ def test_generate_and_update_key(prisma_client):
api_key="sk-1234", api_key="sk-1234",
user_id="1234", user_id="1234",
), ),
http_request=Request(scope={"type": "http"}),
) )
request = NewUserRequest( request = NewUserRequest(
@ -2050,6 +2054,7 @@ async def test_master_key_hashing(prisma_client):
api_key="sk-1234", api_key="sk-1234",
user_id="1234", user_id="1234",
), ),
http_request=Request(scope={"type": "http"}),
) )
_response = await new_user( _response = await new_user(
@ -2183,6 +2188,7 @@ async def test_create_update_team(prisma_client):
tpm_limit=20, tpm_limit=20,
rpm_limit=20, rpm_limit=20,
), ),
http_request=Request(scope={"type": "http"}),
user_api_key_dict=UserAPIKeyAuth( user_api_key_dict=UserAPIKeyAuth(
user_role=LitellmUserRoles.PROXY_ADMIN, user_role=LitellmUserRoles.PROXY_ADMIN,
api_key="sk-1234", api_key="sk-1234",
@ -2232,7 +2238,10 @@ async def test_create_update_team(prisma_client):
) )
# now hit team_info # now hit team_info
response = await team_info(team_id=_team_id) response = await team_info(
team_id=_team_id,
http_request=Request(scope={"type": "http"}),
)
print("RESPONSE from team_info", response) print("RESPONSE from team_info", response)