Connect UI to "LiteLLM_DailyUserSpend" spend table - enables usage tab to work at 1m+ spend logs (#9603)

* feat(spend_management_endpoints.py): expose new endpoint for querying user's usage at 1m+ spend logs

Allows user to view their spend at 1m+ spend logs

* build(schema.prisma): add api_requests to dailyuserspend table

* build(migration.sql): add migration file for new column to daily user spend table

* build(prisma_client.py): add logic for copying over migration folder, if deploy/migrations present in expected location

enables easier testing of prisma migration flow

* build(ui/): initial commit successfully using the dailyuserspend table on the UI

* refactor(internal_user_endpoints.py): refactor `/user/daily/activity` to give breakdowns by provider/model/key

* feat: feature parity (cost page) with existing 'usage' page

* build(ui/): add activity tab to new_usage.tsx

gets to feature parity on 'All Up' page of 'usage.tsx'

* fix(proxy/utils.py): count number of api requests in daily user spend table

allows us to see activity by model on new usage tab

* style(new_usage.tsx): fix y-axis to be in ascending order of date

* fix: fix linting errors

* fix: fix ruff check errors
This commit is contained in:
Krish Dholakia 2025-03-27 23:29:15 -07:00 committed by GitHub
parent b155f4f4a6
commit cdcc8ea9b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 909 additions and 22 deletions

View file

@@ -14,8 +14,9 @@ These are members of a Team on LiteLLM
import asyncio
import traceback
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, List, Optional, Union, cast
from datetime import date, datetime, timedelta, timezone
from enum import Enum
from typing import Any, Dict, List, Optional, TypedDict, Union, cast
import fastapi
from fastapi import APIRouter, Depends, Header, HTTPException, Request, status
@@ -1242,3 +1243,291 @@ async def ui_view_users(
except Exception as e:
verbose_proxy_logger.exception(f"Error searching users: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error searching users: {str(e)}")
class GroupByDimension(str, Enum):
    """Dimensions the `/user/daily/activity` endpoint accepts in its `group_by`
    query parameter.

    Inherits from `str` so FastAPI can parse the raw query-string token
    directly. `PROVIDER` deliberately maps to the `custom_llm_provider`
    column name rather than the literal string "provider".
    """

    DATE = "date"
    MODEL = "model"
    API_KEY = "api_key"
    TEAM = "team"
    ORGANIZATION = "organization"
    MODEL_GROUP = "model_group"
    PROVIDER = "custom_llm_provider"
class SpendMetrics(BaseModel):
    """Accumulated usage counters for one grouping bucket (a date, model,
    provider, or API key). All fields default to zero so a fresh instance can
    be used as an accumulator by `update_metrics`."""

    spend: float = Field(default=0.0)  # total spend in USD
    prompt_tokens: int = Field(default=0)
    completion_tokens: int = Field(default=0)
    total_tokens: int = Field(default=0)  # prompt + completion
    api_requests: int = Field(default=0)  # number of requests represented
class BreakdownMetrics(BaseModel):
    """Breakdown of spend by different dimensions"""

    models: Dict[str, SpendMetrics] = Field(default_factory=dict)  # model -> metrics
    providers: Dict[str, SpendMetrics] = Field(
        default_factory=dict
    )  # provider -> metrics
    api_keys: Dict[str, SpendMetrics] = Field(
        default_factory=dict
    )  # api_key -> metrics
class DailySpendData(BaseModel):
    """One calendar day's aggregated spend plus its per-dimension breakdown.

    This is the per-row payload of `SpendAnalyticsPaginatedResponse.results`.
    """

    date: date  # the calendar day these metrics cover
    metrics: SpendMetrics  # totals across all records on this day
    breakdown: BreakdownMetrics = Field(default_factory=BreakdownMetrics)
class DailySpendMetadata(BaseModel):
    """Response-level totals and pagination info for the daily-activity query.

    The `total_*` fields are summed over the records returned for the
    requested date range; `page`/`total_pages`/`has_more` describe the
    pagination window.
    """

    total_spend: float = Field(default=0.0)
    total_prompt_tokens: int = Field(default=0)
    total_completion_tokens: int = Field(default=0)
    total_api_requests: int = Field(default=0)
    page: int = Field(default=1)  # current 1-based page number
    total_pages: int = Field(default=1)
    has_more: bool = Field(default=False)  # True if further pages exist
class SpendAnalyticsPaginatedResponse(BaseModel):
    """Top-level response model for `/user/daily/activity`: one entry per
    day plus aggregate/pagination metadata."""

    results: List[DailySpendData]
    metadata: DailySpendMetadata = Field(default_factory=DailySpendMetadata)
class LiteLLM_DailyUserSpend(BaseModel):
    """Pydantic view of one row of the `LiteLLM_DailyUserSpend` table
    (queried below via `prisma_client.db.litellm_dailyuserspend`).

    Each row is a pre-aggregated per-user / per-day / per-model / per-key
    spend record, which is what lets this endpoint stay fast at 1M+ raw
    spend logs.
    """

    id: str
    user_id: str
    date: str  # stored as a YYYY-MM-DD string, parsed later via strptime
    api_key: str
    model: str
    model_group: Optional[str] = None
    custom_llm_provider: Optional[str] = None
    prompt_tokens: int = 0
    completion_tokens: int = 0
    spend: float = 0.0
    api_requests: int = 0  # number of requests rolled up into this row
class GroupedData(TypedDict):
    """Shape of one per-group accumulator: running totals plus the
    per-dimension breakdown for that group."""

    metrics: SpendMetrics
    breakdown: BreakdownMetrics
def update_metrics(
    group_metrics: SpendMetrics, record: LiteLLM_DailyUserSpend
) -> SpendMetrics:
    """Fold a single daily-spend record into a running metrics accumulator.

    Mutates `group_metrics` in place and also returns it, so callers can use
    either the assignment or the fluent form. `total_tokens` is derived from
    the record's prompt + completion counts.
    """
    prompt = record.prompt_tokens
    completion = record.completion_tokens

    group_metrics.spend += record.spend
    group_metrics.prompt_tokens += prompt
    group_metrics.completion_tokens += completion
    group_metrics.total_tokens += prompt + completion
    group_metrics.api_requests += record.api_requests
    return group_metrics
def update_breakdown_metrics(
    breakdown: BreakdownMetrics, record: LiteLLM_DailyUserSpend
) -> BreakdownMetrics:
    """Updates breakdown metrics for a single record using the existing update_metrics function"""
    # Each breakdown dimension is a (bucket-dict, key) pair; the provider
    # dimension falls back to "unknown" when the record has no provider set.
    dimension_buckets = (
        (breakdown.models, record.model),
        (breakdown.providers, record.custom_llm_provider or "unknown"),
        (breakdown.api_keys, record.api_key),
    )

    for bucket, key in dimension_buckets:
        # Lazily create the accumulator the first time a key is seen, then
        # fold this record's numbers into it.
        if key not in bucket:
            bucket[key] = SpendMetrics()
        bucket[key] = update_metrics(bucket[key], record)

    return breakdown
@router.get(
    "/user/daily/activity",
    tags=["Budget & Spend Tracking", "Internal User management"],
    dependencies=[Depends(user_api_key_auth)],
    response_model=SpendAnalyticsPaginatedResponse,
)
async def get_user_daily_activity(
    start_date: Optional[str] = fastapi.Query(
        default=None,
        description="Start date in YYYY-MM-DD format",
    ),
    end_date: Optional[str] = fastapi.Query(
        default=None,
        description="End date in YYYY-MM-DD format",
    ),
    group_by: List[GroupByDimension] = fastapi.Query(
        default=[GroupByDimension.DATE],
        description="Dimensions to group by. Can combine multiple (e.g. date,team)",
    ),
    view_by: Literal["team", "organization", "user"] = fastapi.Query(
        default="user",
        description="View spend at team/org/user level",
    ),
    team_id: Optional[str] = fastapi.Query(
        default=None,
        description="Filter by specific team",
    ),
    org_id: Optional[str] = fastapi.Query(
        default=None,
        description="Filter by specific organization",
    ),
    model: Optional[str] = fastapi.Query(
        default=None,
        description="Filter by specific model",
    ),
    api_key: Optional[str] = fastapi.Query(
        default=None,
        description="Filter by specific API key",
    ),
    page: int = fastapi.Query(
        default=1, description="Page number for pagination", ge=1
    ),
    page_size: int = fastapi.Query(
        default=50, description="Items per page", ge=1, le=100
    ),
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
) -> SpendAnalyticsPaginatedResponse:
    """
    [BETA] This is a beta endpoint. It will change.

    Meant to optimize querying spend data for analytics for a user.

    Reads pre-aggregated rows from the LiteLLM_DailyUserSpend table (one row
    per user/day/model/api_key) and returns one entry per calendar day, each
    with totals and a breakdown by model, provider, and api_key.

    NOTE(review): `group_by` and `view_by` are accepted and validated but not
    yet applied in this implementation — results are always grouped by date.
    Confirm whether callers rely on them before removing.

    Raises:
        HTTPException 400: if start_date or end_date is missing.
        HTTPException 500: if the DB is not connected or the query fails.

    Returns:
        (by date, with per-model/per-provider/per-api_key breakdowns)
        - spend
        - prompt_tokens
        - completion_tokens
        - total_tokens
        - api_requests
    """
    from litellm.proxy.proxy_server import prisma_client

    if prisma_client is None:
        raise HTTPException(
            status_code=500,
            detail={"error": CommonProxyErrors.db_not_connected_error.value},
        )

    if start_date is None or end_date is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail={"error": "Please provide start_date and end_date"},
        )

    try:
        # Build filter conditions. Dates are stored as YYYY-MM-DD strings,
        # so lexicographic gte/lte comparison matches chronological order.
        where_conditions: Dict[str, Any] = {
            "date": {
                "gte": start_date,
                "lte": end_date,
            }
        }
        if team_id:
            where_conditions["team_id"] = team_id
        if org_id:
            where_conditions["organization_id"] = org_id
        if model:
            where_conditions["model"] = model
        if api_key:
            where_conditions["api_key"] = api_key

        # Get total count for pagination
        total_count = await prisma_client.db.litellm_dailyuserspend.count(
            where=where_conditions
        )

        # Fetch paginated results, newest day first
        daily_spend_data = await prisma_client.db.litellm_dailyuserspend.find_many(
            where=where_conditions,
            order=[
                {"date": "desc"},
            ],
            skip=(page - 1) * page_size,
            take=page_size,
        )

        # Process results
        results = []
        total_metrics = SpendMetrics()

        # Group rows by date; several rows (different model/key combos) can
        # share a date and get folded into one DailySpendData entry.
        grouped_data: Dict[str, Dict[str, Any]] = {}
        for record in daily_spend_data:
            date_str = record.date
            if date_str not in grouped_data:
                grouped_data[date_str] = {
                    "metrics": SpendMetrics(),
                    "breakdown": BreakdownMetrics(),
                }

            # Update per-date metrics
            grouped_data[date_str]["metrics"] = update_metrics(
                grouped_data[date_str]["metrics"], record
            )
            # Update per-date breakdowns
            grouped_data[date_str]["breakdown"] = update_breakdown_metrics(
                grouped_data[date_str]["breakdown"], record
            )

            # Update response-level totals with the same accumulation logic
            # as the per-date metrics. (Previously api_requests was
            # incremented by 1 per row, which counted aggregated DB rows
            # instead of summing each row's api_requests column, so the
            # total disagreed with the sum of the per-date metrics.)
            total_metrics = update_metrics(total_metrics, record)

        # Convert grouped data to response format
        for date_str, data in grouped_data.items():
            results.append(
                DailySpendData(
                    date=datetime.strptime(date_str, "%Y-%m-%d").date(),
                    metrics=data["metrics"],
                    breakdown=data["breakdown"],
                )
            )

        # Sort results by date, newest first
        results.sort(key=lambda x: x.date, reverse=True)

        return SpendAnalyticsPaginatedResponse(
            results=results,
            metadata=DailySpendMetadata(
                total_spend=total_metrics.spend,
                total_prompt_tokens=total_metrics.prompt_tokens,
                total_completion_tokens=total_metrics.completion_tokens,
                total_api_requests=total_metrics.api_requests,
                page=page,
                total_pages=-(-total_count // page_size),  # Ceiling division
                has_more=(page * page_size) < total_count,
            ),
        )

    except Exception as e:
        # Log under the actual route (previous message referenced the
        # non-existent /spend/daily/analytics path).
        verbose_proxy_logger.exception(
            "/user/daily/activity: Exception occurred - {}".format(str(e))
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": f"Failed to fetch analytics: {str(e)}"},
        )