Add aggregate team based usage logging (#10039)

* feat(schema.prisma): initial commit adding aggregate table for team spend

allows team spend to be visible at 1m+ logs

* feat(db_spend_update_writer.py): support logging aggregate team spend

allows usage dashboard to work at 1m+ logs (see the rollup sketch after these notes)

* feat(litellm-proxy-extras/): add new migration file

* fix(db_spend_update_writer.py): fix return type

* build: bump requirements

* fix: fix ruff error
Krish Dholakia 2025-04-15 20:58:48 -07:00 committed by GitHub
parent 61f687f5c2
commit 08053e8f12
15 changed files with 444 additions and 77 deletions



@@ -0,0 +1,36 @@
-- CreateTable
CREATE TABLE "LiteLLM_DailyTeamSpend" (
"id" TEXT NOT NULL,
"team_id" TEXT NOT NULL,
"date" TEXT NOT NULL,
"api_key" TEXT NOT NULL,
"model" TEXT NOT NULL,
"model_group" TEXT,
"custom_llm_provider" TEXT,
"prompt_tokens" INTEGER NOT NULL DEFAULT 0,
"completion_tokens" INTEGER NOT NULL DEFAULT 0,
"spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0,
"api_requests" INTEGER NOT NULL DEFAULT 0,
"successful_requests" INTEGER NOT NULL DEFAULT 0,
"failed_requests" INTEGER NOT NULL DEFAULT 0,
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updated_at" TIMESTAMP(3) NOT NULL,
CONSTRAINT "LiteLLM_DailyTeamSpend_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_date_idx" ON "LiteLLM_DailyTeamSpend"("date");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_team_id_idx" ON "LiteLLM_DailyTeamSpend"("team_id");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_api_key_idx" ON "LiteLLM_DailyTeamSpend"("api_key");
-- CreateIndex
CREATE INDEX "LiteLLM_DailyTeamSpend_model_idx" ON "LiteLLM_DailyTeamSpend"("model");
-- CreateIndex
CREATE UNIQUE INDEX "LiteLLM_DailyTeamSpend_team_id_date_api_key_model_custom_ll_key" ON "LiteLLM_DailyTeamSpend"("team_id", "date", "api_key", "model", "custom_llm_provider");


@@ -1,6 +1,6 @@
[tool.poetry]
name = "litellm-proxy-extras"
-version = "0.1.7"
version = "0.1.8"
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
authors = ["BerriAI"]
readme = "README.md"
@@ -22,7 +22,7 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.commitizen]
-version = "0.1.7"
version = "0.1.8"
version_files = [
    "pyproject.toml:version",
    "../requirements.txt:litellm-proxy-extras==",


@@ -24,6 +24,7 @@ SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD = 1000 # Minimum number of requests
########### v2 Architecture constants for managing writing updates to the database ###########
REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer"
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
MAX_SIZE_IN_MEMORY_QUEUE = 10000
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000


@@ -2769,8 +2769,7 @@ class DefaultInternalUserParams(LiteLLMPydanticObjectBase):
    )

-class DailyUserSpendTransaction(TypedDict):
-    user_id: str
class BaseDailySpendTransaction(TypedDict):
    date: str
    api_key: str
    model: str
@@ -2784,6 +2783,14 @@ class DailyUserSpendTransaction(TypedDict):
    failed_requests: int

class DailyTeamSpendTransaction(BaseDailySpendTransaction):
    team_id: str

class DailyUserSpendTransaction(BaseDailySpendTransaction):
    user_id: str

class DBSpendUpdateTransactions(TypedDict):
    """
    Internal Data Structure for buffering spend updates in Redis or in memory before committing them to the database


@@ -10,7 +10,7 @@ import os
import time
import traceback
from datetime import datetime, timedelta
-from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast

import litellm
from litellm._logging import verbose_proxy_logger
@@ -18,6 +18,8 @@ from litellm.caching import DualCache, RedisCache
from litellm.constants import DB_SPEND_UPDATE_JOB_NAME
from litellm.proxy._types import (
    DB_CONNECTION_ERROR_TYPES,
    BaseDailySpendTransaction,
    DailyTeamSpendTransaction,
    DailyUserSpendTransaction,
    DBSpendUpdateTransactions,
    Litellm_EntityType,
@@ -56,6 +58,7 @@ class DBSpendUpdateWriter:
        self.pod_lock_manager = PodLockManager()
        self.spend_update_queue = SpendUpdateQueue()
        self.daily_spend_update_queue = DailySpendUpdateQueue()
        self.daily_team_spend_update_queue = DailySpendUpdateQueue()

    async def update_database(
        # LiteLLM management object fields
@@ -158,6 +161,13 @@
                )
            )

            asyncio.create_task(
                self.add_spend_log_transaction_to_daily_team_transaction(
                    payload=payload,
                    prisma_client=prisma_client,
                )
            )

            verbose_proxy_logger.debug("Runs spend update on all tables")
        except Exception:
            verbose_proxy_logger.debug(
@@ -381,6 +391,7 @@
            await self.redis_update_buffer.store_in_memory_spend_updates_in_redis(
                spend_update_queue=self.spend_update_queue,
                daily_spend_update_queue=self.daily_spend_update_queue,
                daily_team_spend_update_queue=self.daily_team_spend_update_queue,
            )

            # Only commit from redis to db if this pod is the leader
@@ -411,6 +422,16 @@
                        proxy_logging_obj=proxy_logging_obj,
                        daily_spend_transactions=daily_spend_update_transactions,
                    )
daily_team_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_team_spend_update_transactions_from_redis_buffer()
)
if daily_team_spend_update_transactions is not None:
await DBSpendUpdateWriter.update_daily_team_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_team_spend_update_transactions,
)
        except Exception as e:
            verbose_proxy_logger.error(f"Error committing spend updates: {e}")
        finally:
@@ -446,8 +467,9 @@
        ################## Daily Spend Update Transactions ##################
        # Aggregate all in memory daily spend transactions and commit to db
-        daily_spend_update_transactions = (
-            await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
daily_spend_update_transactions = cast(
            Dict[str, DailyUserSpendTransaction],
            await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
        )

        await DBSpendUpdateWriter.update_daily_user_spend(
@@ -457,6 +479,20 @@
            daily_spend_transactions=daily_spend_update_transactions,
        )
################## Daily Team Spend Update Transactions ##################
# Aggregate all in memory daily team spend transactions and commit to db
daily_team_spend_update_transactions = cast(
Dict[str, DailyTeamSpendTransaction],
await self.daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
)
await DBSpendUpdateWriter.update_daily_team_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
proxy_logging_obj=proxy_logging_obj,
daily_spend_transactions=daily_team_spend_update_transactions,
)
    async def _commit_spend_updates_to_db(  # noqa: PLR0915
        self,
        prisma_client: PrismaClient,
@@ -835,6 +871,187 @@
                e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
            )
@staticmethod
async def update_daily_team_spend(
n_retry_times: int,
prisma_client: PrismaClient,
proxy_logging_obj: ProxyLogging,
daily_spend_transactions: Dict[str, DailyTeamSpendTransaction],
):
"""
Batch job to update LiteLLM_DailyTeamSpend table using in-memory daily_spend_transactions
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception
### UPDATE DAILY USER SPEND ###
verbose_proxy_logger.debug(
"Daily Team Spend transactions: {}".format(len(daily_spend_transactions))
)
BATCH_SIZE = (
100 # Number of aggregated records to update in each database operation
)
start_time = time.time()
try:
for i in range(n_retry_times + 1):
try:
# Get transactions to process
transactions_to_process = dict(
list(daily_spend_transactions.items())[:BATCH_SIZE]
)
if len(transactions_to_process) == 0:
verbose_proxy_logger.debug(
"No new transactions to process for daily spend update"
)
break
# Update DailyUserSpend table in batches
async with prisma_client.db.batch_() as batcher:
for _, transaction in transactions_to_process.items():
team_id = transaction.get("team_id")
if not team_id: # Skip if no team_id
continue
batcher.litellm_dailyteamspend.upsert(
where={
"team_id_date_api_key_model_custom_llm_provider": {
"team_id": team_id,
"date": transaction["date"],
"api_key": transaction["api_key"],
"model": transaction["model"],
"custom_llm_provider": transaction.get(
"custom_llm_provider"
),
}
},
data={
"create": {
"team_id": team_id,
"date": transaction["date"],
"api_key": transaction["api_key"],
"model": transaction["model"],
"model_group": transaction.get("model_group"),
"custom_llm_provider": transaction.get(
"custom_llm_provider"
),
"prompt_tokens": transaction["prompt_tokens"],
"completion_tokens": transaction[
"completion_tokens"
],
"spend": transaction["spend"],
"api_requests": transaction["api_requests"],
"successful_requests": transaction[
"successful_requests"
],
"failed_requests": transaction[
"failed_requests"
],
},
"update": {
"prompt_tokens": {
"increment": transaction["prompt_tokens"]
},
"completion_tokens": {
"increment": transaction[
"completion_tokens"
]
},
"spend": {"increment": transaction["spend"]},
"api_requests": {
"increment": transaction["api_requests"]
},
"successful_requests": {
"increment": transaction[
"successful_requests"
]
},
"failed_requests": {
"increment": transaction["failed_requests"]
},
},
},
)
verbose_proxy_logger.info(
f"Processed {len(transactions_to_process)} daily team transactions in {time.time() - start_time:.2f}s"
)
# Remove processed transactions
for key in transactions_to_process.keys():
daily_spend_transactions.pop(key, None)
verbose_proxy_logger.debug(
f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times:
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
# Remove processed transactions even if there was an error
if "transactions_to_process" in locals():
for key in transactions_to_process.keys(): # type: ignore
daily_spend_transactions.pop(key, None)
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
async def _common_add_spend_log_transaction_to_daily_transaction(
self,
payload: Union[dict, SpendLogsPayload],
prisma_client: PrismaClient,
type: Literal["user", "team"] = "user",
) -> Optional[BaseDailySpendTransaction]:
common_expected_keys = ["startTime", "api_key", "model", "custom_llm_provider"]
if type == "user":
expected_keys = ["user", *common_expected_keys]
else:
expected_keys = ["team_id", *common_expected_keys]
if not all(key in payload for key in expected_keys):
verbose_proxy_logger.debug(
f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
)
return None
request_status = prisma_client.get_request_status(payload)
verbose_proxy_logger.info(f"Logged request status: {request_status}")
if isinstance(payload["startTime"], datetime):
start_time = payload["startTime"].isoformat()
date = start_time.split("T")[0]
elif isinstance(payload["startTime"], str):
date = payload["startTime"].split("T")[0]
else:
verbose_proxy_logger.debug(
f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
)
return None
try:
daily_transaction = BaseDailySpendTransaction(
date=date,
api_key=payload["api_key"],
model=payload["model"],
model_group=payload["model_group"],
custom_llm_provider=payload["custom_llm_provider"],
prompt_tokens=payload["prompt_tokens"],
completion_tokens=payload["completion_tokens"],
spend=payload["spend"],
api_requests=1,
successful_requests=1 if request_status == "success" else 0,
failed_requests=1 if request_status != "success" else 0,
)
return daily_transaction
except Exception as e:
raise e
    async def add_spend_log_transaction_to_daily_user_transaction(
        self,
        payload: Union[dict, SpendLogsPayload],
@@ -852,46 +1069,51 @@
                "prisma_client is None. Skipping writing spend logs to db."
            )
            return
-        expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
-        if not all(key in payload for key in expected_keys):
-            verbose_proxy_logger.debug(
-                f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
-            )
-            return
-        request_status = prisma_client.get_request_status(payload)
-        verbose_proxy_logger.info(f"Logged request status: {request_status}")
-        if isinstance(payload["startTime"], datetime):
-            start_time = payload["startTime"].isoformat()
-            date = start_time.split("T")[0]
-        elif isinstance(payload["startTime"], str):
-            date = payload["startTime"].split("T")[0]
-        else:
-            verbose_proxy_logger.debug(
-                f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
-            )
-            return
-        try:
-            daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
-            daily_transaction = DailyUserSpendTransaction(
-                user_id=payload["user"],
-                date=date,
-                api_key=payload["api_key"],
-                model=payload["model"],
-                model_group=payload["model_group"],
-                custom_llm_provider=payload["custom_llm_provider"],
-                prompt_tokens=payload["prompt_tokens"],
-                completion_tokens=payload["completion_tokens"],
-                spend=payload["spend"],
-                api_requests=1,
-                successful_requests=1 if request_status == "success" else 0,
-                failed_requests=1 if request_status != "success" else 0,
-            )
-            await self.daily_spend_update_queue.add_update(
-                update={daily_transaction_key: daily_transaction}
-            )
-        except Exception as e:
-            raise e

        base_daily_transaction = (
            await self._common_add_spend_log_transaction_to_daily_transaction(
                payload, prisma_client, "user"
            )
        )
        if base_daily_transaction is None:
            return

        daily_transaction_key = f"{payload['user']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
        daily_transaction = DailyUserSpendTransaction(
            user_id=payload["user"], **base_daily_transaction
        )
        await self.daily_spend_update_queue.add_update(
            update={daily_transaction_key: daily_transaction}
        )

    async def add_spend_log_transaction_to_daily_team_transaction(
        self,
payload: SpendLogsPayload,
prisma_client: Optional[PrismaClient] = None,
) -> None:
if prisma_client is None:
verbose_proxy_logger.debug(
"prisma_client is None. Skipping writing spend logs to db."
)
return
base_daily_transaction = (
await self._common_add_spend_log_transaction_to_daily_transaction(
payload, prisma_client, "team"
)
)
if base_daily_transaction is None:
return
if payload["team_id"] is None:
verbose_proxy_logger.debug(
"team_id is None for request. Skipping incrementing team spend."
)
return
daily_transaction_key = f"{payload['team_id']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
daily_transaction = DailyTeamSpendTransaction(
team_id=payload["team_id"], **base_daily_transaction
)
await self.daily_team_spend_update_queue.add_update(
update={daily_transaction_key: daily_transaction}
)


@@ -3,7 +3,7 @@ from copy import deepcopy
from typing import Dict, List, Optional

from litellm._logging import verbose_proxy_logger
-from litellm.proxy._types import DailyUserSpendTransaction
from litellm.proxy._types import BaseDailySpendTransaction
from litellm.proxy.db.db_transaction_queue.base_update_queue import (
    BaseUpdateQueue,
    service_logger_obj,
@@ -54,10 +54,10 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
    def __init__(self):
        super().__init__()
        self.update_queue: asyncio.Queue[
-            Dict[str, DailyUserSpendTransaction]
            Dict[str, BaseDailySpendTransaction]
        ] = asyncio.Queue()

-    async def add_update(self, update: Dict[str, DailyUserSpendTransaction]):
    async def add_update(self, update: Dict[str, BaseDailySpendTransaction]):
        """Enqueue an update."""
        verbose_proxy_logger.debug("Adding update to queue: %s", update)
        await self.update_queue.put(update)
@@ -73,7 +73,7 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
        This is used to reduce the size of the in-memory queue.
        """
        updates: List[
-            Dict[str, DailyUserSpendTransaction]
            Dict[str, BaseDailySpendTransaction]
        ] = await self.flush_all_updates_from_in_memory_queue()
        aggregated_updates = self.get_aggregated_daily_spend_update_transactions(
            updates
@@ -82,8 +82,8 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
    async def flush_and_get_aggregated_daily_spend_update_transactions(
        self,
-    ) -> Dict[str, DailyUserSpendTransaction]:
-        """Get all updates from the queue and return all updates aggregated by daily_transaction_key."""
    ) -> Dict[str, BaseDailySpendTransaction]:
        """Get all updates from the queue and return all updates aggregated by daily_transaction_key. Works for both user and team spend updates."""
        updates = await self.flush_all_updates_from_in_memory_queue()
        aggregated_daily_spend_update_transactions = (
            DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
@@ -98,11 +98,11 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
    @staticmethod
    def get_aggregated_daily_spend_update_transactions(
-        updates: List[Dict[str, DailyUserSpendTransaction]]
-    ) -> Dict[str, DailyUserSpendTransaction]:
        updates: List[Dict[str, BaseDailySpendTransaction]]
    ) -> Dict[str, BaseDailySpendTransaction]:
        """Aggregate updates by daily_transaction_key."""
        aggregated_daily_spend_update_transactions: Dict[
-            str, DailyUserSpendTransaction
            str, BaseDailySpendTransaction
        ] = {}
        for _update in updates:
            for _key, payload in _update.items():


@@ -6,17 +6,22 @@ This is to prevent deadlocks and improve reliability
import asyncio
import json
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast

from litellm._logging import verbose_proxy_logger
from litellm.caching import RedisCache
from litellm.constants import (
    MAX_REDIS_BUFFER_DEQUEUE_COUNT,
    REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
    REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
    REDIS_UPDATE_BUFFER_KEY,
)
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
-from litellm.proxy._types import DailyUserSpendTransaction, DBSpendUpdateTransactions
from litellm.proxy._types import (
    DailyTeamSpendTransaction,
    DailyUserSpendTransaction,
    DBSpendUpdateTransactions,
)
from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj
from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import (
    DailySpendUpdateQueue,
@@ -67,6 +72,7 @@ class RedisUpdateBuffer:
        self,
        spend_update_queue: SpendUpdateQueue,
        daily_spend_update_queue: DailySpendUpdateQueue,
        daily_team_spend_update_queue: DailySpendUpdateQueue,
    ):
        """
        Stores the in-memory spend updates to Redis
@@ -127,6 +133,9 @@
        daily_spend_update_transactions = (
            await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
        )
        daily_team_spend_update_transactions = (
            await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
        )

        verbose_proxy_logger.debug(
            "ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions
        )
@@ -161,6 +170,19 @@
            service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE,
        )
list_of_daily_team_spend_update_transactions = [
safe_dumps(daily_team_spend_update_transactions)
]
current_redis_buffer_size = await self.redis_cache.async_rpush(
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
values=list_of_daily_team_spend_update_transactions,
)
await self._emit_new_item_added_to_redis_buffer_event(
queue_size=current_redis_buffer_size,
service=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE,
)
    @staticmethod
    def _number_of_transactions_to_store_in_redis(
        db_spend_update_transactions: DBSpendUpdateTransactions,
@@ -258,8 +280,35 @@
        list_of_daily_spend_update_transactions = [
            json.loads(transaction) for transaction in list_of_transactions
        ]
-        return DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
-            list_of_daily_spend_update_transactions
-        )
        return cast(
            Dict[str, DailyUserSpendTransaction],
            DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
                list_of_daily_spend_update_transactions
            ),
        )
async def get_all_daily_team_spend_update_transactions_from_redis_buffer(
self,
) -> Optional[Dict[str, DailyTeamSpendTransaction]]:
"""
Gets all the daily team spend update transactions from Redis
"""
if self.redis_cache is None:
return None
list_of_transactions = await self.redis_cache.async_lpop(
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
count=MAX_REDIS_BUFFER_DEQUEUE_COUNT,
)
if list_of_transactions is None:
return None
list_of_daily_spend_update_transactions = [
json.loads(transaction) for transaction in list_of_transactions
]
return cast(
Dict[str, DailyTeamSpendTransaction],
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
list_of_daily_spend_update_transactions
),
        )

    @staticmethod


