mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-24 18:24:20 +00:00
Add aggregate team based usage logging (#10039)
* feat(schema.prisma): initial commit adding aggregate table for team spend allows team spend to be visible at 1m+ logs * feat(db_spend_update_writer.py): support logging aggregate team spend allows usage dashboard to work at 1m+ logs * feat(litellm-proxy-extras/): add new migration file * fix(db_spend_update_writer.py): fix return type * build: bump requirements * fix: fix ruff error
This commit is contained in:
parent
d3e7a137ad
commit
9b77559ccf
15 changed files with 444 additions and 77 deletions
BIN
litellm-proxy-extras/dist/litellm_proxy_extras-0.1.8-py3-none-any.whl
vendored
Normal file
BIN
litellm-proxy-extras/dist/litellm_proxy_extras-0.1.8-py3-none-any.whl
vendored
Normal file
Binary file not shown.
BIN
litellm-proxy-extras/dist/litellm_proxy_extras-0.1.8.tar.gz
vendored
Normal file
BIN
litellm-proxy-extras/dist/litellm_proxy_extras-0.1.8.tar.gz
vendored
Normal file
Binary file not shown.
|
@ -0,0 +1,36 @@
|
|||
-- CreateTable
|
||||
CREATE TABLE "LiteLLM_DailyTeamSpend" (
|
||||
"id" TEXT NOT NULL,
|
||||
"team_id" TEXT NOT NULL,
|
||||
"date" TEXT NOT NULL,
|
||||
"api_key" TEXT NOT NULL,
|
||||
"model" TEXT NOT NULL,
|
||||
"model_group" TEXT,
|
||||
"custom_llm_provider" TEXT,
|
||||
"prompt_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||
"completion_tokens" INTEGER NOT NULL DEFAULT 0,
|
||||
"spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0,
|
||||
"api_requests" INTEGER NOT NULL DEFAULT 0,
|
||||
"successful_requests" INTEGER NOT NULL DEFAULT 0,
|
||||
"failed_requests" INTEGER NOT NULL DEFAULT 0,
|
||||
"created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updated_at" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "LiteLLM_DailyTeamSpend_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "LiteLLM_DailyTeamSpend_date_idx" ON "LiteLLM_DailyTeamSpend"("date");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "LiteLLM_DailyTeamSpend_team_id_idx" ON "LiteLLM_DailyTeamSpend"("team_id");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "LiteLLM_DailyTeamSpend_api_key_idx" ON "LiteLLM_DailyTeamSpend"("api_key");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "LiteLLM_DailyTeamSpend_model_idx" ON "LiteLLM_DailyTeamSpend"("model");
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "LiteLLM_DailyTeamSpend_team_id_date_api_key_model_custom_ll_key" ON "LiteLLM_DailyTeamSpend"("team_id", "date", "api_key", "model", "custom_llm_provider");
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
[tool.poetry]
|
||||
name = "litellm-proxy-extras"
|
||||
version = "0.1.7"
|
||||
version = "0.1.8"
|
||||
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
|
||||
authors = ["BerriAI"]
|
||||
readme = "README.md"
|
||||
|
@ -22,7 +22,7 @@ requires = ["poetry-core"]
|
|||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.commitizen]
|
||||
version = "0.1.7"
|
||||
version = "0.1.8"
|
||||
version_files = [
|
||||
"pyproject.toml:version",
|
||||
"../requirements.txt:litellm-proxy-extras==",
|
||||
|
|
|
@ -24,6 +24,7 @@ SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD = 1000 # Minimum number of requests
|
|||
########### v2 Architecture constants for managing writing updates to the database ###########
|
||||
REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
|
||||
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer"
|
||||
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer"
|
||||
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
|
||||
MAX_SIZE_IN_MEMORY_QUEUE = 10000
|
||||
MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000
|
||||
|
|
|
@ -2769,8 +2769,7 @@ class DefaultInternalUserParams(LiteLLMPydanticObjectBase):
|
|||
)
|
||||
|
||||
|
||||
class DailyUserSpendTransaction(TypedDict):
|
||||
user_id: str
|
||||
class BaseDailySpendTransaction(TypedDict):
|
||||
date: str
|
||||
api_key: str
|
||||
model: str
|
||||
|
@ -2784,6 +2783,14 @@ class DailyUserSpendTransaction(TypedDict):
|
|||
failed_requests: int
|
||||
|
||||
|
||||
class DailyTeamSpendTransaction(BaseDailySpendTransaction):
|
||||
team_id: str
|
||||
|
||||
|
||||
class DailyUserSpendTransaction(BaseDailySpendTransaction):
|
||||
user_id: str
|
||||
|
||||
|
||||
class DBSpendUpdateTransactions(TypedDict):
|
||||
"""
|
||||
Internal Data Structure for buffering spend updates in Redis or in memory before committing them to the database
|
||||
|
|
|
@ -10,7 +10,7 @@ import os
|
|||
import time
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast
|
||||
|
||||
import litellm
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
|
@ -18,6 +18,8 @@ from litellm.caching import DualCache, RedisCache
|
|||
from litellm.constants import DB_SPEND_UPDATE_JOB_NAME
|
||||
from litellm.proxy._types import (
|
||||
DB_CONNECTION_ERROR_TYPES,
|
||||
BaseDailySpendTransaction,
|
||||
DailyTeamSpendTransaction,
|
||||
DailyUserSpendTransaction,
|
||||
DBSpendUpdateTransactions,
|
||||
Litellm_EntityType,
|
||||
|
@ -56,6 +58,7 @@ class DBSpendUpdateWriter:
|
|||
self.pod_lock_manager = PodLockManager()
|
||||
self.spend_update_queue = SpendUpdateQueue()
|
||||
self.daily_spend_update_queue = DailySpendUpdateQueue()
|
||||
self.daily_team_spend_update_queue = DailySpendUpdateQueue()
|
||||
|
||||
async def update_database(
|
||||
# LiteLLM management object fields
|
||||
|
@ -158,6 +161,13 @@ class DBSpendUpdateWriter:
|
|||
)
|
||||
)
|
||||
|
||||
asyncio.create_task(
|
||||
self.add_spend_log_transaction_to_daily_team_transaction(
|
||||
payload=payload,
|
||||
prisma_client=prisma_client,
|
||||
)
|
||||
)
|
||||
|
||||
verbose_proxy_logger.debug("Runs spend update on all tables")
|
||||
except Exception:
|
||||
verbose_proxy_logger.debug(
|
||||
|
@ -381,6 +391,7 @@ class DBSpendUpdateWriter:
|
|||
await self.redis_update_buffer.store_in_memory_spend_updates_in_redis(
|
||||
spend_update_queue=self.spend_update_queue,
|
||||
daily_spend_update_queue=self.daily_spend_update_queue,
|
||||
daily_team_spend_update_queue=self.daily_team_spend_update_queue,
|
||||
)
|
||||
|
||||
# Only commit from redis to db if this pod is the leader
|
||||
|
@ -411,6 +422,16 @@ class DBSpendUpdateWriter:
|
|||
proxy_logging_obj=proxy_logging_obj,
|
||||
daily_spend_transactions=daily_spend_update_transactions,
|
||||
)
|
||||
daily_team_spend_update_transactions = (
|
||||
await self.redis_update_buffer.get_all_daily_team_spend_update_transactions_from_redis_buffer()
|
||||
)
|
||||
if daily_team_spend_update_transactions is not None:
|
||||
await DBSpendUpdateWriter.update_daily_team_spend(
|
||||
n_retry_times=n_retry_times,
|
||||
prisma_client=prisma_client,
|
||||
proxy_logging_obj=proxy_logging_obj,
|
||||
daily_spend_transactions=daily_team_spend_update_transactions,
|
||||
)
|
||||
except Exception as e:
|
||||
verbose_proxy_logger.error(f"Error committing spend updates: {e}")
|
||||
finally:
|
||||
|
@ -446,8 +467,9 @@ class DBSpendUpdateWriter:
|
|||
|
||||
################## Daily Spend Update Transactions ##################
|
||||
# Aggregate all in memory daily spend transactions and commit to db
|
||||
daily_spend_update_transactions = (
|
||||
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
|
||||
daily_spend_update_transactions = cast(
|
||||
Dict[str, DailyUserSpendTransaction],
|
||||
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
|
||||
)
|
||||
|
||||
await DBSpendUpdateWriter.update_daily_user_spend(
|
||||
|
@ -457,6 +479,20 @@ class DBSpendUpdateWriter:
|
|||
daily_spend_transactions=daily_spend_update_transactions,
|
||||
)
|
||||
|
||||
################## Daily Team Spend Update Transactions ##################
|
||||
# Aggregate all in memory daily team spend transactions and commit to db
|
||||
daily_team_spend_update_transactions = cast(
|
||||
Dict[str, DailyTeamSpendTransaction],
|
||||
await self.daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(),
|
||||
)
|
||||
|
||||
await DBSpendUpdateWriter.update_daily_team_spend(
|
||||
n_retry_times=n_retry_times,
|
||||
prisma_client=prisma_client,
|
||||
proxy_logging_obj=proxy_logging_obj,
|
||||
daily_spend_transactions=daily_team_spend_update_transactions,
|
||||
)
|
||||
|
||||
async def _commit_spend_updates_to_db( # noqa: PLR0915
|
||||
self,
|
||||
prisma_client: PrismaClient,
|
||||
|
@ -835,6 +871,187 @@ class DBSpendUpdateWriter:
|
|||
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
async def update_daily_team_spend(
|
||||
n_retry_times: int,
|
||||
prisma_client: PrismaClient,
|
||||
proxy_logging_obj: ProxyLogging,
|
||||
daily_spend_transactions: Dict[str, DailyTeamSpendTransaction],
|
||||
):
|
||||
"""
|
||||
Batch job to update LiteLLM_DailyTeamSpend table using in-memory daily_spend_transactions
|
||||
"""
|
||||
from litellm.proxy.utils import _raise_failed_update_spend_exception
|
||||
|
||||
### UPDATE DAILY USER SPEND ###
|
||||
verbose_proxy_logger.debug(
|
||||
"Daily Team Spend transactions: {}".format(len(daily_spend_transactions))
|
||||
)
|
||||
BATCH_SIZE = (
|
||||
100 # Number of aggregated records to update in each database operation
|
||||
)
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
for i in range(n_retry_times + 1):
|
||||
try:
|
||||
# Get transactions to process
|
||||
transactions_to_process = dict(
|
||||
list(daily_spend_transactions.items())[:BATCH_SIZE]
|
||||
)
|
||||
|
||||
if len(transactions_to_process) == 0:
|
||||
verbose_proxy_logger.debug(
|
||||
"No new transactions to process for daily spend update"
|
||||
)
|
||||
break
|
||||
|
||||
# Update DailyUserSpend table in batches
|
||||
async with prisma_client.db.batch_() as batcher:
|
||||
for _, transaction in transactions_to_process.items():
|
||||
team_id = transaction.get("team_id")
|
||||
if not team_id: # Skip if no team_id
|
||||
continue
|
||||
|
||||
batcher.litellm_dailyteamspend.upsert(
|
||||
where={
|
||||
"team_id_date_api_key_model_custom_llm_provider": {
|
||||
"team_id": team_id,
|
||||
"date": transaction["date"],
|
||||
"api_key": transaction["api_key"],
|
||||
"model": transaction["model"],
|
||||
"custom_llm_provider": transaction.get(
|
||||
"custom_llm_provider"
|
||||
),
|
||||
}
|
||||
},
|
||||
data={
|
||||
"create": {
|
||||
"team_id": team_id,
|
||||
"date": transaction["date"],
|
||||
"api_key": transaction["api_key"],
|
||||
"model": transaction["model"],
|
||||
"model_group": transaction.get("model_group"),
|
||||
"custom_llm_provider": transaction.get(
|
||||
"custom_llm_provider"
|
||||
),
|
||||
"prompt_tokens": transaction["prompt_tokens"],
|
||||
"completion_tokens": transaction[
|
||||
"completion_tokens"
|
||||
],
|
||||
"spend": transaction["spend"],
|
||||
"api_requests": transaction["api_requests"],
|
||||
"successful_requests": transaction[
|
||||
"successful_requests"
|
||||
],
|
||||
"failed_requests": transaction[
|
||||
"failed_requests"
|
||||
],
|
||||
},
|
||||
"update": {
|
||||
"prompt_tokens": {
|
||||
"increment": transaction["prompt_tokens"]
|
||||
},
|
||||
"completion_tokens": {
|
||||
"increment": transaction[
|
||||
"completion_tokens"
|
||||
]
|
||||
},
|
||||
"spend": {"increment": transaction["spend"]},
|
||||
"api_requests": {
|
||||
"increment": transaction["api_requests"]
|
||||
},
|
||||
"successful_requests": {
|
||||
"increment": transaction[
|
||||
"successful_requests"
|
||||
]
|
||||
},
|
||||
"failed_requests": {
|
||||
"increment": transaction["failed_requests"]
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
verbose_proxy_logger.info(
|
||||
f"Processed {len(transactions_to_process)} daily team transactions in {time.time() - start_time:.2f}s"
|
||||
)
|
||||
|
||||
# Remove processed transactions
|
||||
for key in transactions_to_process.keys():
|
||||
daily_spend_transactions.pop(key, None)
|
||||
|
||||
verbose_proxy_logger.debug(
|
||||
f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
|
||||
)
|
||||
break
|
||||
|
||||
except DB_CONNECTION_ERROR_TYPES as e:
|
||||
if i >= n_retry_times:
|
||||
_raise_failed_update_spend_exception(
|
||||
e=e,
|
||||
start_time=start_time,
|
||||
proxy_logging_obj=proxy_logging_obj,
|
||||
)
|
||||
await asyncio.sleep(2**i) # Exponential backoff
|
||||
|
||||
except Exception as e:
|
||||
# Remove processed transactions even if there was an error
|
||||
if "transactions_to_process" in locals():
|
||||
for key in transactions_to_process.keys(): # type: ignore
|
||||
daily_spend_transactions.pop(key, None)
|
||||
_raise_failed_update_spend_exception(
|
||||
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
|
||||
)
|
||||
|
||||
async def _common_add_spend_log_transaction_to_daily_transaction(
|
||||
self,
|
||||
payload: Union[dict, SpendLogsPayload],
|
||||
prisma_client: PrismaClient,
|
||||
type: Literal["user", "team"] = "user",
|
||||
) -> Optional[BaseDailySpendTransaction]:
|
||||
common_expected_keys = ["startTime", "api_key", "model", "custom_llm_provider"]
|
||||
if type == "user":
|
||||
expected_keys = ["user", *common_expected_keys]
|
||||
else:
|
||||
expected_keys = ["team_id", *common_expected_keys]
|
||||
|
||||
if not all(key in payload for key in expected_keys):
|
||||
verbose_proxy_logger.debug(
|
||||
f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
|
||||
)
|
||||
return None
|
||||
|
||||
request_status = prisma_client.get_request_status(payload)
|
||||
verbose_proxy_logger.info(f"Logged request status: {request_status}")
|
||||
if isinstance(payload["startTime"], datetime):
|
||||
start_time = payload["startTime"].isoformat()
|
||||
date = start_time.split("T")[0]
|
||||
elif isinstance(payload["startTime"], str):
|
||||
date = payload["startTime"].split("T")[0]
|
||||
else:
|
||||
verbose_proxy_logger.debug(
|
||||
f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
|
||||
)
|
||||
return None
|
||||
try:
|
||||
daily_transaction = BaseDailySpendTransaction(
|
||||
date=date,
|
||||
api_key=payload["api_key"],
|
||||
model=payload["model"],
|
||||
model_group=payload["model_group"],
|
||||
custom_llm_provider=payload["custom_llm_provider"],
|
||||
prompt_tokens=payload["prompt_tokens"],
|
||||
completion_tokens=payload["completion_tokens"],
|
||||
spend=payload["spend"],
|
||||
api_requests=1,
|
||||
successful_requests=1 if request_status == "success" else 0,
|
||||
failed_requests=1 if request_status != "success" else 0,
|
||||
)
|
||||
return daily_transaction
|
||||
except Exception as e:
|
||||
raise e
|
||||
|
||||
async def add_spend_log_transaction_to_daily_user_transaction(
|
||||
self,
|
||||
payload: Union[dict, SpendLogsPayload],
|
||||
|
@ -852,46 +1069,51 @@ class DBSpendUpdateWriter:
|
|||
"prisma_client is None. Skipping writing spend logs to db."
|
||||
)
|
||||
return
|
||||
expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
|
||||
|
||||
if not all(key in payload for key in expected_keys):
|
||||
base_daily_transaction = (
|
||||
await self._common_add_spend_log_transaction_to_daily_transaction(
|
||||
payload, prisma_client, "user"
|
||||
)
|
||||
)
|
||||
if base_daily_transaction is None:
|
||||
return
|
||||
|
||||
daily_transaction_key = f"{payload['user']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
|
||||
daily_transaction = DailyUserSpendTransaction(
|
||||
user_id=payload["user"], **base_daily_transaction
|
||||
)
|
||||
await self.daily_spend_update_queue.add_update(
|
||||
update={daily_transaction_key: daily_transaction}
|
||||
)
|
||||
|
||||
async def add_spend_log_transaction_to_daily_team_transaction(
|
||||
self,
|
||||
payload: SpendLogsPayload,
|
||||
prisma_client: Optional[PrismaClient] = None,
|
||||
) -> None:
|
||||
if prisma_client is None:
|
||||
verbose_proxy_logger.debug(
|
||||
f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
|
||||
"prisma_client is None. Skipping writing spend logs to db."
|
||||
)
|
||||
return
|
||||
|
||||
request_status = prisma_client.get_request_status(payload)
|
||||
verbose_proxy_logger.info(f"Logged request status: {request_status}")
|
||||
if isinstance(payload["startTime"], datetime):
|
||||
start_time = payload["startTime"].isoformat()
|
||||
date = start_time.split("T")[0]
|
||||
elif isinstance(payload["startTime"], str):
|
||||
date = payload["startTime"].split("T")[0]
|
||||
else:
|
||||
base_daily_transaction = (
|
||||
await self._common_add_spend_log_transaction_to_daily_transaction(
|
||||
payload, prisma_client, "team"
|
||||
)
|
||||
)
|
||||
if base_daily_transaction is None:
|
||||
return
|
||||
if payload["team_id"] is None:
|
||||
verbose_proxy_logger.debug(
|
||||
f"Invalid start time: {payload['startTime']}, skipping from daily_user_spend_transactions"
|
||||
"team_id is None for request. Skipping incrementing team spend."
|
||||
)
|
||||
return
|
||||
try:
|
||||
daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
|
||||
daily_transaction = DailyUserSpendTransaction(
|
||||
user_id=payload["user"],
|
||||
date=date,
|
||||
api_key=payload["api_key"],
|
||||
model=payload["model"],
|
||||
model_group=payload["model_group"],
|
||||
custom_llm_provider=payload["custom_llm_provider"],
|
||||
prompt_tokens=payload["prompt_tokens"],
|
||||
completion_tokens=payload["completion_tokens"],
|
||||
spend=payload["spend"],
|
||||
api_requests=1,
|
||||
successful_requests=1 if request_status == "success" else 0,
|
||||
failed_requests=1 if request_status != "success" else 0,
|
||||
)
|
||||
|
||||
await self.daily_spend_update_queue.add_update(
|
||||
update={daily_transaction_key: daily_transaction}
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
raise e
|
||||
daily_transaction_key = f"{payload['team_id']}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"
|
||||
daily_transaction = DailyTeamSpendTransaction(
|
||||
team_id=payload["team_id"], **base_daily_transaction
|
||||
)
|
||||
await self.daily_team_spend_update_queue.add_update(
|
||||
update={daily_transaction_key: daily_transaction}
|
||||
)
|
||||
|
|
|
@ -3,7 +3,7 @@ from copy import deepcopy
|
|||
from typing import Dict, List, Optional
|
||||
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.proxy._types import DailyUserSpendTransaction
|
||||
from litellm.proxy._types import BaseDailySpendTransaction
|
||||
from litellm.proxy.db.db_transaction_queue.base_update_queue import (
|
||||
BaseUpdateQueue,
|
||||
service_logger_obj,
|
||||
|
@ -54,10 +54,10 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
|
|||
def __init__(self):
|
||||
super().__init__()
|
||||
self.update_queue: asyncio.Queue[
|
||||
Dict[str, DailyUserSpendTransaction]
|
||||
Dict[str, BaseDailySpendTransaction]
|
||||
] = asyncio.Queue()
|
||||
|
||||
async def add_update(self, update: Dict[str, DailyUserSpendTransaction]):
|
||||
async def add_update(self, update: Dict[str, BaseDailySpendTransaction]):
|
||||
"""Enqueue an update."""
|
||||
verbose_proxy_logger.debug("Adding update to queue: %s", update)
|
||||
await self.update_queue.put(update)
|
||||
|
@ -73,7 +73,7 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
|
|||
This is used to reduce the size of the in-memory queue.
|
||||
"""
|
||||
updates: List[
|
||||
Dict[str, DailyUserSpendTransaction]
|
||||
Dict[str, BaseDailySpendTransaction]
|
||||
] = await self.flush_all_updates_from_in_memory_queue()
|
||||
aggregated_updates = self.get_aggregated_daily_spend_update_transactions(
|
||||
updates
|
||||
|
@ -82,8 +82,8 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
|
|||
|
||||
async def flush_and_get_aggregated_daily_spend_update_transactions(
|
||||
self,
|
||||
) -> Dict[str, DailyUserSpendTransaction]:
|
||||
"""Get all updates from the queue and return all updates aggregated by daily_transaction_key."""
|
||||
) -> Dict[str, BaseDailySpendTransaction]:
|
||||
"""Get all updates from the queue and return all updates aggregated by daily_transaction_key. Works for both user and team spend updates."""
|
||||
updates = await self.flush_all_updates_from_in_memory_queue()
|
||||
aggregated_daily_spend_update_transactions = (
|
||||
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
|
||||
|
@ -98,11 +98,11 @@ class DailySpendUpdateQueue(BaseUpdateQueue):
|
|||
|
||||
@staticmethod
|
||||
def get_aggregated_daily_spend_update_transactions(
|
||||
updates: List[Dict[str, DailyUserSpendTransaction]]
|
||||
) -> Dict[str, DailyUserSpendTransaction]:
|
||||
updates: List[Dict[str, BaseDailySpendTransaction]]
|
||||
) -> Dict[str, BaseDailySpendTransaction]:
|
||||
"""Aggregate updates by daily_transaction_key."""
|
||||
aggregated_daily_spend_update_transactions: Dict[
|
||||
str, DailyUserSpendTransaction
|
||||
str, BaseDailySpendTransaction
|
||||
] = {}
|
||||
for _update in updates:
|
||||
for _key, payload in _update.items():
|
||||
|
|
|
@ -6,17 +6,22 @@ This is to prevent deadlocks and improve reliability
|
|||
|
||||
import asyncio
|
||||
import json
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
|
||||
|
||||
from litellm._logging import verbose_proxy_logger
|
||||
from litellm.caching import RedisCache
|
||||
from litellm.constants import (
|
||||
MAX_REDIS_BUFFER_DEQUEUE_COUNT,
|
||||
REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY,
|
||||
REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
|
||||
REDIS_UPDATE_BUFFER_KEY,
|
||||
)
|
||||
from litellm.litellm_core_utils.safe_json_dumps import safe_dumps
|
||||
from litellm.proxy._types import DailyUserSpendTransaction, DBSpendUpdateTransactions
|
||||
from litellm.proxy._types import (
|
||||
DailyTeamSpendTransaction,
|
||||
DailyUserSpendTransaction,
|
||||
DBSpendUpdateTransactions,
|
||||
)
|
||||
from litellm.proxy.db.db_transaction_queue.base_update_queue import service_logger_obj
|
||||
from litellm.proxy.db.db_transaction_queue.daily_spend_update_queue import (
|
||||
DailySpendUpdateQueue,
|
||||
|
@ -67,6 +72,7 @@ class RedisUpdateBuffer:
|
|||
self,
|
||||
spend_update_queue: SpendUpdateQueue,
|
||||
daily_spend_update_queue: DailySpendUpdateQueue,
|
||||
daily_team_spend_update_queue: DailySpendUpdateQueue,
|
||||
):
|
||||
"""
|
||||
Stores the in-memory spend updates to Redis
|
||||
|
@ -127,6 +133,9 @@ class RedisUpdateBuffer:
|
|||
daily_spend_update_transactions = (
|
||||
await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
|
||||
)
|
||||
daily_team_spend_update_transactions = (
|
||||
await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
|
||||
)
|
||||
verbose_proxy_logger.debug(
|
||||
"ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions
|
||||
)
|
||||
|
@ -161,6 +170,19 @@ class RedisUpdateBuffer:
|
|||
service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE,
|
||||
)
|
||||
|
||||
list_of_daily_team_spend_update_transactions = [
|
||||
safe_dumps(daily_team_spend_update_transactions)
|
||||
]
|
||||
|
||||
current_redis_buffer_size = await self.redis_cache.async_rpush(
|
||||
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
|
||||
values=list_of_daily_team_spend_update_transactions,
|
||||
)
|
||||
await self._emit_new_item_added_to_redis_buffer_event(
|
||||
queue_size=current_redis_buffer_size,
|
||||
service=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _number_of_transactions_to_store_in_redis(
|
||||
db_spend_update_transactions: DBSpendUpdateTransactions,
|
||||
|
@ -258,8 +280,35 @@ class RedisUpdateBuffer:
|
|||
list_of_daily_spend_update_transactions = [
|
||||
json.loads(transaction) for transaction in list_of_transactions
|
||||
]
|
||||
return DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
|
||||
list_of_daily_spend_update_transactions
|
||||
return cast(
|
||||
Dict[str, DailyUserSpendTransaction],
|
||||
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
|
||||
list_of_daily_spend_update_transactions
|
||||
),
|
||||
)
|
||||
|
||||
async def get_all_daily_team_spend_update_transactions_from_redis_buffer(
|
||||
self,
|
||||
) -> Optional[Dict[str, DailyTeamSpendTransaction]]:
|
||||
"""
|
||||
Gets all the daily team spend update transactions from Redis
|
||||
"""
|
||||
if self.redis_cache is None:
|
||||
return None
|
||||
list_of_transactions = await self.redis_cache.async_lpop(
|
||||
key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY,
|
||||
count=MAX_REDIS_BUFFER_DEQUEUE_COUNT,
|
||||
)
|
||||
if list_of_transactions is None:
|
||||
return None
|
||||
list_of_daily_spend_update_transactions = [
|
||||
json.loads(transaction) for transaction in list_of_transactions
|
||||
]
|
||||
return cast(
|
||||
Dict[str, DailyTeamSpendTransaction],
|
||||
DailySpendUpdateQueue.get_aggregated_daily_spend_update_transactions(
|
||||
list_of_daily_spend_update_transactions
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
|
|
@ -340,6 +340,31 @@ model LiteLLM_DailyUserSpend {
|
|||
@@index([model])
|
||||
}
|
||||
|
||||
// Track daily team spend metrics per model and key
|
||||
model LiteLLM_DailyTeamSpend {
|
||||
id String @id @default(uuid())
|
||||
team_id String
|
||||
date String
|
||||
api_key String
|
||||
model String
|
||||
model_group String?
|
||||
custom_llm_provider String?
|
||||
prompt_tokens Int @default(0)
|
||||
completion_tokens Int @default(0)
|
||||
spend Float @default(0.0)
|
||||
api_requests Int @default(0)
|
||||
successful_requests Int @default(0)
|
||||
failed_requests Int @default(0)
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @updatedAt
|
||||
|
||||
@@unique([team_id, date, api_key, model, custom_llm_provider])
|
||||
@@index([date])
|
||||
@@index([team_id])
|
||||
@@index([api_key])
|
||||
@@index([model])
|
||||
}
|
||||
|
||||
|
||||
// Track the status of cron jobs running. Only allow one pod to run the job at a time
|
||||
model LiteLLM_CronJob {
|
||||
|
|
|
@ -33,7 +33,7 @@ class ServiceTypes(str, enum.Enum):
|
|||
# daily spend update queue - actual transaction events
|
||||
IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue"
|
||||
REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue"
|
||||
|
||||
REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE = "redis_daily_team_spend_update_queue"
|
||||
# spend update queue - current spend of key, user, team
|
||||
IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue"
|
||||
REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue"
|
||||
|
|
42
poetry.lock
generated
42
poetry.lock
generated
|
@ -1011,13 +1011,13 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"]
|
|||
|
||||
[[package]]
|
||||
name = "google-auth"
|
||||
version = "2.38.0"
|
||||
version = "2.39.0"
|
||||
description = "Google Authentication Library"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "google_auth-2.38.0-py2.py3-none-any.whl", hash = "sha256:e7dae6694313f434a2727bf2906f27ad259bae090d7aa896590d86feec3d9d4a"},
|
||||
{file = "google_auth-2.38.0.tar.gz", hash = "sha256:8285113607d3b80a3f1543b75962447ba8a09fe85783432a784fdeef6ac094c4"},
|
||||
{file = "google_auth-2.39.0-py2.py3-none-any.whl", hash = "sha256:0150b6711e97fb9f52fe599f55648950cc4540015565d8fbb31be2ad6e1548a2"},
|
||||
{file = "google_auth-2.39.0.tar.gz", hash = "sha256:73222d43cdc35a3aeacbfdcaf73142a97839f10de930550d89ebfe1d0a00cde7"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -1026,12 +1026,14 @@ pyasn1-modules = ">=0.2.1"
|
|||
rsa = ">=3.1.4,<5"
|
||||
|
||||
[package.extras]
|
||||
aiohttp = ["aiohttp (>=3.6.2,<4.0.0.dev0)", "requests (>=2.20.0,<3.0.0.dev0)"]
|
||||
aiohttp = ["aiohttp (>=3.6.2,<4.0.0)", "requests (>=2.20.0,<3.0.0)"]
|
||||
enterprise-cert = ["cryptography", "pyopenssl"]
|
||||
pyjwt = ["cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
|
||||
pyopenssl = ["cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
|
||||
pyjwt = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyjwt (>=2.0)"]
|
||||
pyopenssl = ["cryptography (<39.0.0)", "cryptography (>=38.0.3)", "pyopenssl (>=20.0.0)"]
|
||||
reauth = ["pyu2f (>=0.1.5)"]
|
||||
requests = ["requests (>=2.20.0,<3.0.0.dev0)"]
|
||||
requests = ["requests (>=2.20.0,<3.0.0)"]
|
||||
testing = ["aiohttp (<3.10.0)", "aiohttp (>=3.6.2,<4.0.0)", "aioresponses", "cryptography (<39.0.0)", "cryptography (>=38.0.3)", "flask", "freezegun", "grpcio", "mock", "oauth2client", "packaging", "pyjwt (>=2.0)", "pyopenssl (<24.3.0)", "pyopenssl (>=20.0.0)", "pytest", "pytest-asyncio", "pytest-cov", "pytest-localserver", "pyu2f (>=0.1.5)", "requests (>=2.20.0,<3.0.0)", "responses", "urllib3"]
|
||||
urllib3 = ["packaging", "urllib3"]
|
||||
|
||||
[[package]]
|
||||
name = "google-cloud-kms"
|
||||
|
@ -1053,13 +1055,13 @@ protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4
|
|||
|
||||
[[package]]
|
||||
name = "googleapis-common-protos"
|
||||
version = "1.69.2"
|
||||
version = "1.70.0"
|
||||
description = "Common protobufs used in Google APIs"
|
||||
optional = true
|
||||
python-versions = ">=3.7"
|
||||
files = [
|
||||
{file = "googleapis_common_protos-1.69.2-py3-none-any.whl", hash = "sha256:0b30452ff9c7a27d80bfc5718954063e8ab53dd3697093d3bc99581f5fd24212"},
|
||||
{file = "googleapis_common_protos-1.69.2.tar.gz", hash = "sha256:3e1b904a27a33c821b4b749fd31d334c0c9c30e6113023d495e48979a3dc9c5f"},
|
||||
{file = "googleapis_common_protos-1.70.0-py3-none-any.whl", hash = "sha256:b8bfcca8c25a2bb253e0e0b0adaf8c00773e5e6af6fd92397576680b807e0fd8"},
|
||||
{file = "googleapis_common_protos-1.70.0.tar.gz", hash = "sha256:0e1b44e0ea153e6594f9f394fef15193a68aaaea2d843f83e2742717ca753257"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -1680,13 +1682,13 @@ referencing = ">=0.31.0"
|
|||
|
||||
[[package]]
|
||||
name = "litellm-proxy-extras"
|
||||
version = "0.1.7"
|
||||
version = "0.1.8"
|
||||
description = "Additional files for the LiteLLM Proxy. Reduces the size of the main litellm package."
|
||||
optional = true
|
||||
python-versions = "!=2.7.*,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,!=3.7.*,>=3.8"
|
||||
files = [
|
||||
{file = "litellm_proxy_extras-0.1.7-py3-none-any.whl", hash = "sha256:d07eb1b8827127222c671a4c2a1730975d7e403bb334dbdadb264d64c99c479e"},
|
||||
{file = "litellm_proxy_extras-0.1.7.tar.gz", hash = "sha256:d34e4e91edbdac244f51fbfb973fff5a9f23850eff717fbdbdb2af0a9e85ef4a"},
|
||||
{file = "litellm_proxy_extras-0.1.8-py3-none-any.whl", hash = "sha256:42f261b66a43bd47a25eee0df547f93e375de208b5cb9da524379626c1632dcb"},
|
||||
{file = "litellm_proxy_extras-0.1.8.tar.gz", hash = "sha256:81c18b068184b87eb32088afa50358ac7f27a747d446c949291706bfe8158310"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -2180,13 +2182,13 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"]
|
|||
|
||||
[[package]]
|
||||
name = "openai"
|
||||
version = "1.73.0"
|
||||
version = "1.74.0"
|
||||
description = "The official Python library for the openai API"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "openai-1.73.0-py3-none-any.whl", hash = "sha256:f52d1f673fb4ce6069a40d544a80fcb062eba1b3f489004fac4f9923a074c425"},
|
||||
{file = "openai-1.73.0.tar.gz", hash = "sha256:b58ea39ba589de07db85c9905557ac12d2fc77600dcd2b92a08b99c9a3dce9e0"},
|
||||
{file = "openai-1.74.0-py3-none-any.whl", hash = "sha256:aff3e0f9fb209836382ec112778667027f4fd6ae38bdb2334bc9e173598b092a"},
|
||||
{file = "openai-1.74.0.tar.gz", hash = "sha256:592c25b8747a7cad33a841958f5eb859a785caea9ee22b9e4f4a2ec062236526"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -3326,13 +3328,13 @@ files = [
|
|||
|
||||
[[package]]
|
||||
name = "rq"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
description = "RQ is a simple, lightweight, library for creating background jobs, and processing them."
|
||||
optional = true
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "rq-2.3.1-py3-none-any.whl", hash = "sha256:2bbd48b976fdd840865dcab4bed358eb94b4dd8a02e92add75a346a909c1793d"},
|
||||
{file = "rq-2.3.1.tar.gz", hash = "sha256:9cb33be7a90c6b36c0d6b9a6524aaf85b8855251ace476d74a076e6dfc5684d6"},
|
||||
{file = "rq-2.3.2-py3-none-any.whl", hash = "sha256:bf4dc622a7b9d5f7d4a39444f26d89ce6de8a1d6db61b21060612114dbf8d5ff"},
|
||||
{file = "rq-2.3.2.tar.gz", hash = "sha256:5bd212992724428ec1689736abde783d245e7856bca39d89845884f5d580f5f1"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
|
@ -4151,4 +4153,4 @@ proxy = ["PyJWT", "apscheduler", "backoff", "boto3", "cryptography", "fastapi",
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = ">=3.8.1,<4.0, !=3.9.7"
|
||||
content-hash = "35a6b009d763180a0f7e00c95c9dc21bc07f339e5b2f0dd12f14c908cc1dd0df"
|
||||
content-hash = "37dd81eae90a4d984b90067ddf934dcfa1ef61f45476b13af0e3634dfa309051"
|
||||
|
|
|
@ -55,7 +55,7 @@ websockets = {version = "^13.1.0", optional = true}
|
|||
boto3 = {version = "1.34.34", optional = true}
|
||||
redisvl = {version = "^0.4.1", optional = true, markers = "python_version >= '3.9' and python_version < '3.14'"}
|
||||
mcp = {version = "1.5.0", optional = true, python = ">=3.10"}
|
||||
litellm-proxy-extras = {version = "0.1.7", optional = true}
|
||||
litellm-proxy-extras = {version = "0.1.8", optional = true}
|
||||
|
||||
[tool.poetry.extras]
|
||||
proxy = [
|
||||
|
|
|
@ -37,7 +37,7 @@ sentry_sdk==2.21.0 # for sentry error handling
|
|||
detect-secrets==1.5.0 # Enterprise - secret detection / masking in LLM requests
|
||||
cryptography==43.0.1
|
||||
tzdata==2025.1 # IANA time zone database
|
||||
litellm-proxy-extras==0.1.7 # for proxy extras - e.g. prisma migrations
|
||||
litellm-proxy-extras==0.1.8 # for proxy extras - e.g. prisma migrations
|
||||
### LITELLM PACKAGE DEPENDENCIES
|
||||
python-dotenv==1.0.0 # for env
|
||||
tiktoken==0.8.0 # for calculating usage
|
||||
|
|
|
@ -340,6 +340,31 @@ model LiteLLM_DailyUserSpend {
|
|||
@@index([model])
|
||||
}
|
||||
|
||||
// Track daily team spend metrics per model and key
|
||||
model LiteLLM_DailyTeamSpend {
|
||||
id String @id @default(uuid())
|
||||
team_id String
|
||||
date String
|
||||
api_key String
|
||||
model String
|
||||
model_group String?
|
||||
custom_llm_provider String?
|
||||
prompt_tokens Int @default(0)
|
||||
completion_tokens Int @default(0)
|
||||
spend Float @default(0.0)
|
||||
api_requests Int @default(0)
|
||||
successful_requests Int @default(0)
|
||||
failed_requests Int @default(0)
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @updatedAt
|
||||
|
||||
@@unique([team_id, date, api_key, model, custom_llm_provider])
|
||||
@@index([date])
|
||||
@@index([team_id])
|
||||
@@index([api_key])
|
||||
@@index([model])
|
||||
}
|
||||
|
||||
|
||||
// Track the status of cron jobs running. Only allow one pod to run the job at a time
|
||||
model LiteLLM_CronJob {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue