Mirror of https://github.com/BerriAI/litellm.git (synced 2025-04-26 11:14:04 +00:00)
[Feat] Improve OTEL Tracking - Require all Redis Cache reads to be logged on OTEL (#5881)
* fix use previous internal usage caching logic
* fix test_dual_cache_uses_redis
* redis track event_metadata in service logging
* show otel error on _get_parent_otel_span_from_kwargs
* track parent otel span on internal usage cache
* update_request_status
* fix internal usage cache
* fix linting
* fix test internal usage cache
* fix linting error
* show event metadata in redis set
* fix test_get_team_redis
* fix test_get_team_redis
* test_proxy_logging_setup
This commit is contained in:
parent 2f67026f35
commit 4d253e473a

9 changed files with 243 additions and 79 deletions
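For context, the pattern this commit applies is to thread the request's parent OTEL span into every internal usage cache read and write, so Redis lookups made by the parallel request limiter show up as children of the request's trace. The snippet below is a minimal, self-contained sketch of that idea, not code from this commit; the Span, DummyDualCache, and InternalUsageCache classes here are hypothetical stand-ins for litellm's internals.

import asyncio
from typing import Any, Dict, Optional


class Span:
    """Hypothetical stand-in for opentelemetry.trace.Span."""

    def __init__(self, name: str) -> None:
        self.name = name


class DummyDualCache:
    """Hypothetical stand-in for a local + Redis dual cache."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}

    async def async_get_cache(self, key: str, **kwargs: Any) -> Any:
        return self._store.get(key)

    async def async_set_cache(self, key: str, value: Any, **kwargs: Any) -> None:
        self._store[key] = value


class InternalUsageCache:
    """Sketch of a wrapper that makes callers pass the request's parent OTEL
    span on every read/write, so each cache access can be logged under the
    request's trace."""

    def __init__(self, dual_cache: DummyDualCache) -> None:
        self.dual_cache = dual_cache

    async def async_get_cache(
        self, key: str, litellm_parent_otel_span: Optional[Span], **kwargs: Any
    ) -> Any:
        # In the real proxy this is where the cache read would be emitted to
        # service logging / OTEL as a child of the parent span.
        if litellm_parent_otel_span is not None:
            print(f"GET {key} traced under {litellm_parent_otel_span.name}")
        return await self.dual_cache.async_get_cache(key=key, **kwargs)

    async def async_set_cache(
        self, key: str, value: Any, litellm_parent_otel_span: Optional[Span], **kwargs: Any
    ) -> None:
        if litellm_parent_otel_span is not None:
            print(f"SET {key} traced under {litellm_parent_otel_span.name}")
        await self.dual_cache.async_set_cache(key=key, value=value, **kwargs)


async def main() -> None:
    cache = InternalUsageCache(DummyDualCache())
    span = Span("proxy-request")
    await cache.async_set_cache(
        "api-key::request_count", {"current_rpm": 1}, litellm_parent_otel_span=span
    )
    print(await cache.async_get_cache("api-key::request_count", litellm_parent_otel_span=span))


asyncio.run(main())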
@@ -1,7 +1,7 @@
 import sys
 import traceback
 from datetime import datetime, timedelta
-from typing import Literal, Optional
+from typing import TYPE_CHECKING, Any, Literal, Optional, Union
 
 from fastapi import HTTPException
 
@@ -10,17 +10,28 @@ from litellm import ModelResponse
 from litellm._logging import verbose_proxy_logger
 from litellm.caching import DualCache
 from litellm.integrations.custom_logger import CustomLogger
+from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
 from litellm.proxy._types import UserAPIKeyAuth
 from litellm.proxy.auth.auth_utils import (
     get_key_model_rpm_limit,
     get_key_model_tpm_limit,
 )
 
+if TYPE_CHECKING:
+    from opentelemetry.trace import Span as _Span
+
+    from litellm.proxy.utils import InternalUsageCache as _InternalUsageCache
+
+    Span = _Span
+    InternalUsageCache = _InternalUsageCache
+else:
+    Span = Any
+    InternalUsageCache = Any
 
 
 class _PROXY_MaxParallelRequestsHandler(CustomLogger):
 
     # Class variables or attributes
-    def __init__(self, internal_usage_cache: DualCache):
+    def __init__(self, internal_usage_cache: InternalUsageCache):
         self.internal_usage_cache = internal_usage_cache
 
     def print_verbose(self, print_statement):
@@ -44,7 +55,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
         rate_limit_type: Literal["user", "customer", "team"],
     ):
         current = await self.internal_usage_cache.async_get_cache(
-            key=request_count_api_key
+            key=request_count_api_key,
+            litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
         )  # {"current_requests": 1, "current_tpm": 1, "current_rpm": 10}
         if current is None:
             if max_parallel_requests == 0 or tpm_limit == 0 or rpm_limit == 0:
@@ -58,7 +70,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 "current_rpm": 0,
             }
             await self.internal_usage_cache.async_set_cache(
-                request_count_api_key, new_val
+                key=request_count_api_key,
+                value=new_val,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )
         elif (
             int(current["current_requests"]) < max_parallel_requests
@@ -72,7 +86,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 "current_rpm": current["current_rpm"],
             }
             await self.internal_usage_cache.async_set_cache(
-                request_count_api_key, new_val
+                key=request_count_api_key,
+                value=new_val,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )
         else:
             raise HTTPException(
@@ -135,12 +151,14 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
         # ------------
         # Setup values
         # ------------
 
         new_val: Optional[dict] = None
         if global_max_parallel_requests is not None:
             # get value from cache
             _key = "global_max_parallel_requests"
             current_global_requests = await self.internal_usage_cache.async_get_cache(
-                key=_key, local_only=True
+                key=_key,
+                local_only=True,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )
             # check if below limit
             if current_global_requests is None:
@@ -153,7 +171,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             # if below -> increment
             else:
                 await self.internal_usage_cache.async_increment_cache(
-                    key=_key, value=1, local_only=True
+                    key=_key,
+                    value=1,
+                    local_only=True,
+                    litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
                 )
 
         current_date = datetime.now().strftime("%Y-%m-%d")
@@ -167,7 +188,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             # CHECK IF REQUEST ALLOWED for key
 
             current = await self.internal_usage_cache.async_get_cache(
-                key=request_count_api_key
+                key=request_count_api_key,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )  # {"current_requests": 1, "current_tpm": 1, "current_rpm": 10}
             self.print_verbose(f"current: {current}")
             if (
@@ -187,7 +209,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     "current_rpm": 0,
                 }
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val
+                    key=request_count_api_key,
+                    value=new_val,
+                    litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
                 )
             elif (
                 int(current["current_requests"]) < max_parallel_requests
@@ -201,7 +225,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     "current_rpm": current["current_rpm"],
                 }
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val
+                    key=request_count_api_key,
+                    value=new_val,
+                    litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
                 )
             else:
                 return self.raise_rate_limit_error(
@@ -219,7 +245,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             )
 
             current = await self.internal_usage_cache.async_get_cache(
-                key=request_count_api_key
+                key=request_count_api_key,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )  # {"current_requests": 1, "current_tpm": 1, "current_rpm": 10}
 
             tpm_limit_for_model = None
@@ -242,7 +269,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     "current_rpm": 0,
                 }
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val
+                    key=request_count_api_key,
+                    value=new_val,
+                    litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
                 )
             elif tpm_limit_for_model is not None or rpm_limit_for_model is not None:
                 # Increase count for this token
@@ -267,16 +296,19 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     )
                 else:
                     await self.internal_usage_cache.async_set_cache(
-                        request_count_api_key, new_val
+                        key=request_count_api_key,
+                        value=new_val,
+                        litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
                     )
 
             _remaining_tokens = None
             _remaining_requests = None
             # Add remaining tokens, requests to metadata
-            if tpm_limit_for_model is not None:
-                _remaining_tokens = tpm_limit_for_model - new_val["current_tpm"]
-            if rpm_limit_for_model is not None:
-                _remaining_requests = rpm_limit_for_model - new_val["current_rpm"]
+            if new_val:
+                if tpm_limit_for_model is not None:
+                    _remaining_tokens = tpm_limit_for_model - new_val["current_tpm"]
+                if rpm_limit_for_model is not None:
+                    _remaining_requests = rpm_limit_for_model - new_val["current_rpm"]
 
             _remaining_limits_data = {
                 f"litellm-key-remaining-tokens-{_model}": _remaining_tokens,
@@ -291,7 +323,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
         user_id = user_api_key_dict.user_id
         if user_id is not None:
             _user_id_rate_limits = await self.internal_usage_cache.async_get_cache(
-                key=user_id
+                key=user_id,
+                litellm_parent_otel_span=user_api_key_dict.parent_otel_span,
             )
             # get user tpm/rpm limits
             if _user_id_rate_limits is not None and isinstance(
@@ -388,6 +421,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             get_model_group_from_litellm_kwargs,
         )
 
+        litellm_parent_otel_span: Union[Span, None] = _get_parent_otel_span_from_kwargs(
+            kwargs=kwargs
+        )
         try:
             self.print_verbose("INSIDE parallel request limiter ASYNC SUCCESS LOGGING")
             global_max_parallel_requests = kwargs["litellm_params"]["metadata"].get(
@@ -416,7 +452,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 _key = "global_max_parallel_requests"
                 # decrement
                 await self.internal_usage_cache.async_increment_cache(
-                    key=_key, value=-1, local_only=True
+                    key=_key,
+                    value=-1,
+                    local_only=True,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )
 
             current_date = datetime.now().strftime("%Y-%m-%d")
@@ -427,7 +466,7 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             total_tokens = 0
 
             if isinstance(response_obj, ModelResponse):
-                total_tokens = response_obj.usage.total_tokens
+                total_tokens = response_obj.usage.total_tokens  # type: ignore
 
             # ------------
             # Update usage - API Key
@@ -439,7 +478,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 )
 
                 current = await self.internal_usage_cache.async_get_cache(
-                    key=request_count_api_key
+                    key=request_count_api_key,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 ) or {
                     "current_requests": 1,
                     "current_tpm": total_tokens,
@@ -456,7 +496,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     f"updated_value in success call: {new_val}, precise_minute: {precise_minute}"
                 )
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val, ttl=60
+                    request_count_api_key,
+                    new_val,
+                    ttl=60,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )  # store in cache for 1 min.
 
             # ------------
@@ -476,7 +519,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 )
 
                 current = await self.internal_usage_cache.async_get_cache(
-                    key=request_count_api_key
+                    key=request_count_api_key,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 ) or {
                     "current_requests": 1,
                     "current_tpm": total_tokens,
@@ -493,7 +537,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     f"updated_value in success call: {new_val}, precise_minute: {precise_minute}"
                 )
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val, ttl=60
+                    request_count_api_key,
+                    new_val,
+                    ttl=60,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )
 
             # ------------
@@ -503,14 +550,15 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 total_tokens = 0
 
                 if isinstance(response_obj, ModelResponse):
-                    total_tokens = response_obj.usage.total_tokens
+                    total_tokens = response_obj.usage.total_tokens  # type: ignore
 
                 request_count_api_key = (
                     f"{user_api_key_user_id}::{precise_minute}::request_count"
                 )
 
                 current = await self.internal_usage_cache.async_get_cache(
-                    key=request_count_api_key
+                    key=request_count_api_key,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 ) or {
                     "current_requests": 1,
                     "current_tpm": total_tokens,
@@ -527,7 +575,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     f"updated_value in success call: {new_val}, precise_minute: {precise_minute}"
                 )
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val, ttl=60
+                    request_count_api_key,
+                    new_val,
+                    ttl=60,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )  # store in cache for 1 min.
 
             # ------------
@@ -537,14 +588,15 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 total_tokens = 0
 
                 if isinstance(response_obj, ModelResponse):
-                    total_tokens = response_obj.usage.total_tokens
+                    total_tokens = response_obj.usage.total_tokens  # type: ignore
 
                 request_count_api_key = (
                     f"{user_api_key_team_id}::{precise_minute}::request_count"
                 )
 
                 current = await self.internal_usage_cache.async_get_cache(
-                    key=request_count_api_key
+                    key=request_count_api_key,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 ) or {
                     "current_requests": 1,
                     "current_tpm": total_tokens,
@@ -561,7 +613,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     f"updated_value in success call: {new_val}, precise_minute: {precise_minute}"
                 )
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val, ttl=60
+                    request_count_api_key,
+                    new_val,
+                    ttl=60,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )  # store in cache for 1 min.
 
             # ------------
@@ -571,14 +626,15 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 total_tokens = 0
 
                 if isinstance(response_obj, ModelResponse):
-                    total_tokens = response_obj.usage.total_tokens
+                    total_tokens = response_obj.usage.total_tokens  # type: ignore
 
                 request_count_api_key = (
                     f"{user_api_key_end_user_id}::{precise_minute}::request_count"
                 )
 
                 current = await self.internal_usage_cache.async_get_cache(
-                    key=request_count_api_key
+                    key=request_count_api_key,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 ) or {
                     "current_requests": 1,
                     "current_tpm": total_tokens,
@@ -595,7 +651,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                     f"updated_value in success call: {new_val}, precise_minute: {precise_minute}"
                 )
                 await self.internal_usage_cache.async_set_cache(
-                    request_count_api_key, new_val, ttl=60
+                    request_count_api_key,
+                    new_val,
+                    ttl=60,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )  # store in cache for 1 min.
 
         except Exception as e:
@@ -604,6 +663,9 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
     async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
         try:
             self.print_verbose("Inside Max Parallel Request Failure Hook")
+            litellm_parent_otel_span: Union[Span, None] = (
+                _get_parent_otel_span_from_kwargs(kwargs=kwargs)
+            )
             _metadata = kwargs["litellm_params"].get("metadata", {}) or {}
             global_max_parallel_requests = _metadata.get(
                 "global_max_parallel_requests", None
@@ -626,12 +688,17 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
                 _key = "global_max_parallel_requests"
                 current_global_requests = (
                     await self.internal_usage_cache.async_get_cache(
-                        key=_key, local_only=True
+                        key=_key,
+                        local_only=True,
+                        litellm_parent_otel_span=litellm_parent_otel_span,
                     )
                 )
                 # decrement
                 await self.internal_usage_cache.async_increment_cache(
-                    key=_key, value=-1, local_only=True
+                    key=_key,
+                    value=-1,
+                    local_only=True,
+                    litellm_parent_otel_span=litellm_parent_otel_span,
                 )
 
             current_date = datetime.now().strftime("%Y-%m-%d")
@@ -647,7 +714,8 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
             # Update usage
             # ------------
             current = await self.internal_usage_cache.async_get_cache(
-                key=request_count_api_key
+                key=request_count_api_key,
+                litellm_parent_otel_span=litellm_parent_otel_span,
             ) or {
                 "current_requests": 1,
                 "current_tpm": 0,
@@ -662,7 +730,10 @@ class _PROXY_MaxParallelRequestsHandler(CustomLogger):
 
             self.print_verbose(f"updated_value in failure call: {new_val}")
             await self.internal_usage_cache.async_set_cache(
-                request_count_api_key, new_val, ttl=60
+                request_count_api_key,
+                new_val,
+                ttl=60,
+                litellm_parent_otel_span=litellm_parent_otel_span,
             )  # save in cache for up to 1 min.
         except Exception as e:
             verbose_proxy_logger.exception(