@@ -340,6 +340,31 @@ model LiteLLM_DailyUserSpend {
  @@index([model])
}
// Track daily team spend metrics per model and key
model LiteLLM_DailyTeamSpend {
id String @id @default(uuid())
team_id String
date String
api_key String
model String
model_group String?
custom_llm_provider String?
prompt_tokens Int @default(0)
completion_tokens Int @default(0)
spend Float @default(0.0)
api_requests Int @default(0)
successful_requests Int @default(0)
failed_requests Int @default(0)
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@unique([team_id, date, api_key, model, custom_llm_provider])
@@index([date])
@@index([team_id])
@@index([api_key])
@@index([model])
}
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model LiteLLM_CronJob {


@@ -33,7 +33,7 @@ class ServiceTypes(str, enum.Enum):
    # daily spend update queue - actual transaction events
    IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue"
    REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue"
    REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE = "redis_daily_team_spend_update_queue"

    # spend update queue - current spend of key, user, team
    IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue"
    REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue"

poetry.lock (generated, 42 changed lines)

@@ -1011,13 +1011,13 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"]

[[package]]
name = "google-auth"
-version = "2.38.0"
version = "2.39.0"
description = "Google Authentication Library"
optional = true
python-versions = ">=3.7"
files = [
-    {file = "google_auth-2.38.0-py2.py3-none-any.whl", hash = "sha256:e7dae6694313f434a2727bf2906f27ad259bae090d7aa896590d86feec3d9d4a"},
-    {file = "google_auth-2.38.0.tar.gz", hash = "sha256:8285113607d3b80a3f1543b75962447ba8a09fe85783432a784fdeef6ac094c4"},
    {file = "google_auth-2.39.0-py2.py3-none-any.whl", hash = "sha256:0150b6711e97fb9f52fe599f55648950cc4540015565d8fbb31be2ad6e1548a2"},
    {file = "google_auth-2.39.0.tar.gz", hash = "sha256:73222d43cdc35a3aeacbfdcaf73142a97839f10de930550d89ebfe1d0a00cde7"},
]

[package.dependencies]
@@ -1026,12 +1026,14 @@ pyasn1-modules = ">=0.2.1"
rsa = ">=3.1.4,<5"

[package.extras]
-aiohttp = ["aiohttp (>=3.6.2,<4.0.0.dev0)", "requests (>=2.20.0,<3.0.0.dev0)"]
aiohttp = ["aiohttp (>=3.6.2,<4.0.0)", "requests (>=2.20.0,<3.0.0)"]
enterprise-cert = ["cryptography", "pyopenssl"]
-pyjwt = ["cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
pyjwt = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
-pyopenssl = ["cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
pyopenssl = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
reauth = ["pyu2f (>=0.1.5)"]
-requests = ["requests (>=2.20.0,<3.0.0.dev0)"]
requests = ["requests (>=2.20.0,<3.0.0)"]
testing = ["aiohttp (<3.10.0)", "aiohttp (>=3.6.2,<4.0.0)", "aioresponses", "cryptography (<39.0.0)", "cryptography (>=38.0.3)", "flask", "freezegun", "grpcio", "mock", "oauth2client", "packaging", "pyjwt (>=2.0)", "pyopenssl (<24.3.0)", "pyopenssl (>=20.0.0)", "pytest", "pytest-asyncio", "pytest-cov", "pytest-localserver", "pyu2f (>=0.1.5)", "requests (>=2.20.0,<3.0.0)", "responses", "urllib3"]
urllib3 = ["packaging", "urllib3"]

[[package]]
name = "google-cloud-kms"
@@ -1053,13 +1055,13 @@ protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4

[[package]]
name = "googleapis-common-protos"
-version = "1.69.2"
version = "1.70.0"
description = "Common protobufs used in Google APIs"
optional = true
python-versions = ">=3.7"
files = [
-    {file = "googleapis_common_protos-1.69.2-py3-none-any.whl", hash = "sha256:0b30452ff9c7a27d80bfc5718954063e8ab53dd3697093d3bc99581f5fd24212"},
-    {file = "googleapis_common_protos-1.69.2.tar.gz", hash = "sha256:3e1b904a27a33c821b4b749fd31d334c0c9c30e6113023d495e48979a3dc9c5f"},
    {file = "googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8"},
    {file = "googleapis_common_protos-1.70.0.tar.gz", hash = "sha256:0e1b44e0ea153e6594f9f394fef15193a68aaaea2d843f83e2742717ca753257"},
]

[package.dependencies]
@@ -1680,13 +1682,13 @@ referencing = ">=0.31.0"

[[package]]
name = "litellm-proxy-extras"
-version = "0.1.7"
version = "0.1.8"
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
optional = true
python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,>=3.8"
files = [
-    {file = "litellm_proxy_extras-0.1.7-py3-none-any.whl", hash = "sha256:d07eb1b8827127222c671a4c2a1730975d7e403bb334dbdadb264d64c99c479e"},
-    {file = "litellm_proxy_extras-0.1.7.tar.gz", hash = "sha256:d34e4e91edbdac244f51fbfb973fff5a9f23850eff717fbdbdb2af0a9e85ef4a"},
    {file = "litellm_proxy_extras-0.1.8-py3-none-any.whl", hash = "sha256:42f261b66a43bd47a25eee0df547f93e375de208b5cb9da524379626c1632dcb"},
    {file = "litellm_proxy_extras-0.1.8.tar.gz", hash = "sha256:81c18b068184b87eb32088afa50358ac7f27a747d446c949291706bfe8158310"},
]

[[package]]
@@ -2180,13 +2182,13 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]

[[package]]
name = "openai"
-version = "1.73.0"
version = "1.74.0"
description = "The official Python library for the openai API"
optional = false
python-versions = ">=3.8"
files = [
-    {file = "openai-1.73.0-py3-none-any.whl", hash = "sha256:f52d1f673fb4ce6069a40d544a80fcb062eba1b3f489004fac4f9923a074c425"},
-    {file = "openai-1.73.0.tar.gz", hash = "sha256:b58ea39ba589de07db85c9905557ac12d2fc77600dcd2b92a08b99c9a3dce9e0"},
    {file = "openai-1.74.0-py3-none-any.whl", hash = "sha256:aff3e0f9fb209836382ec112778667027f4fd6ae38bdb2334bc9e173598b092a"},
    {file = "openai-1.74.0.tar.gz", hash = "sha256:592c25b8747a7cad33a841958f5eb859a785caea9ee22b9e4f4a2ec062236526"},
]

[package.dependencies]
@@ -3326,13 +3328,13 @@ files = [

[[package]]
name = "rq"
-version = "2.3.1"
version = "2.3.2"
description = "RQ is a simple, lightweight, library for creating background jobs, and processing them."
optional = true
python-versions = ">=3.8"
files = [
-    {file = "rq-2.3.1-py3-none-any.whl", hash = "sha256:2bbd48b976fdd840865dcab4bed358eb94b4dd8a02e92add75a346a909c1793d"},
-    {file = "rq-2.3.1.tar.gz", hash = "sha256:9cb33be7a90c6b36c0d6b9a6524aaf85b8855251ace476d74a076e6dfc5684d6"},
    {file = "rq-2.3.2-py3-none-any.whl", hash = "sha256:bf4dc622a7b9d5f7d4a39444f26d89ce6de8a1d6db61b21060612114dbf8d5ff"},
    {file = "rq-2.3.2.tar.gz", hash = "sha256:5bd212992724428ec1689736abde783d245e7856bca39d89845884f5d580f5f1"},
]

[package.dependencies]
@@ -4151,4 +4153,4 @@ proxy = ["PyJWT", "apscheduler", "backoff", "boto3", "cryptography", "fastapi",

[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0, !=3.9.7"
-content-hash = "35a6b009d763180a0f7e00c95c9dc21bc07f339e5b2f0dd12f14c908cc1dd0df"
content-hash = "37dd81eae90a4d984b90067ddf934dcfa1ef61f45476b13af0e3634dfa309051"


@@ -55,7 +55,7 @@ websockets = {version = "^13.1.0", optional = true}
boto3 = {version = "1.34.34", optional = true}
redisvl = {version = "^0.4.1", optional = true, markers = "python_version >= '3.9' and python_version < '3.14'"}
mcp = {version = "1.5.0", optional = true, python = ">=3.10"}
-litellm-proxy-extras = {version = "0.1.7", optional = true}
litellm-proxy-extras = {version = "0.1.8", optional = true}

[tool.poetry.extras]
proxy = [


@@ -37,7 +37,7 @@ sentry_sdk==2.21.0 # for sentry error handling
detect-secrets==1.5.0 # Enterprise - secret detection / masking in LLM requests
cryptography==43.0.1
tzdata==2025.1 # IANA time zone database
-litellm-proxy-extras==0.1.7 # for proxy extras - e.g. prisma migrations
litellm-proxy-extras==0.1.8 # for proxy extras - e.g. prisma migrations
### LITELLM PACKAGE DEPENDENCIES
python-dotenv==1.0.0 # for env
tiktoken==0.8.0 # for calculating usage


@@ -340,6 +340,31 @@ model LiteLLM_DailyUserSpend {
  @@index([model])
}
// Track daily team spend metrics per model and key
model LiteLLM_DailyTeamSpend {
id String @id @default(uuid())
team_id String
date String
api_key String
model String
model_group String?
custom_llm_provider String?
prompt_tokens Int @default(0)
completion_tokens Int @default(0)
spend Float @default(0.0)
api_requests Int @default(0)
successful_requests Int @default(0)
failed_requests Int @default(0)
created_at DateTime @default(now())
updated_at DateTime @updatedAt
@@unique([team_id, date, api_key, model, custom_llm_provider])
@@index([date])
@@index([team_id])
@@index([api_key])
@@index([model])
}
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model LiteLLM_CronJob {