mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-25 10:44:24 +00:00
* build(pyproject.toml): add new dev dependencies - for type checking * build: reformat files to fit black * ci: reformat to fit black * ci(test-litellm.yml): make tests run clear * build(pyproject.toml): add ruff * fix: fix ruff checks * build(mypy/): fix mypy linting errors * fix(hashicorp_secret_manager.py): fix passing cert for tls auth * build(mypy/): resolve all mypy errors * test: update test * fix: fix black formatting * build(pre-commit-config.yaml): use poetry run black * fix(proxy_server.py): fix linting error * fix: fix ruff safe representation error
142 lines
4.7 KiB
Python
142 lines
4.7 KiB
Python
"""
|
|
Handles logging DB success/failure to ServiceLogger()
|
|
|
|
ServiceLogger() then sends DB logs to Prometheus, OTEL, Datadog etc
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from functools import wraps
|
|
from typing import Callable, Dict, Tuple
|
|
|
|
from litellm._service_logger import ServiceTypes
|
|
from litellm.litellm_core_utils.core_helpers import (
|
|
_get_parent_otel_span_from_kwargs,
|
|
get_litellm_metadata_from_kwargs,
|
|
)
|
|
|
|
|
|
def log_db_metrics(func):
|
|
"""
|
|
Decorator to log the duration of a DB related function to ServiceLogger()
|
|
|
|
Handles logging DB success/failure to ServiceLogger(), which logs to Prometheus, OTEL, Datadog
|
|
|
|
When logging Failure it checks if the Exception is a PrismaError, httpx.ConnectError or httpx.TimeoutException and then logs that as a DB Service Failure
|
|
|
|
Args:
|
|
func: The function to be decorated
|
|
|
|
Returns:
|
|
Result from the decorated function
|
|
|
|
Raises:
|
|
Exception: If the decorated function raises an exception
|
|
"""
|
|
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs):
|
|
start_time: datetime = datetime.now()
|
|
|
|
try:
|
|
result = await func(*args, **kwargs)
|
|
end_time: datetime = datetime.now()
|
|
from litellm.proxy.proxy_server import proxy_logging_obj
|
|
|
|
if "PROXY" not in func.__name__:
|
|
asyncio.create_task(
|
|
proxy_logging_obj.service_logging_obj.async_service_success_hook(
|
|
service=ServiceTypes.DB,
|
|
call_type=func.__name__,
|
|
parent_otel_span=kwargs.get("parent_otel_span", None),
|
|
duration=(end_time - start_time).total_seconds(),
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
event_metadata={
|
|
"function_name": func.__name__,
|
|
"function_kwargs": kwargs,
|
|
"function_args": args,
|
|
},
|
|
)
|
|
)
|
|
elif (
|
|
# in litellm custom callbacks kwargs is passed as arg[0]
|
|
# https://docs.litellm.ai/docs/observability/custom_callback#callback-functions
|
|
args is not None
|
|
and len(args) > 1
|
|
and isinstance(args[1], dict)
|
|
):
|
|
passed_kwargs = args[1]
|
|
parent_otel_span = _get_parent_otel_span_from_kwargs(
|
|
kwargs=passed_kwargs
|
|
)
|
|
if parent_otel_span is not None:
|
|
metadata = get_litellm_metadata_from_kwargs(kwargs=passed_kwargs)
|
|
|
|
asyncio.create_task(
|
|
proxy_logging_obj.service_logging_obj.async_service_success_hook(
|
|
service=ServiceTypes.BATCH_WRITE_TO_DB,
|
|
call_type=func.__name__,
|
|
parent_otel_span=parent_otel_span,
|
|
duration=0.0,
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
event_metadata=metadata,
|
|
)
|
|
)
|
|
# end of logging to otel
|
|
return result
|
|
except Exception as e:
|
|
end_time: datetime = datetime.now()
|
|
await _handle_logging_db_exception(
|
|
e=e,
|
|
func=func,
|
|
kwargs=kwargs,
|
|
args=args,
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
)
|
|
raise e
|
|
|
|
return wrapper
|
|
|
|
|
|
def _is_exception_related_to_db(e: Exception) -> bool:
|
|
"""
|
|
Returns True if the exception is related to the DB
|
|
"""
|
|
|
|
import httpx
|
|
from prisma.errors import PrismaError
|
|
|
|
return isinstance(e, (PrismaError, httpx.ConnectError, httpx.TimeoutException))
|
|
|
|
|
|
async def _handle_logging_db_exception(
|
|
e: Exception,
|
|
func: Callable,
|
|
kwargs: Dict,
|
|
args: Tuple,
|
|
start_time: datetime,
|
|
end_time: datetime,
|
|
) -> None:
|
|
from litellm.proxy.proxy_server import proxy_logging_obj
|
|
|
|
# don't log this as a DB Service Failure, if the DB did not raise an exception
|
|
if _is_exception_related_to_db(e) is not True:
|
|
return
|
|
|
|
await proxy_logging_obj.service_logging_obj.async_service_failure_hook(
|
|
error=e,
|
|
service=ServiceTypes.DB,
|
|
call_type=func.__name__,
|
|
parent_otel_span=kwargs.get("parent_otel_span"),
|
|
duration=(end_time - start_time).total_seconds(),
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
event_metadata={
|
|
"function_name": func.__name__,
|
|
"function_kwargs": kwargs,
|
|
"function_args": args,
|
|
},
|
|
)
|