mirror of
https://github.com/BerriAI/litellm.git
synced 2025-04-26 03:04:13 +00:00
Add Daily User Spend Aggregate view - allows UI Usage tab to work > 1m rows (#9538)
* ci: update github action
* build(schema.prisma): enable daily user spend table
  allows storing aggregate view of user's daily spend
* build(schema.prisma): add new daily user spend table
* feat: working daily user spend tracking
  maintains an aggregate view for easier querying in high traffic
* setup_google_dns
* ci: update ci yaml
---------
Co-authored-by: Ishaan Jaff <ishaanjaffer0324@gmail.com>
This commit is contained in:
parent
5b52d246a9
commit
eeab48ce5a
5 changed files with 290 additions and 1 deletions
|
@ -2646,3 +2646,15 @@ class DefaultInternalUserParams(LiteLLMPydanticObjectBase):
|
||||||
models: Optional[List[str]] = Field(
|
models: Optional[List[str]] = Field(
|
||||||
default=None, description="Default list of models that new users can access"
|
default=None, description="Default list of models that new users can access"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DailyUserSpendTransaction(TypedDict):
    """Aggregated spend and token usage for one user on one day.

    Requests that share the same user, date, API key, model and provider are
    summed into a single entry so the totals can be written to the
    ``LiteLLM_DailyUserSpend`` table with one upsert per entry.
    """

    user_id: str
    date: str  # day of the request start time (the text before "T")
    api_key: str
    model: str
    model_group: Optional[str]
    custom_llm_provider: Optional[str]
    prompt_tokens: int  # running total for this entry
    completion_tokens: int  # running total for this entry
    spend: float  # running total for this entry
|
||||||
|
|
|
@ -558,6 +558,7 @@ async def proxy_startup_event(app: FastAPI):
|
||||||
proxy_batch_write_at=proxy_batch_write_at,
|
proxy_batch_write_at=proxy_batch_write_at,
|
||||||
proxy_logging_obj=proxy_logging_obj,
|
proxy_logging_obj=proxy_logging_obj,
|
||||||
)
|
)
|
||||||
|
|
||||||
## [Optional] Initialize dd tracer
|
## [Optional] Initialize dd tracer
|
||||||
ProxyStartupEvent._init_dd_tracer()
|
ProxyStartupEvent._init_dd_tracer()
|
||||||
|
|
||||||
|
@ -914,6 +915,8 @@ def _set_spend_logs_payload(
|
||||||
prisma_client.spend_log_transactions.append(payload)
|
prisma_client.spend_log_transactions.append(payload)
|
||||||
elif prisma_client is not None:
|
elif prisma_client is not None:
|
||||||
prisma_client.spend_log_transactions.append(payload)
|
prisma_client.spend_log_transactions.append(payload)
|
||||||
|
|
||||||
|
prisma_client.add_spend_log_transaction_to_daily_user_transaction(payload.copy())
|
||||||
return prisma_client
|
return prisma_client
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -313,3 +313,26 @@ model LiteLLM_AuditLog {
|
||||||
before_value Json? // value of the row
|
before_value Json? // value of the row
|
||||||
updated_values Json? // value of the row after change
|
updated_values Json? // value of the row after change
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Track daily user spend metrics per model and key
// NOTE(review): custom_llm_provider is nullable but is part of the unique key
// below; confirm how rows with a NULL provider are meant to be de-duplicated
// and upserted.
model LiteLLM_DailyUserSpend {
  id                  String   @id @default(uuid())
  user_id             String
  date                String   // Day of the request start time (text before "T")
  api_key             String   // Hashed API Token
  model               String   // The specific model used
  model_group         String?  // public model_name / model_group
  custom_llm_provider String?  // The LLM provider (e.g., "openai", "anthropic")
  prompt_tokens       Int      @default(0)
  completion_tokens   Int      @default(0)
  spend               Float    @default(0.0)
  created_at          DateTime @default(now())
  updated_at          DateTime @updatedAt

  @@unique([user_id, date, api_key, model, custom_llm_provider])
  @@index([date])
  @@index([user_id])
  @@index([api_key])
  @@index([model])
}
|
||||||
|
|
|
@ -10,13 +10,15 @@ import traceback
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from email.mime.multipart import MIMEMultipart
|
from email.mime.multipart import MIMEMultipart
|
||||||
from email.mime.text import MIMEText
|
from email.mime.text import MIMEText
|
||||||
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union, overload
|
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, overload
|
||||||
|
|
||||||
from litellm.proxy._types import (
|
from litellm.proxy._types import (
|
||||||
DB_CONNECTION_ERROR_TYPES,
|
DB_CONNECTION_ERROR_TYPES,
|
||||||
CommonProxyErrors,
|
CommonProxyErrors,
|
||||||
|
DailyUserSpendTransaction,
|
||||||
ProxyErrorTypes,
|
ProxyErrorTypes,
|
||||||
ProxyException,
|
ProxyException,
|
||||||
|
SpendLogsPayload,
|
||||||
)
|
)
|
||||||
from litellm.types.guardrails import GuardrailEventHooks
|
from litellm.types.guardrails import GuardrailEventHooks
|
||||||
|
|
||||||
|
@ -1103,6 +1105,7 @@ class PrismaClient:
|
||||||
team_member_list_transactons: dict = {} # key is ["team_id" + "user_id"]
|
team_member_list_transactons: dict = {} # key is ["team_id" + "user_id"]
|
||||||
org_list_transactons: dict = {}
|
org_list_transactons: dict = {}
|
||||||
spend_log_transactions: List = []
|
spend_log_transactions: List = []
|
||||||
|
daily_user_spend_transactions: Dict[str, DailyUserSpendTransaction] = {}
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
@ -1140,6 +1143,61 @@ class PrismaClient:
|
||||||
) # Client to connect to Prisma db
|
) # Client to connect to Prisma db
|
||||||
verbose_proxy_logger.debug("Success - Created Prisma Client")
|
verbose_proxy_logger.debug("Success - Created Prisma Client")
|
||||||
|
|
||||||
|
def add_spend_log_transaction_to_daily_user_transaction(
    self, payload: Union[dict, SpendLogsPayload]
):
    """
    Add a spend log transaction to the daily user transaction list

    Key = @@unique([user_id, date, api_key, model, custom_llm_provider]) )

    If key exists, update the transaction with the new spend and usage

    Args:
        payload: Spend log entry. It must contain "user", "startTime",
            "api_key", "model" and "custom_llm_provider". A payload missing
            any of them, or whose "startTime" is neither a datetime nor a
            string, is skipped with a debug log. "model_group" is optional;
            missing or None "spend" / token counts are counted as zero.
    """
    expected_keys = ["user", "startTime", "api_key", "model", "custom_llm_provider"]
    if not all(key in payload for key in expected_keys):
        verbose_proxy_logger.debug(
            f"Missing expected keys: {expected_keys}, in payload, skipping from daily_user_spend_transactions"
        )
        return

    # The aggregate is per calendar day: keep only the text before "T".
    start_time = payload["startTime"]
    if isinstance(start_time, datetime):
        date = start_time.isoformat().split("T")[0]
    elif isinstance(start_time, str):
        date = start_time.split("T")[0]
    else:
        verbose_proxy_logger.debug(
            f"Invalid start time: {start_time}, skipping from daily_user_spend_transactions"
        )
        return

    # This runs as best-effort bookkeeping, so absent metrics must not raise:
    # treat a missing or None value as "nothing to add".
    spend = payload.get("spend") or 0.0
    prompt_tokens = payload.get("prompt_tokens") or 0
    completion_tokens = payload.get("completion_tokens") or 0

    daily_transaction_key = f"{payload['user']}_{date}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}"

    existing = self.daily_user_spend_transactions.get(daily_transaction_key)
    if existing is not None:
        existing["spend"] += spend
        existing["prompt_tokens"] += prompt_tokens
        existing["completion_tokens"] += completion_tokens
        return

    self.daily_user_spend_transactions[daily_transaction_key] = (
        DailyUserSpendTransaction(
            user_id=payload["user"],
            date=date,
            api_key=payload["api_key"],
            model=payload["model"],
            # model_group is not one of the validated keys, so it may be absent.
            model_group=payload.get("model_group"),
            custom_llm_provider=payload["custom_llm_provider"],
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            spend=spend,
        )
    )
|
||||||
|
|
||||||
def hash_token(self, token: str):
|
def hash_token(self, token: str):
|
||||||
# Hash the string using SHA-256
|
# Hash the string using SHA-256
|
||||||
hashed_token = hashlib.sha256(token.encode()).hexdigest()
|
hashed_token = hashlib.sha256(token.encode()).hexdigest()
|
||||||
|
@ -2476,6 +2534,116 @@ class ProxyUpdateSpend:
|
||||||
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
|
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@staticmethod
async def update_daily_user_spend(
    n_retry_times: int,
    prisma_client: PrismaClient,
    proxy_logging_obj: ProxyLogging,
):
    """
    Batch job to update LiteLLM_DailyUserSpend table using in-memory daily_spend_transactions

    At most BATCH_SIZE aggregated entries are taken from
    ``prisma_client.daily_user_spend_transactions`` per call; anything left
    stays in memory for a later call. Each entry is upserted: created with
    its totals, or incremented by them if the row already exists. Entries
    without a user_id are dropped without being written.

    Args:
        n_retry_times: Extra attempts made after a DB connection error, with
            exponential backoff (2**attempt seconds) between attempts.
        prisma_client: Holds the in-memory aggregates and the DB handle.
        proxy_logging_obj: Passed to the failure handler.

    On a non-connection error, or once retries are exhausted, the failure is
    handed to ``_raise_failed_update_spend_exception`` and the batch is not
    re-queued.
    """
    BATCH_SIZE = (
        100  # Number of aggregated records to update in each database operation
    )
    start_time = time.time()

    try:
        pending = prisma_client.daily_user_spend_transactions
        batch_keys = list(pending)[:BATCH_SIZE]
        if not batch_keys:
            verbose_proxy_logger.debug(
                "No new transactions to process for daily spend update"
            )
            return

        # Claim the batch before awaiting anything. Spend recorded while the
        # DB write is in flight then lands in a fresh in-memory entry for the
        # next run, instead of being added to an entry whose values were
        # already queued and then discarded with this batch.
        transactions_to_process = {key: pending.pop(key) for key in batch_keys}

        # Always make at least one attempt so a claimed batch is never
        # dropped unwritten.
        for i in range(max(n_retry_times, 0) + 1):
            try:
                # Update DailyUserSpend table in batches
                async with prisma_client.db.batch_() as batcher:
                    for transaction in transactions_to_process.values():
                        user_id = transaction.get("user_id")
                        if not user_id:  # Skip if no user_id
                            continue

                        # NOTE(review): provider may be None while being part
                        # of the compound unique key - confirm the upsert
                        # accepts that.
                        provider = transaction.get("custom_llm_provider")

                        batcher.litellm_dailyuserspend.upsert(
                            where={
                                "user_id_date_api_key_model_custom_llm_provider": {
                                    "user_id": user_id,
                                    "date": transaction["date"],
                                    "api_key": transaction["api_key"],
                                    "model": transaction["model"],
                                    "custom_llm_provider": provider,
                                }
                            },
                            data={
                                "create": {
                                    "user_id": user_id,
                                    "date": transaction["date"],
                                    "api_key": transaction["api_key"],
                                    "model": transaction["model"],
                                    "model_group": transaction.get("model_group"),
                                    "custom_llm_provider": provider,
                                    "prompt_tokens": transaction["prompt_tokens"],
                                    "completion_tokens": transaction[
                                        "completion_tokens"
                                    ],
                                    "spend": transaction["spend"],
                                },
                                "update": {
                                    "prompt_tokens": {
                                        "increment": transaction["prompt_tokens"]
                                    },
                                    "completion_tokens": {
                                        "increment": transaction[
                                            "completion_tokens"
                                        ]
                                    },
                                    "spend": {"increment": transaction["spend"]},
                                },
                            },
                        )

                verbose_proxy_logger.info(
                    f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s"
                )
                break

            except DB_CONNECTION_ERROR_TYPES as e:
                if i >= n_retry_times:
                    _raise_failed_update_spend_exception(
                        e=e,
                        start_time=start_time,
                        proxy_logging_obj=proxy_logging_obj,
                    )
                await asyncio.sleep(2**i)  # Exponential backoff

    except Exception as e:
        # The claimed batch is deliberately not put back: a batch that cannot
        # be written is dropped rather than retried on every later run.
        _raise_failed_update_spend_exception(
            e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
        )
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def disable_spend_updates() -> bool:
|
def disable_spend_updates() -> bool:
|
||||||
"""
|
"""
|
||||||
|
@ -2716,6 +2884,20 @@ async def update_spend( # noqa: PLR0915
|
||||||
db_writer_client=db_writer_client,
|
db_writer_client=db_writer_client,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
### UPDATE DAILY USER SPEND ###
|
||||||
|
verbose_proxy_logger.debug(
|
||||||
|
"Daily User Spend transactions: {}".format(
|
||||||
|
len(prisma_client.daily_user_spend_transactions)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(prisma_client.daily_user_spend_transactions) > 0:
|
||||||
|
await ProxyUpdateSpend.update_daily_user_spend(
|
||||||
|
n_retry_times=n_retry_times,
|
||||||
|
prisma_client=prisma_client,
|
||||||
|
proxy_logging_obj=proxy_logging_obj,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def _raise_failed_update_spend_exception(
|
def _raise_failed_update_spend_exception(
|
||||||
e: Exception, start_time: float, proxy_logging_obj: ProxyLogging
|
e: Exception, start_time: float, proxy_logging_obj: ProxyLogging
|
||||||
|
@ -2911,3 +3093,50 @@ def _premium_user_check():
|
||||||
"error": f"This feature is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}"
|
"error": f"This feature is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}"
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _update_daily_spend_batch(prisma_client, spend_aggregates):
|
||||||
|
"""Helper function to update daily spend in batches"""
|
||||||
|
async with prisma_client.db.batch_() as batcher:
|
||||||
|
for (
|
||||||
|
user_id,
|
||||||
|
date,
|
||||||
|
api_key,
|
||||||
|
model,
|
||||||
|
model_group,
|
||||||
|
provider,
|
||||||
|
), metrics in spend_aggregates.items():
|
||||||
|
if not user_id: # Skip if no user_id
|
||||||
|
continue
|
||||||
|
|
||||||
|
batcher.litellm_dailyuserspend.upsert(
|
||||||
|
where={
|
||||||
|
"user_id_date_api_key_model_custom_llm_provider": {
|
||||||
|
"user_id": user_id,
|
||||||
|
"date": date,
|
||||||
|
"api_key": api_key,
|
||||||
|
"model": model,
|
||||||
|
"custom_llm_provider": provider,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
data={
|
||||||
|
"create": {
|
||||||
|
"user_id": user_id,
|
||||||
|
"date": date,
|
||||||
|
"api_key": api_key,
|
||||||
|
"model": model,
|
||||||
|
"model_group": model_group,
|
||||||
|
"custom_llm_provider": provider,
|
||||||
|
"prompt_tokens": metrics["prompt_tokens"],
|
||||||
|
"completion_tokens": metrics["completion_tokens"],
|
||||||
|
"spend": metrics["spend"],
|
||||||
|
},
|
||||||
|
"update": {
|
||||||
|
"prompt_tokens": {"increment": metrics["prompt_tokens"]},
|
||||||
|
"completion_tokens": {
|
||||||
|
"increment": metrics["completion_tokens"]
|
||||||
|
},
|
||||||
|
"spend": {"increment": metrics["spend"]},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
|
@ -313,3 +313,25 @@ model LiteLLM_AuditLog {
|
||||||
before_value Json? // value of the row
|
before_value Json? // value of the row
|
||||||
updated_values Json? // value of the row after change
|
updated_values Json? // value of the row after change
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track daily user spend metrics per model and key
// NOTE(review): custom_llm_provider is nullable but is part of the unique key
// below; confirm how rows with a NULL provider are meant to be de-duplicated
// and upserted.
model LiteLLM_DailyUserSpend {
  id                  String   @id @default(uuid())
  user_id             String
  date                String   // Day of the request start time (text before "T")
  api_key             String   // Hashed API Token
  model               String   // The specific model used
  model_group         String?  // public model_name / model_group
  custom_llm_provider String?  // The LLM provider (e.g., "openai", "anthropic")
  prompt_tokens       Int      @default(0)
  completion_tokens   Int      @default(0)
  spend               Float    @default(0.0)
  created_at          DateTime @default(now())
  updated_at          DateTime @updatedAt

  @@unique([user_id, date, api_key, model, custom_llm_provider])
  @@index([date])
  @@index([user_id])
  @@index([api_key])
  @@index([model])
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue