Merge branch 'main' into litellm_track_cache_read_tokens_usage_metrics

Ishaan Jaff 2025-04-15 21:15:02 -07:00
commit 7bd250d929
25 changed files with 484 additions and 141 deletions

View file

@@ -334,7 +334,6 @@ router_settings:
| AZURE_STORAGE_TENANT_ID | The Application Tenant ID to use for Authentication to Azure Blob Storage logging
| AZURE_STORAGE_CLIENT_ID | The Application Client ID to use for Authentication to Azure Blob Storage logging
| AZURE_STORAGE_CLIENT_SECRET | The Application Client Secret to use for Authentication to Azure Blob Storage logging
| BERRISPEND_ACCOUNT_ID | Account ID for BerriSpend service
| BRAINTRUST_API_KEY | API key for Braintrust integration
| CIRCLE_OIDC_TOKEN | OpenID Connect token for CircleCI

View file

@@ -862,7 +862,7 @@ Add the following to your env
```shell
OTEL_EXPORTER="otlp_http"
OTEL_ENDPOINT="http:/0.0.0.0:4317"
OTEL_ENDPOINT="http://0.0.0.0:4317"
OTEL_HEADERS="x-honeycomb-team=<your-api-key>" # Optional
```
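
The endpoint fix above is load-bearing: with only one slash, standard URL parsing treats the host as a path, so the exporter has no network location to connect to. A quick illustration (plain Python, not part of this commit):

```python
from urllib.parse import urlparse

# Malformed: the host is swallowed into the path and netloc is empty.
print(urlparse("http:/0.0.0.0:4317"))
# ParseResult(scheme='http', netloc='', path='/0.0.0.0:4317', ...)

# Corrected: the exporter gets a real network location to connect to.
print(urlparse("http://0.0.0.0:4317"))
# ParseResult(scheme='http', netloc='0.0.0.0:4317', path='', ...)
```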
@@ -2501,4 +2501,4 @@ litellm_settings:
:::info
`thresholds` are not required by default, but you can tune the values to your needs.
Default value is `4` for all categories
::: -->
::: -->

View file

@@ -38,7 +38,7 @@ hide_table_of_contents: false
2. OpenAI Moderations - `omni-moderation-latest` support. [Start Here](https://docs.litellm.ai/docs/moderation)
3. Azure O1 - fake streaming support. This ensures if a `stream=true` is passed, the response is streamed. [Start Here](https://docs.litellm.ai/docs/providers/azure)
4. Anthropic - non-whitespace char stop sequence handling - [PR](https://github.com/BerriAI/litellm/pull/7484)
5. Azure OpenAI - support Entra id username + password based auth. [Start Here](https://docs.litellm.ai/docs/providers/azure#entrata-id---use-tenant_id-client_id-client_secret)
5. Azure OpenAI - support Entra ID username + password based auth. [Start Here](https://docs.litellm.ai/docs/providers/azure#entra-id---use-tenant_id-client_id-client_secret)
6. LM Studio - embedding route support. [Start Here](https://docs.litellm.ai/docs/providers/lm-studio)
7. WatsonX - ZenAPIKeyAuth support. [Start Here](https://docs.litellm.ai/docs/providers/watsonx)

Binary file not shown.

View file

@@ -0,0 +1,36 @@
-- CreateTable
CREATE TABLE "LiteLLM_DailyTeamSpend" (
"id" TEXT NOT NULL,
"team_id" TEXT NOT NULL,
"date" TEXT NOT NULL,
"api_key" TEXT NOT NULL,
"model" TEXT NOT NULL,
"model_group" TEXT,
"custom_llm_provider" TEXT,
"prompt_tokens" INTEGER NOT NULL DEFAULT 0,
"completion_tokens" INTEGER NOT NULL DEFAULT 0,
"spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0,
"api_requests" INTEGER NOT NULL DEFAULT 0,
"successful_requests" INTEGER NOT NULL DEFAULT 0,
"failed_requests" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "LiteLLM_DailyTeamSpend_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_date_idx" ON "LiteLLM_DailyTeamSpend"("date");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_team_id_idx" ON "LiteLLM_DailyTeamSpend"("team_id");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_api_key_idx" ON "LiteLLM_DailyTeamSpend"("api_key");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_model_idx" ON "LiteLLM_DailyTeamSpend"("model");
-- CreateIndex
CREATE UNIQUE INDEX "LiteLLM_DailyTeamSpend_team_id_date_api_key_model_custom_ll_key" ON "LiteLLM_DailyTeamSpend"("team_id", "date", "api_key", "model", "custom_llm_provider");
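
The composite unique index is what makes the daily rollup idempotent: each (team, date, key, model, provider) combination owns exactly one row, so writers can safely increment counters with an upsert. A hedged sketch of that pattern in raw SQL via psycopg — the proxy itself writes through Prisma, as the Python diff below shows, and `gen_random_uuid()` assumes PostgreSQL 13+:

```python
import psycopg

UPSERT = """
INSERT INTO "LiteLLM_DailyTeamSpend"
    (id, team_id, date, api_key, model, custom_llm_provider, spend, api_requests, updated_at)
VALUES (gen_random_uuid()::text, %s, %s, %s, %s, %s, %s, 1, now())
ON CONFLICT (team_id, date, api_key, model, custom_llm_provider)
DO UPDATE SET
    spend        = "LiteLLM_DailyTeamSpend".spend + EXCLUDED.spend,
    api_requests = "LiteLLM_DailyTeamSpend".api_requests + 1,
    updated_at   = now();
"""

with psycopg.connect("postgresql://localhost/litellm") as conn:
    # NOTE: rows with a NULL custom_llm_provider never match the unique index
    # (NULLs compare unequal in Postgres), so pass a concrete provider here.
    conn.execute(UPSERT, ("team-1", "2025-04-15", "sk-...", "gpt-4o", "openai", 0.0123))
```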

View file

@@ -1,6 +1,6 @@
[tool.poetry]
name = "litellm-proxy-extras"
version = "0.1.7"
version = "0.1.8"
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
authors = ["BerriAI"]
readme = "README.md"
@@ -22,7 +22,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.commitizen]
version = "0.1.7"
version = "0.1.8"
version_files = [
"pyproject.toml:version",
"../requirements.txt:litellm-proxy-extras==",

View file

@@ -24,6 +24,7 @@ SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD = 1000 # Minimum number of requests
########### v2 Architecture constants for managing writing updates to the database ###########
REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
MAX_SIZE_IN_MEMORY_QUEUE = 10000
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000

View file

@@ -8,7 +8,7 @@ from typing import List, Optional
from litellm._logging import verbose_logger
from litellm.constants import AZURE_STORAGE_MSFT_VERSION
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.azure.common_utils import get_azure_ad_token_from_entrata_id
from litellm.llms.azure.common_utils import get_azure_ad_token_from_entra_id
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
get_async_httpx_client,
@@ -291,7 +291,7 @@ class AzureBlobStorageLogger(CustomBatchLogger):
"Missing required environment variable: AZURE_STORAGE_CLIENT_SECRET"
)
token_provider = get_azure_ad_token_from_entrata_id(
token_provider = get_azure_ad_token_from_entra_id(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,

View file

@@ -61,7 +61,7 @@ def process_azure_headers(headers: Union[httpx.Headers, dict]) -> dict:
return {**llm_response_headers, **openai_headers}
def get_azure_ad_token_from_entrata_id(
def get_azure_ad_token_from_entra_id(
tenant_id: str,
client_id: str,
client_secret: str,
@@ -81,7 +81,7 @@ def get_azure_ad_token_from_entrata_id(
"""
from azure.identity import ClientSecretCredential, get_bearer_token_provider
verbose_logger.debug("Getting Azure AD Token from Entrata ID")
verbose_logger.debug("Getting Azure AD Token from Entra ID")
if tenant_id.startswith("os.environ/"):
_tenant_id = get_secret_str(tenant_id)
@@ -324,9 +324,9 @@ class BaseAzureLLM(BaseOpenAILLM):
timeout = litellm_params.get("timeout")
if not api_key and tenant_id and client_id and client_secret:
verbose_logger.debug(
"Using Azure AD Token Provider from Entrata ID for Azure Auth"
"Using Azure AD Token Provider from Entra ID for Azure Auth"
)
azure_ad_token_provider = get_azure_ad_token_from_entrata_id(
azure_ad_token_provider = get_azure_ad_token_from_entra_id(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
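
For context on what the renamed helper does: judging by the `azure.identity` imports visible above, it presumably wraps the OAuth2 client-credentials flow in a cached token provider. A minimal sketch (the Cognitive Services scope string is an assumption, not taken from this diff):

```python
from azure.identity import ClientSecretCredential, get_bearer_token_provider

def make_entra_token_provider(tenant_id: str, client_id: str, client_secret: str):
    # ClientSecretCredential runs the client-credentials flow against Entra ID;
    # get_bearer_token_provider wraps it in a zero-argument callable that
    # caches and refreshes tokens for the given scope.
    credential = ClientSecretCredential(
        tenant_id=tenant_id, client_id=client_id, client_secret=client_secret
    )
    return get_bearer_token_provider(
        credential, "https://cognitiveservices.azure.com/.default"
    )

# token_provider = make_entra_token_provider("<tenant>", "<client>", "<secret>")
# token_provider()  # fetches (or returns a cached) bearer token on demand
```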

View file

@@ -2769,8 +2769,7 @@ class DefaultInternalUserParams(LiteLLMPydanticObjectBase):
)
class DailyUserSpendTransaction(TypedDict):
user_id: str
class BaseDailySpendTransaction(TypedDict):
date: str
api_key: str
model: str
@@ -2790,6 +2789,14 @@ class DailyUserSpendTransaction(TypedDict):
failed_requests: int
class DailyTeamSpendTransaction(BaseDailySpendTransaction):
team_id: str
class DailyUserSpendTransaction(BaseDailySpendTransaction):
user_id: str
class DBSpendUpdateTransactions(TypedDict):
"""
Internal Data Structure for buffering spend updates in Redis or in memory before committing them to the database
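
The refactor above extracts the shared fields into `BaseDailySpendTransaction` so the user and team variants differ only by their entity id. An abridged sketch of the resulting shape (field list shortened for illustration):

```python
from typing import TypedDict

class BaseDailySpendTransaction(TypedDict):
    # Fields shared by every daily aggregate (abridged from the diff).
    date: str
    api_key: str
    model: str
    spend: float

class DailyUserSpendTransaction(BaseDailySpendTransaction):
    user_id: str

class DailyTeamSpendTransaction(BaseDailySpendTransaction):
    team_id: str

# A single helper can build the shared fields once, and each caller
# attaches its own entity id:
base: BaseDailySpendTransaction = {
    "date": "2025-04-15", "api_key": "sk-...", "model": "gpt-4o", "spend": 0.01,
}
team_txn: DailyTeamSpendTransaction = {"team_id": "team-1", **base}
```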

View file

@@ -11,7 +11,7 @@ import os
import time
import traceback
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast
import litellm
from litellm._logging import verbose_proxy_logger
@@ -19,6 +19,8 @@ from litellm.caching import DualCache, RedisCache
from litellm.constants import DB_SPEND_UPDATE_JOB_NAME
from litellm.proxy._types import (
DB_CONNECTION_ERROR_TYPES,
BaseDailySpendTransaction,
DailyTeamSpendTransaction,
DailyUserSpendTransaction,
DBSpendUpdateTransactions,
Litellm_EntityType,
@@ -58,6 +60,7 @@ class DBSpendUpdateWriter:
self.pod_lock_manager = PodLockManager()
self.spend_update_queue = SpendUpdateQueue()
self.daily_spend_update_queue = DailySpendUpdateQueue()
self.daily_team_spend_update_queue = DailySpendUpdateQueue()
async def update_database(
# LiteLLM management object fields
@@ -160,6 +163,13 @@
)
)
asyncio.create_task(
self.add_spend_log_transaction_to_daily_team_transaction(
payload=payload,
prisma_client=prisma_client,
)
)
verbose_proxy_logger.debug("Runs spend update on all tables")
except Exception:
verbose_proxy_logger.debug(
@@ -383,6 +393,7 @@ class DBSpendUpdateWriter:
await self.redis_update_buffer.store_in_memory_spend_updates_in_redis(
spend_update_queue=self.spend_update_queue,
daily_spend_update_queue=self.daily_spend_update_queue,
daily_team_spend_update_queue=self.daily_team_spend_update_queue,
)
# Only commit from redis to db if this pod is the leader
@@ -413,6 +424,16 @@
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_spend_update_transactions,
)
daily_team_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_team_spend_update_transactions_from_redis_buffer()
)
if daily_team_spend_update_transactions is not None:
await DBSpendUpdateWriter.update_daily_team_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_team_spend_update_transactions,
)
except Exception as e:
verbose_proxy_logger.error(f"Error committing spend updates: {e}")
finally:
@@ -448,8 +469,9 @@
################## Daily Spend Update Transactions ##################
# Aggregate all in memory daily spend transactions and commit to db
daily_spend_update_transactions = (
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
daily_spend_update_transactions = cast(
Dict[str, DailyUserSpendTransaction],
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
)
await DBSpendUpdateWriter.update_daily_user_spend(
@@ -459,6 +481,20 @@
daily_spend_transactions=daily_spend_update_transactions,
)
################## Daily Team Spend Update Transactions ##################
# Aggregate all in memory daily team spend transactions and commit to db
daily_team_spend_update_transactions = cast(
Dict[str, DailyTeamSpendTransaction],
await self.daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
)
await DBSpendUpdateWriter.update_daily_team_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_team_spend_update_transactions,
)
async def _commit_spend_updates_to_db( # noqa: PLR0915
self,
prisma_client: PrismaClient,
@@ -853,6 +889,195 @@
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
@staticmethod
async def update_daily_team_spend(
n_retry_times: int,
prisma_client: PrismaClient,
proxy_logging_obj: ProxyLogging,
daily_spend_transactions: Dict[str, DailyTeamSpendTransaction],
):
"""
Batch job to update LiteLLM_DailyTeamSpend table using in-memory daily_spend_transactions
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception
### UPDATE DAILY TEAM SPEND ###
verbose_proxy_logger.debug(
"Daily Team Spend transactions: {}".format(len(daily_spend_transactions))
)
BATCH_SIZE = (
100 # Number of aggregated records to update in each database operation
)
start_time = time.time()
try:
for i in range(n_retry_times + 1):
try:
# Get transactions to process
transactions_to_process = dict(
list(daily_spend_transactions.items())[:BATCH_SIZE]
)
if len(transactions_to_process) == 0:
verbose_proxy_logger.debug(
"No new transactions to process for daily spend update"
)
break
# Update DailyTeamSpend table in batches
async with prisma_client.db.batch_() as batcher:
for _, transaction in transactions_to_process.items():
team_id = transaction.get("team_id")
if not team_id: # Skip if no team_id
continue
batcher.litellm_dailyteamspend.upsert(
where={
"team_id_date_api_key_model_custom_llm_provider": {
"team_id": team_id,
"date": transaction["date"],
"api_key": transaction["api_key"],
"model": transaction["model"],
"custom_llm_provider": transaction.get(
"custom_llm_provider"
),
}
},
data={
"create": {
"team_id": team_id,
"date": transaction["date"],
"api_key": transaction["api_key"],
"model": transaction["model"],
"model_group": transaction.get("model_group"),
"custom_llm_provider": transaction.get(
"custom_llm_provider"
),
"prompt_tokens": transaction["prompt_tokens"],
"completion_tokens": transaction[
"completion_tokens"
],
"spend": transaction["spend"],
"api_requests": transaction["api_requests"],
"successful_requests": transaction[
"successful_requests"
],
"failed_requests": transaction[
"failed_requests"
],
},
"update": {
"prompt_tokens": {
"increment": transaction["prompt_tokens"]
},
"completion_tokens": {
"increment": transaction[
"completion_tokens"
]
},
"spend": {"increment": transaction["spend"]},
"api_requests": {
"increment": transaction["api_requests"]
},
"successful_requests": {
"increment": transaction[
"successful_requests"
]
},
"failed_requests": {
"increment": transaction["failed_requests"]
},
},
},
)
verbose_proxy_logger.info(
f"Processed {len(transactions_to_process)} daily team transactions in {time.time() - start_time:.2f}s"
)
# Remove processed transactions
for key in transactions_to_process.keys():
daily_spend_transactions.pop(key, None)
verbose_proxy_logger.debug(
f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times:
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
# Remove processed transactions even if there was an error
if "transactions_to_process" in locals():
for key in transactions_to_process.keys(): # type: ignore
daily_spend_transactions.pop(key, None)
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
async def _common_add_spend_log_transaction_to_daily_transaction(
self,
payload: Union[dict, SpendLogsPayload],
prisma_client: PrismaClient,
type: Literal["user", "team"] = "user",
) -> Optional[BaseDailySpendTransaction]:
common_expected_keys = ["startTime", "api_key", "model", "custom_llm_provider"]
if type == "user":
expected_keys = ["user", *common_expected_keys]
else:
expected_keys = ["team_id", *common_expected_keys]
if not all(key in payload for key in expected_keys):
verbose_proxy_logger.debug(
f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
)
return None
request_status = prisma_client.get_request_status(payload)
verbose_proxy_logger.info(f"Logged request status: {request_status}")
_metadata: SpendLogsMetadata = json.loads(payload["metadata"])
usage_obj = _metadata.get("usage_object", {}) or {}
if isinstance(payload["startTime"], datetime):
start_time = payload["startTime"].isoformat()
date = start_time.split("T")[0]
elif isinstance(payload["startTime"], str):
date = payload["startTime"].split("T")[0]
else:
verbose_proxy_logger.debug(
f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
)
return None
try:
daily_transaction = BaseDailySpendTransaction(
date=date,
api_key=payload["api_key"],
model=payload["model"],
model_group=payload["model_group"],
custom_llm_provider=payload["custom_llm_provider"],
prompt_tokens=payload["prompt_tokens"],
completion_tokens=payload["completion_tokens"],
spend=payload["spend"],
api_requests=1,
successful_requests=1 if request_status == "success" else 0,
failed_requests=1 if request_status != "success" else 0,
cache_read_input_tokens=usage_obj.get("cache_read_input_tokens", 0)
or 0,
cache_creation_input_tokens=usage_obj.get(
"cache_creation_input_tokens", 0
)
or 0,
)
return daily_transaction
except Exception as e:
raise e
async def add_spend_log_transaction_to_daily_user_transaction(
self,
payload: Union[dict, SpendLogsPayload],
@@ -870,55 +1095,51 @@
"prisma_client is None. Skipping writing spend logs to db."
)
return
expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
if not all(key in payload for key in expected_keys):
verbose_proxy_logger.debug(
f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
base_daily_transaction = (
await self._common_add_spend_log_transaction_to_daily_transaction(
payload, prisma_client, "user"
)
)
if base_daily_transaction is None:
return
request_status = prisma_client.get_request_status(payload)
verbose_proxy_logger.info(f"Logged request status: {request_status}")
_metadata: SpendLogsMetadata = json.loads(payload["metadata"])
usage_obj = _metadata.get("usage_object", {}) or {}
cache_read_input_tokens = usage_obj.get("cache_read_input_tokens", 0) or 0
cache_creation_input_tokens = (
usage_obj.get("cache_creation_input_tokens", 0) or 0
daily_transaction_key = f"{payload['user']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
daily_transaction = DailyUserSpendTransaction(
user_id=payload["user"], **base_daily_transaction
)
await self.daily_spend_update_queue.add_update(
update={daily_transaction_key: daily_transaction}
)
if isinstance(payload["startTime"], datetime):
start_time = payload["startTime"].isoformat()
date = start_time.split("T")[0]
elif isinstance(payload["startTime"], str):
date = payload["startTime"].split("T")[0]
else:
async def add_spend_log_transaction_to_daily_team_transaction(
self,
payload: SpendLogsPayload,
prisma_client: Optional[PrismaClient] = None,
) -> None:
if prisma_client is None:
verbose_proxy_logger.debug(
f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
"prisma_client is None. Skipping writing spend logs to db."
)
return
try:
daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
daily_transaction = DailyUserSpendTransaction(
user_id=payload["user"],
date=date,
api_key=payload["api_key"],
model=payload["model"],
model_group=payload["model_group"],
custom_llm_provider=payload["custom_llm_provider"],
prompt_tokens=payload["prompt_tokens"],
completion_tokens=payload["completion_tokens"],
spend=payload["spend"],
api_requests=1,
successful_requests=1 if request_status == "success" else 0,
failed_requests=1 if request_status != "success" else 0,
cache_read_input_tokens=cache_read_input_tokens,
cache_creation_input_tokens=cache_creation_input_tokens,
)
await self.daily_spend_update_queue.add_update(
update={daily_transaction_key: daily_transaction}
base_daily_transaction = (
await self._common_add_spend_log_transaction_to_daily_transaction(
payload, prisma_client, "team"
)
)
if base_daily_transaction is None:
return
if payload["team_id"] is None:
verbose_proxy_logger.debug(
"team_id is None for request. Skipping incrementing team spend."
)
return
except Exception as e:
raise e
daily_transaction_key = f"{payload['team_id']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
daily_transaction = DailyTeamSpendTransaction(
team_id=payload["team_id"], **base_daily_transaction
)
await self.daily_team_spend_update_queue.add_update(
update={daily_transaction_key: daily_transaction}
)
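
Two patterns recur throughout this file: aggregation keys of the form `{entity_id}_{date}_{api_key}_{model}_{custom_llm_provider}`, and a retry loop that backs off exponentially on database connection errors. A stripped-down sketch of the retry shape (illustrative only, with `ConnectionError` standing in for `DB_CONNECTION_ERROR_TYPES`):

```python
import asyncio
from typing import Awaitable, Callable

async def commit_with_retries(
    commit_batch: Callable[[], Awaitable[None]], n_retry_times: int = 3
) -> None:
    for attempt in range(n_retry_times + 1):
        try:
            await commit_batch()
            return
        except ConnectionError:  # stand-in for DB_CONNECTION_ERROR_TYPES
            if attempt >= n_retry_times:
                raise  # out of retries; surface the failure
            await asyncio.sleep(2**attempt)  # exponential backoff: 1s, 2s, 4s, ...
```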

View file

@@ -3,7 +3,7 @@ from copy import deepcopy
from typing import Dict, List, Optional
from litellm._logging import verbose_proxy_logger
from litellm.proxy._types import DailyUserSpendTransaction
from litellm.proxy._types import BaseDailySpendTransaction
from litellm.proxy.db.db_transaction_queue.base_update_queue import (
BaseUpdateQueue,
service_logger_obj,
@@ -53,11 +53,11 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
def __init__(self):
super().__init__()
self.update_queue: asyncio.Queue[Dict[str, DailyUserSpendTransaction]] = (
self.update_queue: asyncio.Queue[Dict[str, BaseDailySpendTransaction]] = (
asyncio.Queue()
)
async def add_update(self, update: Dict[str, DailyUserSpendTransaction]):
async def add_update(self, update: Dict[str, BaseDailySpendTransaction]):
"""Enqueue an update."""
verbose_proxy_logger.debug("Adding update to queue: %s", update)
await self.update_queue.put(update)
@@ -72,7 +72,7 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
Combine all updates in the queue into a single update.
This is used to reduce the size of the in-memory queue.
"""
updates: List[Dict[str, DailyUserSpendTransaction]] = (
updates: List[Dict[str, BaseDailySpendTransaction]] = (
await self.flush_all_updates_from_in_memory_queue()
)
aggregated_updates = self.get_aggregated_daily_spend_update_transactions(
@@ -82,8 +82,8 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
async def flush_and_get_aggregated_daily_spend_update_transactions(
self,
) -> Dict[str, DailyUserSpendTransaction]:
"""Get all updates from the queue and return all updates aggregated by daily_transaction_key."""
) -> Dict[str, BaseDailySpendTransaction]:
"""Get all updates from the queue and return all updates aggregated by daily_transaction_key. Works for both user and team spend updates."""
updates = await self.flush_all_updates_from_in_memory_queue()
aggregated_daily_spend_update_transactions = (
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
@@ -98,11 +98,11 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
@staticmethod
def get_aggregated_daily_spend_update_transactions(
updates: List[Dict[str, DailyUserSpendTransaction]],
) -> Dict[str, DailyUserSpendTransaction]:
updates: List[Dict[str, BaseDailySpendTransaction]],
) -> Dict[str, BaseDailySpendTransaction]:
"""Aggregate updates by daily_transaction_key."""
aggregated_daily_spend_update_transactions: Dict[
str, DailyUserSpendTransaction
str, BaseDailySpendTransaction
] = {}
for _update in updates:
for _key, payload in _update.items():
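
The loop above (truncated by the diff) merges queued updates that share a `daily_transaction_key`. The reduction it performs is essentially the following sketch (illustrative field list; the real method also carries the metadata columns):

```python
from typing import Dict, List

def aggregate_updates(updates: List[Dict[str, dict]]) -> Dict[str, dict]:
    """Collapse queued updates that share a daily_transaction_key by
    summing their counters (illustrative reduction, not the real method)."""
    aggregated: Dict[str, dict] = {}
    for update in updates:
        for key, txn in update.items():
            if key not in aggregated:
                aggregated[key] = dict(txn)
                continue
            for field in ("spend", "prompt_tokens", "completion_tokens",
                          "api_requests", "successful_requests", "failed_requests"):
                aggregated[key][field] += txn[field]
    return aggregated
```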

View file

@@ -6,17 +6,22 @@ This is to prevent deadlocks and improve reliability
import asyncio
import json
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
from litellm._logging import verbose_proxy_logger
from litellm.caching import RedisCache
from litellm.constants import (
MAX_REDIS_BUFFER_DEQUEUE_COUNT,
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
REDIS_UPDATE_BUFFER_KEY,
)
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
from litellm.proxy._types import DailyUserSpendTransaction, DBSpendUpdateTransactions
from litellm.proxy._types import (
DailyTeamSpendTransaction,
DailyUserSpendTransaction,
DBSpendUpdateTransactions,
)
from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj
from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import (
DailySpendUpdateQueue,
@@ -67,6 +72,7 @@ class RedisUpdateBuffer:
self,
spend_update_queue: SpendUpdateQueue,
daily_spend_update_queue: DailySpendUpdateQueue,
daily_team_spend_update_queue: DailySpendUpdateQueue,
):
"""
Stores the in-memory spend updates to Redis
@@ -127,6 +133,9 @@
daily_spend_update_transactions = (
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
daily_team_spend_update_transactions = (
await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
verbose_proxy_logger.debug(
"ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions
)
@@ -161,6 +170,19 @@
service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE,
)
list_of_daily_team_spend_update_transactions = [
safe_dumps(daily_team_spend_update_transactions)
]
current_redis_buffer_size = await self.redis_cache.async_rpush(
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
values=list_of_daily_team_spend_update_transactions,
)
await self._emit_new_item_added_to_redis_buffer_event(
queue_size=current_redis_buffer_size,
service=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE,
)
@staticmethod
def _number_of_transactions_to_store_in_redis(
db_spend_update_transactions: DBSpendUpdateTransactions,
@@ -258,8 +280,35 @@
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
return DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
list_of_daily_spend_update_transactions
return cast(
Dict[str, DailyUserSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
list_of_daily_spend_update_transactions
),
)
async def get_all_daily_team_spend_update_transactions_from_redis_buffer(
self,
) -> Optional[Dict[str, DailyTeamSpendTransaction]]:
"""
Gets all the daily team spend update transactions from Redis
"""
if self.redis_cache is None:
return None
list_of_transactions = await self.redis_cache.async_lpop(
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
count=MAX_REDIS_BUFFER_DEQUEUE_COUNT,
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
return cast(
Dict[str, DailyTeamSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
list_of_daily_spend_update_transactions
),
)
@staticmethod
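
The round trip through Redis is symmetric: each pod serializes its aggregated dict with `safe_dumps` and RPUSHes it onto a shared list; the leader LPOPs up to `MAX_REDIS_BUFFER_DEQUEUE_COUNT` blobs and re-aggregates, since several pods may have buffered updates for the same key. A condensed sketch using a raw `redis.asyncio` client (an assumption — the proxy goes through its own `RedisCache` wrapper):

```python
import json
from typing import Dict, List

import redis.asyncio as redis

BUFFER_KEY = "litellm_daily_team_spend_update_buffer"

async def push_updates(r: redis.Redis, aggregated: Dict[str, dict]) -> None:
    # Each pod pushes its whole aggregated snapshot as one JSON blob.
    await r.rpush(BUFFER_KEY, json.dumps(aggregated))

async def pop_updates(r: redis.Redis, count: int = 100) -> List[Dict[str, dict]]:
    # The leader pops a bounded batch; the blobs must then be re-aggregated
    # because different pods can hold the same transaction key.
    blobs = await r.lpop(BUFFER_KEY, count) or []
    return [json.loads(b) for b in blobs]
```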

View file

@@ -342,6 +342,31 @@ model LiteLLM_DailyUserSpend {
@@index([model])
}
// Track daily team spend metrics per model and key
model LiteLLM_DailyTeamSpend {
id String @id @default(uuid())
team_id String
date String
api_key String
model String
model_group String?
custom_llm_provider String?
prompt_tokens Int @default(0)
completion_tokens Int @default(0)
spend Float @default(0.0)
api_requests Int @default(0)
successful_requests Int @default(0)
failed_requests Int @default(0)
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@unique([team_id, date, api_key, model, custom_llm_provider])
@@index([date])
@@index([team_id])
@@index([api_key])
@@index([model])
}
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model LiteLLM_CronJob {
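
With the table in place, per-team reporting becomes a simple aggregate. A hypothetical query helper (psycopg again; not part of this commit):

```python
import psycopg

QUERY = """
SELECT team_id, SUM(spend) AS total_spend, SUM(api_requests) AS requests
FROM "LiteLLM_DailyTeamSpend"
WHERE date = %s
GROUP BY team_id
ORDER BY total_spend DESC
LIMIT 10;
"""

with psycopg.connect("postgresql://localhost/litellm") as conn:
    # Top ten teams by spend for a single day.
    for team_id, total_spend, requests in conn.execute(QUERY, ("2025-04-15",)):
        print(f"{team_id}: ${total_spend:.4f} across {requests} requests")
```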

View file

@@ -33,7 +33,7 @@ class ServiceTypes(str, enum.Enum):
# daily spend update queue - actual transaction events
IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue"
REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue"
REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE = "redis_daily_team_spend_update_queue"
# spend update queue - current spend of key, user, team
IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue"
REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue"

poetry.lock (generated)
View file

@@ -1011,13 +1011,13 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"]
[[package]]
name = "google-auth"
version = "2.38.0"
version = "2.39.0"
description = "Google Authentication Library"
optional = true
python-versions = ">=3.7"
files = [
{file = "google_auth-2.38.0-py2.py3-none-any.whl", hash = "sha256:e7dae6694313f434a2727bf2906f27ad259bae090d7aa896590d86feec3d9d4a"},
{file = "google_auth-2.38.0.tar.gz", hash = "sha256:8285113607d3b80a3f1543b75962447ba8a09fe85783432a784fdeef6ac094c4"},
{file = "google_auth-2.39.0-py2.py3-none-any.whl", hash = "sha256:0150b6711e97fb9f52fe599f55648950cc4540015565d8fbb31be2ad6e1548a2"},
{file = "google_auth-2.39.0.tar.gz", hash = "sha256:73222d43cdc35a3aeacbfdcaf73142a97839f10de930550d89ebfe1d0a00cde7"},
]
[package.dependencies]
@@ -1026,12 +1026,14 @@ pyasn1-modules = ">=0.2.1"
rsa = ">=3.1.4,<5"
[package.extras]
aiohttp = ["aiohttp (>=3.6.2,<4.0.0.dev0)", "requests (>=2.20.0,<3.0.0.dev0)"]
aiohttp = ["aiohttp (>=3.6.2,<4.0.0)", "requests (>=2.20.0,<3.0.0)"]
enterprise-cert = ["cryptography", "pyopenssl"]
pyjwt = ["cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
pyopenssl = ["cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
pyjwt = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
pyopenssl = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
reauth = ["pyu2f (>=0.1.5)"]
requests = ["requests (>=2.20.0,<3.0.0.dev0)"]
requests = ["requests (>=2.20.0,<3.0.0)"]
testing = ["aiohttp (<3.10.0)", "aiohttp (>=3.6.2,<4.0.0)", "aioresponses", "cryptography (<39.0.0)", "cryptography (>=38.0.3)", "flask", "freezegun", "grpcio", "mock", "oauth2client", "packaging", "pyjwt (>=2.0)", "pyopenssl (<24.3.0)", "pyopenssl (>=20.0.0)", "pytest", "pytest-asyncio", "pytest-cov", "pytest-localserver", "pyu2f (>=0.1.5)", "requests (>=2.20.0,<3.0.0)", "responses", "urllib3"]
urllib3 = ["packaging", "urllib3"]
[[package]]
name = "google-cloud-kms"
@@ -1053,13 +1055,13 @@ protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4
[[package]]
name = "googleapis-common-protos"
version = "1.69.2"
version = "1.70.0"
description = "Common protobufs used in Google APIs"
optional = true
python-versions = ">=3.7"
files = [
{file = "googleapis_common_protos-1.69.2-py3-none-any.whl", hash = "sha256:0b30452ff9c7a27d80bfc5718954063e8ab53dd3697093d3bc99581f5fd24212"},
{file = "googleapis_common_protos-1.69.2.tar.gz", hash = "sha256:3e1b904a27a33c821b4b749fd31d334c0c9c30e6113023d495e48979a3dc9c5f"},
{file = "googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8"},
{file = "googleapis_common_protos-1.70.0.tar.gz", hash = "sha256:0e1b44e0ea153e6594f9f394fef15193a68aaaea2d843f83e2742717ca753257"},
]
[package.dependencies]
@@ -1680,13 +1682,13 @@ referencing = ">=0.31.0"
[[package]]
name = "litellm-proxy-extras"
version = "0.1.7"
version = "0.1.8"
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
optional = true
python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,>=3.8"
files = [
{file = "litellm_proxy_extras-0.1.7-py3-none-any.whl", hash = "sha256:d07eb1b8827127222c671a4c2a1730975d7e403bb334dbdadb264d64c99c479e"},
{file = "litellm_proxy_extras-0.1.7.tar.gz", hash = "sha256:d34e4e91edbdac244f51fbfb973fff5a9f23850eff717fbdbdb2af0a9e85ef4a"},
{file = "litellm_proxy_extras-0.1.8-py3-none-any.whl", hash = "sha256:42f261b66a43bd47a25eee0df547f93e375de208b5cb9da524379626c1632dcb"},
{file = "litellm_proxy_extras-0.1.8.tar.gz", hash = "sha256:81c18b068184b87eb32088afa50358ac7f27a747d446c949291706bfe8158310"},
]
[[package]]
@@ -2180,13 +2182,13 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]
[[package]]
name = "openai"
version = "1.73.0"
version = "1.74.0"
description = "The official Python library for the openai API"
optional = false
python-versions = ">=3.8"
files = [
{file = "openai-1.73.0-py3-none-any.whl", hash = "sha256:f52d1f673fb4ce6069a40d544a80fcb062eba1b3f489004fac4f9923a074c425"},
{file = "openai-1.73.0.tar.gz", hash = "sha256:b58ea39ba589de07db85c9905557ac12d2fc77600dcd2b92a08b99c9a3dce9e0"},
{file = "openai-1.74.0-py3-none-any.whl", hash = "sha256:aff3e0f9fb209836382ec112778667027f4fd6ae38bdb2334bc9e173598b092a"},
{file = "openai-1.74.0.tar.gz", hash = "sha256:592c25b8747a7cad33a841958f5eb859a785caea9ee22b9e4f4a2ec062236526"},
]
[package.dependencies]
@@ -3326,13 +3328,13 @@ files = [
[[package]]
name = "rq"
version = "2.3.1"
version = "2.3.2"
description = "RQ is a simple, lightweight, library for creating background jobs, and processing them."
optional = true
python-versions = ">=3.8"
files = [
{file = "rq-2.3.1-py3-none-any.whl", hash = "sha256:2bbd48b976fdd840865dcab4bed358eb94b4dd8a02e92add75a346a909c1793d"},
{file = "rq-2.3.1.tar.gz", hash = "sha256:9cb33be7a90c6b36c0d6b9a6524aaf85b8855251ace476d74a076e6dfc5684d6"},
{file = "rq-2.3.2-py3-none-any.whl", hash = "sha256:bf4dc622a7b9d5f7d4a39444f26d89ce6de8a1d6db61b21060612114dbf8d5ff"},
{file = "rq-2.3.2.tar.gz", hash = "sha256:5bd212992724428ec1689736abde783d245e7856bca39d89845884f5d580f5f1"},
]
[package.dependencies]
@@ -4151,4 +4153,4 @@ proxy = ["PyJWT", "apscheduler", "backoff", "boto3", "cryptography", "fastapi",
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0, !=3.9.7"
content-hash = "35a6b009d763180a0f7e00c95c9dc21bc07f339e5b2f0dd12f14c908cc1dd0df"
content-hash = "37dd81eae90a4d984b90067ddf934dcfa1ef61f45476b13af0e3634dfa309051"

View file

@@ -1,6 +1,6 @@
[tool.poetry]
name = "litellm"
version = "1.66.1"
version = "1.66.2"
description = "Library to easily interface with LLM API providers"
authors = ["BerriAI"]
license = "MIT"
@@ -55,7 +55,7 @@ websockets = {version = "^13.1.0", optional = true}
boto3 = {version = "1.34.34", optional = true}
redisvl = {version = "^0.4.1", optional = true, markers = "python_version >= '3.9' and python_version < '3.14'"}
mcp = {version = "1.5.0", optional = true, python = ">=3.10"}
litellm-proxy-extras = {version = "0.1.7", optional = true}
litellm-proxy-extras = {version = "0.1.8", optional = true}
[tool.poetry.extras]
proxy = [
@@ -118,7 +118,7 @@ requires = ["poetry-core", "wheel"]
build-backend = "poetry.core.masonry.api"
[tool.commitizen]
version = "1.66.1"
version = "1.66.2"
version_files = [
"pyproject.toml:^version"
]

View file

@@ -37,7 +37,7 @@ sentry_sdk==2.21.0 # for sentry error handling
detect-secrets==1.5.0 # Enterprise - secret detection / masking in LLM requests
cryptography==43.0.1
tzdata==2025.1 # IANA time zone database
litellm-proxy-extras==0.1.7 # for proxy extras - e.g. prisma migrations
litellm-proxy-extras==0.1.8 # for proxy extras - e.g. prisma migrations
### LITELLM PACKAGE DEPENDENCIES
python-dotenv==1.0.0 # for env
tiktoken==0.8.0 # for calculating usage

View file

@@ -342,6 +342,31 @@ model LiteLLM_DailyUserSpend {
@@index([model])
}
// Track daily team spend metrics per model and key
model LiteLLM_DailyTeamSpend {
id String @id @default(uuid())
team_id String
date String
api_key String
model String
model_group String?
custom_llm_provider String?
prompt_tokens Int @default(0)
completion_tokens Int @default(0)
spend Float @default(0.0)
api_requests Int @default(0)
successful_requests Int @default(0)
failed_requests Int @default(0)
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@unique([team_id, date, api_key, model, custom_llm_provider])
@@index([date])
@@index([team_id])
@@index([api_key])
@@index([model])
}
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model LiteLLM_CronJob {

View file

@@ -19,8 +19,8 @@ from litellm.types.utils import CallTypes
@pytest.fixture
def setup_mocks():
with patch(
"litellm.llms.azure.common_utils.get_azure_ad_token_from_entrata_id"
) as mock_entrata_token, patch(
"litellm.llms.azure.common_utils.get_azure_ad_token_from_entra_id"
) as mock_entra_token, patch(
"litellm.llms.azure.common_utils.get_azure_ad_token_from_username_password"
) as mock_username_password_token, patch(
"litellm.llms.azure.common_utils.get_azure_ad_token_from_oidc"
@@ -37,7 +37,7 @@ def setup_mocks():
mock_litellm.AZURE_DEFAULT_API_VERSION = "2023-05-15"
mock_litellm.enable_azure_ad_token_refresh = False
mock_entrata_token.return_value = lambda: "mock-entrata-token"
mock_entra_token.return_value = lambda: "mock-entra-token"
mock_username_password_token.return_value = (
lambda: "mock-username-password-token"
)
@@ -49,7 +49,7 @@
)
yield {
"entrata_token": mock_entrata_token,
"entra_token": mock_entra_token,
"username_password_token": mock_username_password_token,
"oidc_token": mock_oidc_token,
"token_provider": mock_token_provider,
@@ -92,8 +92,8 @@ def test_initialize_with_tenant_credentials_env_var(setup_mocks, monkeypatch):
is_async=False,
)
# Verify that get_azure_ad_token_from_entrata_id was called
setup_mocks["entrata_token"].assert_called_once_with(
# Verify that get_azure_ad_token_from_entra_id was called
setup_mocks["entra_token"].assert_called_once_with(
tenant_id="test-tenant-id",
client_id="test-client-id",
client_secret="test-client-secret",
@@ -120,8 +120,8 @@ def test_initialize_with_tenant_credentials(setup_mocks):
is_async=False,
)
# Verify that get_azure_ad_token_from_entrata_id was called
setup_mocks["entrata_token"].assert_called_once_with(
# Verify that get_azure_ad_token_from_entra_id was called
setup_mocks["entra_token"].assert_called_once_with(
tenant_id="test-tenant-id",
client_id="test-client-id",
client_secret="test-client-secret",

View file

@@ -864,29 +864,6 @@ def test_vertex_ai_embedding_completion_cost(caplog):
# assert False
def test_completion_azure_ai():
try:
os.environ["LITELLM_LOCAL_MODEL_COST_MAP"] = "True"
litellm.model_cost = litellm.get_model_cost_map(url="")
litellm.set_verbose = True
response = litellm.completion(
model="azure_ai/Mistral-large-nmefg",
messages=[{"content": "what llm are you", "role": "user"}],
max_tokens=15,
num_retries=3,
api_base=os.getenv("AZURE_AI_MISTRAL_API_BASE"),
api_key=os.getenv("AZURE_AI_MISTRAL_API_KEY"),
)
print(response)
assert "response_cost" in response._hidden_params
assert isinstance(response._hidden_params["response_cost"], float)
except Exception as e:
pytest.fail(f"Error occurred: {e}")
@pytest.mark.parametrize("sync_mode", [True, False])
@pytest.mark.asyncio
async def test_completion_cost_hidden_params(sync_mode):

View file

@@ -133,9 +133,10 @@
}
},
"node_modules/@babel/runtime": {
"version": "7.23.9",
"resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.23.9.tgz",
"integrity": "sha512-0CX6F+BI2s9dkUqr08KFrAIZgNFj75rdBU/DjCyYLIaV/quFjkk6T+EJ2LkZHyZTbEV4L5p97mNkUsHl2wLFAw==",
"version": "7.27.0",
"resolved": "https://registry.npmjs.org/@babel/runtime/-/runtime-7.27.0.tgz",
"integrity": "sha512-VtPOkrdPHZsKc/clNqyi9WUA8TINkZ4cGk63UUE3u4pmB2k+ZMQRDuIOagv8UVd6j7k0T3+RRIb7beKTebNbcw==",
"license": "MIT",
"dependencies": {
"regenerator-runtime": "^0.14.0"
},

View file

@@ -170,12 +170,6 @@ const AdvancedSettings: React.FC<AdvancedSettingsProps> = ({
</div>
)}
<CacheControlSettings
form={form}
showCacheControl={showCacheControl}
onCacheControlChange={handleCacheControlChange}
/>
<Form.Item
label="Use in pass through routes"
name="use_in_pass_through"
@@ -195,6 +189,12 @@
className="bg-gray-600"
/>
</Form.Item>
<CacheControlSettings
form={form}
showCacheControl={showCacheControl}
onCacheControlChange={handleCacheControlChange}
/>
<Form.Item
label="LiteLLM Params"
name="litellm_extra_params"

View file

@@ -44,7 +44,7 @@ const CacheControlSettings: React.FC<CacheControlSettingsProps> = ({
return (
<>
<Form.Item
label="Cache Control"
label="Cache Control Injection Points"
name="cache_control"
valuePropName="checked"
className="mb-4"
@@ -56,8 +56,8 @@ const CacheControlSettings: React.FC<CacheControlSettingsProps> = ({
{showCacheControl && (
<div className="ml-6 pl-4 border-l-2 border-gray-200">
<Text className="text-sm text-gray-500 block mb-4">
Specify either a role (to cache all messages of that role) or a specific message index.
If both are provided, the index takes precedence.
Providers like Anthropic and Bedrock require users to specify where to inject cache control checkpoints;
LiteLLM can automatically add them for you as a cost-saving feature.
</Text>
<Form.List
@@ -85,7 +85,7 @@ const CacheControlSettings: React.FC<CacheControlSettingsProps> = ({
name={[field.name, 'role']}
className="mb-0"
style={{ width: '180px' }}
tooltip="Select a role to cache all messages of this type"
tooltip="LiteLLM will mark all messages of this role as cacheable"
>
<Select
placeholder="Select a role"
@@ -108,7 +108,7 @@ const CacheControlSettings: React.FC<CacheControlSettingsProps> = ({
name={[field.name, 'index']}
className="mb-0"
style={{ width: '180px' }}
tooltip="Specify a specific message index (optional)"
tooltip="(Optional) If set litellm will mark the message at this index as cacheable"
>
<NumericalInput
type="number"
@@ -124,7 +124,7 @@ const CacheControlSettings: React.FC<CacheControlSettingsProps> = ({
{fields.length > 1 && (
<MinusCircleOutlined
className="text-red-500 cursor-pointer text-lg mt-8"
className="text-red-500 cursor-pointer text-lg ml-12"
onClick={() => {
remove(field.name);
setTimeout(() => {
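
For reference, the role/index pairs collected by this form presumably end up as a list of injection points on the model's settings. Something like the following shape (hypothetical payload, inferred only from the form fields above):

```python
# Hypothetical payload built from the form: each entry marks either a role
# (cache every message with that role) or a specific message index.
cache_control_injection_points = [
    {"role": "system"},  # LiteLLM marks every system message as cacheable
    {"index": 2},        # or marks the message at a specific index
]
```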