From d8a1071bc4cbf55273a184d4b1f6abaf746234b7 Mon Sep 17 00:00:00 2001 From: Krish Dholakia Date: Wed, 16 Apr 2025 12:26:21 -0700 Subject: [PATCH 1/2] Add aggregate spend by tag (#10071) * feat: initial commit adding daily tag spend table to db * feat(db_spend_update_writer.py): correctly log tag spend transactions * build(schema.prisma): add new tag table to root * build: add new migration file --- .../migration.sql | 45 ++ .../litellm_proxy_extras/schema.prisma | 54 ++ litellm/constants.py | 1 + .../proxy/_experimental/out/onboarding.html | 1 - litellm/proxy/_types.py | 70 ++- litellm/proxy/db/db_spend_update_writer.py | 572 ++++++++++-------- .../redis_update_buffer.py | 89 ++- litellm/proxy/schema.prisma | 29 + litellm/proxy/utils.py | 47 -- litellm/types/services.py | 1 + schema.prisma | 29 + 11 files changed, 565 insertions(+), 373 deletions(-) create mode 100644 litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql delete mode 100644 litellm/proxy/_experimental/out/onboarding.html diff --git a/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql new file mode 100644 index 0000000000..8c3cea7093 --- /dev/null +++ b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql @@ -0,0 +1,45 @@ +-- AlterTable +ALTER TABLE "LiteLLM_DailyTeamSpend" ADD COLUMN "cache_creation_input_tokens" INTEGER NOT NULL DEFAULT 0, +ADD COLUMN "cache_read_input_tokens" INTEGER NOT NULL DEFAULT 0; + +-- CreateTable +CREATE TABLE "LiteLLM_DailyTagSpend" ( + "id" TEXT NOT NULL, + "tag" TEXT NOT NULL, + "date" TEXT NOT NULL, + "api_key" TEXT NOT NULL, + "model" TEXT NOT NULL, + "model_group" TEXT, + "custom_llm_provider" TEXT, + "prompt_tokens" INTEGER NOT NULL DEFAULT 0, + "completion_tokens" INTEGER NOT NULL DEFAULT 0, + "cache_read_input_tokens" INTEGER NOT NULL DEFAULT 0, + "cache_creation_input_tokens" INTEGER NOT NULL DEFAULT 0, + "spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0, + "api_requests" INTEGER NOT NULL DEFAULT 0, + "successful_requests" INTEGER NOT NULL DEFAULT 0, + "failed_requests" INTEGER NOT NULL DEFAULT 0, + "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updated_at" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "LiteLLM_DailyTagSpend_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "LiteLLM_DailyTagSpend_tag_key" ON "LiteLLM_DailyTagSpend"("tag"); + +-- CreateIndex +CREATE INDEX "LiteLLM_DailyTagSpend_date_idx" ON "LiteLLM_DailyTagSpend"("date"); + +-- CreateIndex +CREATE INDEX "LiteLLM_DailyTagSpend_tag_idx" ON "LiteLLM_DailyTagSpend"("tag"); + +-- CreateIndex +CREATE INDEX "LiteLLM_DailyTagSpend_api_key_idx" ON "LiteLLM_DailyTagSpend"("api_key"); + +-- CreateIndex +CREATE INDEX "LiteLLM_DailyTagSpend_model_idx" ON "LiteLLM_DailyTagSpend"("model"); + +-- CreateIndex +CREATE UNIQUE INDEX "LiteLLM_DailyTagSpend_tag_date_api_key_model_custom_llm_pro_key" ON "LiteLLM_DailyTagSpend"("tag", "date", "api_key", "model", "custom_llm_provider"); + diff --git a/litellm-proxy-extras/litellm_proxy_extras/schema.prisma b/litellm-proxy-extras/litellm_proxy_extras/schema.prisma index b470eba64e..b21550413b 100644 --- a/litellm-proxy-extras/litellm_proxy_extras/schema.prisma +++ b/litellm-proxy-extras/litellm_proxy_extras/schema.prisma @@ -342,6 +342,60 @@ model LiteLLM_DailyUserSpend { @@index([model]) } +// Track daily team spend metrics per model and key +model LiteLLM_DailyTeamSpend { + id String @id @default(uuid()) + team_id String + date String + api_key String + model String + model_group String? + custom_llm_provider String? + prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([team_id, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([team_id]) + @@index([api_key]) + @@index([model]) +} + +// Track daily team spend metrics per model and key +model LiteLLM_DailyTagSpend { + id String @id @default(uuid()) + tag String @unique + date String + api_key String + model String + model_group String? + custom_llm_provider String? + prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([tag, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([tag]) + @@index([api_key]) + @@index([model]) +} + // Track the status of cron jobs running. Only allow one pod to run the job at a time model LiteLLM_CronJob { diff --git a/litellm/constants.py b/litellm/constants.py index f25c7c71e4..9c30dc06a2 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -28,6 +28,7 @@ _DEFAULT_TTL_FOR_HTTPX_CLIENTS = 3600 # 1 hour, re-use the same httpx client fo REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer" REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer" REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer" +REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_tag_spend_update_buffer" MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100 MAX_SIZE_IN_MEMORY_QUEUE = 10000 MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000 diff --git a/litellm/proxy/_experimental/out/onboarding.html b/litellm/proxy/_experimental/out/onboarding.html deleted file mode 100644 index f14d344ab3..0000000000 --- a/litellm/proxy/_experimental/out/onboarding.html +++ /dev/null @@ -1 +0,0 @@ -LiteLLM Dashboard \ No newline at end of file diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index d72ce2cb3b..835a639c34 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -650,9 +650,9 @@ class GenerateRequestBase(LiteLLMPydanticObjectBase): allowed_cache_controls: Optional[list] = [] config: Optional[dict] = {} permissions: Optional[dict] = {} - model_max_budget: Optional[dict] = ( - {} - ) # {"gpt-4": 5.0, "gpt-3.5-turbo": 5.0}, defaults to {} + model_max_budget: Optional[ + dict + ] = {} # {"gpt-4": 5.0, "gpt-3.5-turbo": 5.0}, defaults to {} model_config = ConfigDict(protected_namespaces=()) model_rpm_limit: Optional[dict] = None @@ -908,12 +908,12 @@ class NewCustomerRequest(BudgetNewRequest): alias: Optional[str] = None # human-friendly alias blocked: bool = False # allow/disallow requests for this end-user budget_id: Optional[str] = None # give either a budget_id or max_budget - allowed_model_region: Optional[AllowedModelRegion] = ( - None # require all user requests to use models in this specific region - ) - default_model: Optional[str] = ( - None # if no equivalent model in allowed region - default all requests to this model - ) + allowed_model_region: Optional[ + AllowedModelRegion + ] = None # require all user requests to use models in this specific region + default_model: Optional[ + str + ] = None # if no equivalent model in allowed region - default all requests to this model @model_validator(mode="before") @classmethod @@ -935,12 +935,12 @@ class UpdateCustomerRequest(LiteLLMPydanticObjectBase): blocked: bool = False # allow/disallow requests for this end-user max_budget: Optional[float] = None budget_id: Optional[str] = None # give either a budget_id or max_budget - allowed_model_region: Optional[AllowedModelRegion] = ( - None # require all user requests to use models in this specific region - ) - default_model: Optional[str] = ( - None # if no equivalent model in allowed region - default all requests to this model - ) + allowed_model_region: Optional[ + AllowedModelRegion + ] = None # require all user requests to use models in this specific region + default_model: Optional[ + str + ] = None # if no equivalent model in allowed region - default all requests to this model class DeleteCustomerRequest(LiteLLMPydanticObjectBase): @@ -1076,9 +1076,9 @@ class BlockKeyRequest(LiteLLMPydanticObjectBase): class AddTeamCallback(LiteLLMPydanticObjectBase): callback_name: str - callback_type: Optional[Literal["success", "failure", "success_and_failure"]] = ( - "success_and_failure" - ) + callback_type: Optional[ + Literal["success", "failure", "success_and_failure"] + ] = "success_and_failure" callback_vars: Dict[str, str] @model_validator(mode="before") @@ -1335,9 +1335,9 @@ class ConfigList(LiteLLMPydanticObjectBase): stored_in_db: Optional[bool] field_default_value: Any premium_field: bool = False - nested_fields: Optional[List[FieldDetail]] = ( - None # For nested dictionary or Pydantic fields - ) + nested_fields: Optional[ + List[FieldDetail] + ] = None # For nested dictionary or Pydantic fields class ConfigGeneralSettings(LiteLLMPydanticObjectBase): @@ -1604,9 +1604,9 @@ class LiteLLM_OrganizationMembershipTable(LiteLLMPydanticObjectBase): budget_id: Optional[str] = None created_at: datetime updated_at: datetime - user: Optional[Any] = ( - None # You might want to replace 'Any' with a more specific type if available - ) + user: Optional[ + Any + ] = None # You might want to replace 'Any' with a more specific type if available litellm_budget_table: Optional[LiteLLM_BudgetTable] = None model_config = ConfigDict(protected_namespaces=()) @@ -2354,9 +2354,9 @@ class TeamModelDeleteRequest(BaseModel): # Organization Member Requests class OrganizationMemberAddRequest(OrgMemberAddRequest): organization_id: str - max_budget_in_organization: Optional[float] = ( - None # Users max budget within the organization - ) + max_budget_in_organization: Optional[ + float + ] = None # Users max budget within the organization class OrganizationMemberDeleteRequest(MemberDeleteRequest): @@ -2545,9 +2545,9 @@ class ProviderBudgetResponse(LiteLLMPydanticObjectBase): Maps provider names to their budget configs. """ - providers: Dict[str, ProviderBudgetResponseObject] = ( - {} - ) # Dictionary mapping provider names to their budget configurations + providers: Dict[ + str, ProviderBudgetResponseObject + ] = {} # Dictionary mapping provider names to their budget configurations class ProxyStateVariables(TypedDict): @@ -2675,9 +2675,9 @@ class LiteLLM_JWTAuth(LiteLLMPydanticObjectBase): enforce_rbac: bool = False roles_jwt_field: Optional[str] = None # v2 on role mappings role_mappings: Optional[List[RoleMapping]] = None - object_id_jwt_field: Optional[str] = ( - None # can be either user / team, inferred from the role mapping - ) + object_id_jwt_field: Optional[ + str + ] = None # can be either user / team, inferred from the role mapping scope_mappings: Optional[List[ScopeMapping]] = None enforce_scope_based_access: bool = False enforce_team_based_model_access: bool = False @@ -2799,6 +2799,10 @@ class DailyUserSpendTransaction(BaseDailySpendTransaction): user_id: str +class DailyTagSpendTransaction(BaseDailySpendTransaction): + tag: str + + class DBSpendUpdateTransactions(TypedDict): """ Internal Data Structure for buffering spend updates in Redis or in memory before committing them to the database diff --git a/litellm/proxy/db/db_spend_update_writer.py b/litellm/proxy/db/db_spend_update_writer.py index e6bc0c3b7a..61ea930387 100644 --- a/litellm/proxy/db/db_spend_update_writer.py +++ b/litellm/proxy/db/db_spend_update_writer.py @@ -11,7 +11,7 @@ import os import time import traceback from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast, overload import litellm from litellm._logging import verbose_proxy_logger @@ -20,6 +20,7 @@ from litellm.constants import DB_SPEND_UPDATE_JOB_NAME from litellm.proxy._types import ( DB_CONNECTION_ERROR_TYPES, BaseDailySpendTransaction, + DailyTagSpendTransaction, DailyTeamSpendTransaction, DailyUserSpendTransaction, DBSpendUpdateTransactions, @@ -61,6 +62,7 @@ class DBSpendUpdateWriter: self.spend_update_queue = SpendUpdateQueue() self.daily_spend_update_queue = DailySpendUpdateQueue() self.daily_team_spend_update_queue = DailySpendUpdateQueue() + self.daily_tag_spend_update_queue = DailySpendUpdateQueue() async def update_database( # LiteLLM management object fields @@ -170,6 +172,13 @@ class DBSpendUpdateWriter: ) ) + asyncio.create_task( + self.add_spend_log_transaction_to_daily_tag_transaction( + payload=payload, + prisma_client=prisma_client, + ) + ) + verbose_proxy_logger.debug("Runs spend update on all tables") except Exception: verbose_proxy_logger.debug( @@ -394,6 +403,7 @@ class DBSpendUpdateWriter: spend_update_queue=self.spend_update_queue, daily_spend_update_queue=self.daily_spend_update_queue, daily_team_spend_update_queue=self.daily_team_spend_update_queue, + daily_tag_spend_update_queue=self.daily_tag_spend_update_queue, ) # Only commit from redis to db if this pod is the leader @@ -495,6 +505,20 @@ class DBSpendUpdateWriter: daily_spend_transactions=daily_team_spend_update_transactions, ) + ################## Daily Tag Spend Update Transactions ################## + # Aggregate all in memory daily tag spend transactions and commit to db + daily_tag_spend_update_transactions = cast( + Dict[str, DailyTagSpendTransaction], + await self.daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(), + ) + + await DBSpendUpdateWriter.update_daily_tag_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_tag_spend_update_transactions, + ) + async def _commit_spend_updates_to_db( # noqa: PLR0915 self, prisma_client: PrismaClient, @@ -740,6 +764,208 @@ class DBSpendUpdateWriter: e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj ) + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyUserSpendTransaction], + entity_type: Literal["user"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTeamSpendTransaction], + entity_type: Literal["team"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTagSpendTransaction], + entity_type: Literal["tag"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Union[ + Dict[str, DailyUserSpendTransaction], + Dict[str, DailyTeamSpendTransaction], + Dict[str, DailyTagSpendTransaction], + ], + entity_type: Literal["user", "team", "tag"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + """ + Generic function to update daily spend for any entity type (user, team, tag) + """ + from litellm.proxy.utils import _raise_failed_update_spend_exception + + verbose_proxy_logger.debug( + f"Daily {entity_type.capitalize()} Spend transactions: {len(daily_spend_transactions)}" + ) + BATCH_SIZE = 100 + start_time = time.time() + + try: + for i in range(n_retry_times + 1): + try: + transactions_to_process = dict( + list(daily_spend_transactions.items())[:BATCH_SIZE] + ) + + if len(transactions_to_process) == 0: + verbose_proxy_logger.debug( + f"No new transactions to process for daily {entity_type} spend update" + ) + break + + async with prisma_client.db.batch_() as batcher: + for _, transaction in transactions_to_process.items(): + entity_id = transaction.get(entity_id_field) + if not entity_id: + continue + + # Construct the where clause dynamically + where_clause = { + unique_constraint_name: { + entity_id_field: entity_id, + "date": transaction["date"], + "api_key": transaction["api_key"], + "model": transaction["model"], + "custom_llm_provider": transaction.get( + "custom_llm_provider" + ), + } + } + + # Get the table dynamically + table = getattr(batcher, table_name) + + # Common data structure for both create and update + common_data = { + entity_id_field: entity_id, + "date": transaction["date"], + "api_key": transaction["api_key"], + "model": transaction["model"], + "model_group": transaction.get("model_group"), + "custom_llm_provider": transaction.get( + "custom_llm_provider" + ), + "prompt_tokens": transaction["prompt_tokens"], + "completion_tokens": transaction["completion_tokens"], + "spend": transaction["spend"], + "api_requests": transaction["api_requests"], + "successful_requests": transaction[ + "successful_requests" + ], + "failed_requests": transaction["failed_requests"], + } + + # Add cache-related fields if they exist + if "cache_read_input_tokens" in transaction: + common_data[ + "cache_read_input_tokens" + ] = transaction.get("cache_read_input_tokens", 0) + if "cache_creation_input_tokens" in transaction: + common_data[ + "cache_creation_input_tokens" + ] = transaction.get("cache_creation_input_tokens", 0) + + # Create update data structure + update_data = { + "prompt_tokens": { + "increment": transaction["prompt_tokens"] + }, + "completion_tokens": { + "increment": transaction["completion_tokens"] + }, + "spend": {"increment": transaction["spend"]}, + "api_requests": { + "increment": transaction["api_requests"] + }, + "successful_requests": { + "increment": transaction["successful_requests"] + }, + "failed_requests": { + "increment": transaction["failed_requests"] + }, + } + + # Add cache-related fields to update if they exist + if "cache_read_input_tokens" in transaction: + update_data["cache_read_input_tokens"] = { + "increment": transaction.get( + "cache_read_input_tokens", 0 + ) + } + if "cache_creation_input_tokens" in transaction: + update_data["cache_creation_input_tokens"] = { + "increment": transaction.get( + "cache_creation_input_tokens", 0 + ) + } + + table.upsert( + where=where_clause, + data={ + "create": common_data, + "update": update_data, + }, + ) + + verbose_proxy_logger.info( + f"Processed {len(transactions_to_process)} daily {entity_type} transactions in {time.time() - start_time:.2f}s" + ) + + # Remove processed transactions + for key in transactions_to_process.keys(): + daily_spend_transactions.pop(key, None) + + break + + except DB_CONNECTION_ERROR_TYPES as e: + if i >= n_retry_times: + _raise_failed_update_spend_exception( + e=e, + start_time=start_time, + proxy_logging_obj=proxy_logging_obj, + ) + await asyncio.sleep(2**i) + + except Exception as e: + if "transactions_to_process" in locals(): + for key in transactions_to_process.keys(): # type: ignore + daily_spend_transactions.pop(key, None) + _raise_failed_update_spend_exception( + e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj + ) + @staticmethod async def update_daily_user_spend( n_retry_times: int, @@ -750,144 +976,16 @@ class DBSpendUpdateWriter: """ Batch job to update LiteLLM_DailyUserSpend table using in-memory daily_spend_transactions """ - from litellm.proxy.utils import _raise_failed_update_spend_exception - - ### UPDATE DAILY USER SPEND ### - verbose_proxy_logger.debug( - "Daily User Spend transactions: {}".format(len(daily_spend_transactions)) + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="user", + entity_id_field="user_id", + table_name="litellm_dailyuserspend", + unique_constraint_name="user_id_date_api_key_model_custom_llm_provider", ) - BATCH_SIZE = ( - 100 # Number of aggregated records to update in each database operation - ) - start_time = time.time() - - try: - for i in range(n_retry_times + 1): - try: - # Get transactions to process - transactions_to_process = dict( - list(daily_spend_transactions.items())[:BATCH_SIZE] - ) - - if len(transactions_to_process) == 0: - verbose_proxy_logger.debug( - "No new transactions to process for daily spend update" - ) - break - - # Update DailyUserSpend table in batches - async with prisma_client.db.batch_() as batcher: - for _, transaction in transactions_to_process.items(): - user_id = transaction.get("user_id") - if not user_id: # Skip if no user_id - continue - - batcher.litellm_dailyuserspend.upsert( - where={ - "user_id_date_api_key_model_custom_llm_provider": { - "user_id": user_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - } - }, - data={ - "create": { - "user_id": user_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "model_group": transaction.get("model_group"), - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - "prompt_tokens": transaction["prompt_tokens"], - "completion_tokens": transaction[ - "completion_tokens" - ], - "cache_read_input_tokens": transaction.get( - "cache_read_input_tokens", 0 - ), - "cache_creation_input_tokens": transaction.get( - "cache_creation_input_tokens", 0 - ), - "spend": transaction["spend"], - "api_requests": transaction["api_requests"], - "successful_requests": transaction[ - "successful_requests" - ], - "failed_requests": transaction[ - "failed_requests" - ], - }, - "update": { - "prompt_tokens": { - "increment": transaction["prompt_tokens"] - }, - "completion_tokens": { - "increment": transaction[ - "completion_tokens" - ] - }, - "cache_read_input_tokens": { - "increment": transaction.get( - "cache_read_input_tokens", 0 - ) - }, - "cache_creation_input_tokens": { - "increment": transaction.get( - "cache_creation_input_tokens", 0 - ) - }, - "spend": {"increment": transaction["spend"]}, - "api_requests": { - "increment": transaction["api_requests"] - }, - "successful_requests": { - "increment": transaction[ - "successful_requests" - ] - }, - "failed_requests": { - "increment": transaction["failed_requests"] - }, - }, - }, - ) - - verbose_proxy_logger.info( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - - # Remove processed transactions - for key in transactions_to_process.keys(): - daily_spend_transactions.pop(key, None) - - verbose_proxy_logger.debug( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - break - - except DB_CONNECTION_ERROR_TYPES as e: - if i >= n_retry_times: - _raise_failed_update_spend_exception( - e=e, - start_time=start_time, - proxy_logging_obj=proxy_logging_obj, - ) - await asyncio.sleep(2**i) # Exponential backoff - - except Exception as e: - # Remove processed transactions even if there was an error - if "transactions_to_process" in locals(): - for key in transactions_to_process.keys(): # type: ignore - daily_spend_transactions.pop(key, None) - _raise_failed_update_spend_exception( - e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj - ) @staticmethod async def update_daily_team_spend( @@ -899,140 +997,53 @@ class DBSpendUpdateWriter: """ Batch job to update LiteLLM_DailyTeamSpend table using in-memory daily_spend_transactions """ - from litellm.proxy.utils import _raise_failed_update_spend_exception - - ### UPDATE DAILY USER SPEND ### - verbose_proxy_logger.debug( - "Daily Team Spend transactions: {}".format(len(daily_spend_transactions)) + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="team", + entity_id_field="team_id", + table_name="litellm_dailyteamspend", + unique_constraint_name="team_id_date_api_key_model_custom_llm_provider", ) - BATCH_SIZE = ( - 100 # Number of aggregated records to update in each database operation + + @staticmethod + async def update_daily_tag_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTagSpendTransaction], + ): + """ + Batch job to update LiteLLM_DailyTagSpend table using in-memory daily_spend_transactions + """ + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="tag", + entity_id_field="tag", + table_name="litellm_dailytagspend", + unique_constraint_name="tag_date_api_key_model_custom_llm_provider", ) - start_time = time.time() - - try: - for i in range(n_retry_times + 1): - try: - # Get transactions to process - transactions_to_process = dict( - list(daily_spend_transactions.items())[:BATCH_SIZE] - ) - - if len(transactions_to_process) == 0: - verbose_proxy_logger.debug( - "No new transactions to process for daily spend update" - ) - break - - # Update DailyUserSpend table in batches - async with prisma_client.db.batch_() as batcher: - for _, transaction in transactions_to_process.items(): - team_id = transaction.get("team_id") - if not team_id: # Skip if no team_id - continue - - batcher.litellm_dailyteamspend.upsert( - where={ - "team_id_date_api_key_model_custom_llm_provider": { - "team_id": team_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - } - }, - data={ - "create": { - "team_id": team_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "model_group": transaction.get("model_group"), - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - "prompt_tokens": transaction["prompt_tokens"], - "completion_tokens": transaction[ - "completion_tokens" - ], - "spend": transaction["spend"], - "api_requests": transaction["api_requests"], - "successful_requests": transaction[ - "successful_requests" - ], - "failed_requests": transaction[ - "failed_requests" - ], - }, - "update": { - "prompt_tokens": { - "increment": transaction["prompt_tokens"] - }, - "completion_tokens": { - "increment": transaction[ - "completion_tokens" - ] - }, - "spend": {"increment": transaction["spend"]}, - "api_requests": { - "increment": transaction["api_requests"] - }, - "successful_requests": { - "increment": transaction[ - "successful_requests" - ] - }, - "failed_requests": { - "increment": transaction["failed_requests"] - }, - }, - }, - ) - - verbose_proxy_logger.info( - f"Processed {len(transactions_to_process)} daily team transactions in {time.time() - start_time:.2f}s" - ) - - # Remove processed transactions - for key in transactions_to_process.keys(): - daily_spend_transactions.pop(key, None) - - verbose_proxy_logger.debug( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - break - - except DB_CONNECTION_ERROR_TYPES as e: - if i >= n_retry_times: - _raise_failed_update_spend_exception( - e=e, - start_time=start_time, - proxy_logging_obj=proxy_logging_obj, - ) - await asyncio.sleep(2**i) # Exponential backoff - - except Exception as e: - # Remove processed transactions even if there was an error - if "transactions_to_process" in locals(): - for key in transactions_to_process.keys(): # type: ignore - daily_spend_transactions.pop(key, None) - _raise_failed_update_spend_exception( - e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj - ) async def _common_add_spend_log_transaction_to_daily_transaction( self, payload: Union[dict, SpendLogsPayload], prisma_client: PrismaClient, - type: Literal["user", "team"] = "user", + type: Literal["user", "team", "request_tags"] = "user", ) -> Optional[BaseDailySpendTransaction]: common_expected_keys = ["startTime", "api_key", "model", "custom_llm_provider"] if type == "user": expected_keys = ["user", *common_expected_keys] - else: + elif type == "team": expected_keys = ["team_id", *common_expected_keys] + elif type == "request_tags": + expected_keys = ["request_tags", *common_expected_keys] + else: + raise ValueError(f"Invalid type: {type}") if not all(key in payload for key in expected_keys): verbose_proxy_logger.debug( @@ -1143,3 +1154,44 @@ class DBSpendUpdateWriter: await self.daily_team_spend_update_queue.add_update( update={daily_transaction_key: daily_transaction} ) + + async def add_spend_log_transaction_to_daily_tag_transaction( + self, + payload: SpendLogsPayload, + prisma_client: Optional[PrismaClient] = None, + ) -> None: + if prisma_client is None: + verbose_proxy_logger.debug( + "prisma_client is None. Skipping writing spend logs to db." + ) + return + + base_daily_transaction = ( + await self._common_add_spend_log_transaction_to_daily_transaction( + payload, prisma_client, "request_tags" + ) + ) + if base_daily_transaction is None: + return + if payload["request_tags"] is None: + verbose_proxy_logger.debug( + "request_tags is None for request. Skipping incrementing tag spend." + ) + return + + request_tags = [] + if isinstance(payload["request_tags"], str): + request_tags = json.loads(payload["request_tags"]) + elif isinstance(payload["request_tags"], list): + request_tags = payload["request_tags"] + else: + raise ValueError(f"Invalid request_tags: {payload['request_tags']}") + for tag in request_tags: + daily_transaction_key = f"{tag}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}" + daily_transaction = DailyTagSpendTransaction( + tag=tag, **base_daily_transaction + ) + + await self.daily_tag_spend_update_queue.add_update( + update={daily_transaction_key: daily_transaction} + ) diff --git a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py index 828778e288..03bd9dca9e 100644 --- a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py +++ b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py @@ -13,6 +13,7 @@ from litellm.caching import RedisCache from litellm.constants import ( MAX_REDIS_BUFFER_DEQUEUE_COUNT, REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, + REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY, REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, REDIS_UPDATE_BUFFER_KEY, ) @@ -68,11 +69,41 @@ class RedisUpdateBuffer: return False return _use_redis_transaction_buffer + async def _store_transactions_in_redis( + self, + transactions: Any, + redis_key: str, + service_type: ServiceTypes, + ) -> None: + """ + Helper method to store transactions in Redis and emit an event + + Args: + transactions: The transactions to store + redis_key: The Redis key to store under + service_type: The service type for event emission + """ + if transactions is None or len(transactions) == 0: + return + + list_of_transactions = [safe_dumps(transactions)] + if self.redis_cache is None: + return + current_redis_buffer_size = await self.redis_cache.async_rpush( + key=redis_key, + values=list_of_transactions, + ) + await self._emit_new_item_added_to_redis_buffer_event( + queue_size=current_redis_buffer_size, + service=service_type, + ) + async def store_in_memory_spend_updates_in_redis( self, spend_update_queue: SpendUpdateQueue, daily_spend_update_queue: DailySpendUpdateQueue, daily_team_spend_update_queue: DailySpendUpdateQueue, + daily_tag_spend_update_queue: DailySpendUpdateQueue, ): """ Stores the in-memory spend updates to Redis @@ -124,18 +155,23 @@ class RedisUpdateBuffer: ) return + # Get all transactions db_spend_update_transactions = ( await spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions() ) - verbose_proxy_logger.debug( - "ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions - ) daily_spend_update_transactions = ( await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() ) daily_team_spend_update_transactions = ( await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() ) + daily_tag_spend_update_transactions = ( + await daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() + ) + + verbose_proxy_logger.debug( + "ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions + ) verbose_proxy_logger.debug( "ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions ) @@ -147,40 +183,29 @@ class RedisUpdateBuffer: ): return - list_of_transactions = [safe_dumps(db_spend_update_transactions)] - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_UPDATE_BUFFER_KEY, - values=list_of_transactions, - ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_SPEND_UPDATE_QUEUE, + # Store all transaction types using the helper method + await self._store_transactions_in_redis( + transactions=db_spend_update_transactions, + redis_key=REDIS_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_SPEND_UPDATE_QUEUE, ) - list_of_daily_spend_update_transactions = [ - safe_dumps(daily_spend_update_transactions) - ] - - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, - values=list_of_daily_spend_update_transactions, - ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE, + await self._store_transactions_in_redis( + transactions=daily_spend_update_transactions, + redis_key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE, ) - list_of_daily_team_spend_update_transactions = [ - safe_dumps(daily_team_spend_update_transactions) - ] - - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, - values=list_of_daily_team_spend_update_transactions, + await self._store_transactions_in_redis( + transactions=daily_team_spend_update_transactions, + redis_key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE, ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE, + + await self._store_transactions_in_redis( + transactions=daily_tag_spend_update_transactions, + redis_key=REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_DAILY_TAG_SPEND_UPDATE_QUEUE, ) @staticmethod diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma index 845f05f14b..b21550413b 100644 --- a/litellm/proxy/schema.prisma +++ b/litellm/proxy/schema.prisma @@ -353,6 +353,8 @@ model LiteLLM_DailyTeamSpend { custom_llm_provider String? prompt_tokens Int @default(0) completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) spend Float @default(0.0) api_requests Int @default(0) successful_requests Int @default(0) @@ -367,6 +369,33 @@ model LiteLLM_DailyTeamSpend { @@index([model]) } +// Track daily team spend metrics per model and key +model LiteLLM_DailyTagSpend { + id String @id @default(uuid()) + tag String @unique + date String + api_key String + model String + model_group String? + custom_llm_provider String? + prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([tag, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([tag]) + @@index([api_key]) + @@index([model]) +} + // Track the status of cron jobs running. Only allow one pod to run the job at a time model LiteLLM_CronJob { diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index c722a92cf7..6e8c65710d 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -2796,50 +2796,3 @@ def _premium_user_check(): "error": f"This feature is only available for LiteLLM Enterprise users. {CommonProxyErrors.not_premium_user.value}" }, ) - - -async def _update_daily_spend_batch(prisma_client, spend_aggregates): - """Helper function to update daily spend in batches""" - async with prisma_client.db.batch_() as batcher: - for ( - user_id, - date, - api_key, - model, - model_group, - provider, - ), metrics in spend_aggregates.items(): - if not user_id: # Skip if no user_id - continue - - batcher.litellm_dailyuserspend.upsert( - where={ - "user_id_date_api_key_model_custom_llm_provider": { - "user_id": user_id, - "date": date, - "api_key": api_key, - "model": model, - "custom_llm_provider": provider, - } - }, - data={ - "create": { - "user_id": user_id, - "date": date, - "api_key": api_key, - "model": model, - "model_group": model_group, - "custom_llm_provider": provider, - "prompt_tokens": metrics["prompt_tokens"], - "completion_tokens": metrics["completion_tokens"], - "spend": metrics["spend"], - }, - "update": { - "prompt_tokens": {"increment": metrics["prompt_tokens"]}, - "completion_tokens": { - "increment": metrics["completion_tokens"] - }, - "spend": {"increment": metrics["spend"]}, - }, - }, - ) diff --git a/litellm/types/services.py b/litellm/types/services.py index 6c788c8956..e038100e27 100644 --- a/litellm/types/services.py +++ b/litellm/types/services.py @@ -34,6 +34,7 @@ class ServiceTypes(str, enum.Enum): IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue" REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue" REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE = "redis_daily_team_spend_update_queue" + REDIS_DAILY_TAG_SPEND_UPDATE_QUEUE = "redis_daily_tag_spend_update_queue" # spend update queue - current spend of key, user, team IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue" REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue" diff --git a/schema.prisma b/schema.prisma index 845f05f14b..b21550413b 100644 --- a/schema.prisma +++ b/schema.prisma @@ -353,6 +353,8 @@ model LiteLLM_DailyTeamSpend { custom_llm_provider String? prompt_tokens Int @default(0) completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) spend Float @default(0.0) api_requests Int @default(0) successful_requests Int @default(0) @@ -367,6 +369,33 @@ model LiteLLM_DailyTeamSpend { @@index([model]) } +// Track daily team spend metrics per model and key +model LiteLLM_DailyTagSpend { + id String @id @default(uuid()) + tag String @unique + date String + api_key String + model String + model_group String? + custom_llm_provider String? + prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([tag, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([tag]) + @@index([api_key]) + @@index([model]) +} + // Track the status of cron jobs running. Only allow one pod to run the job at a time model LiteLLM_CronJob { From 5c078af73895212e1d5b1e40d372745aa8ac50b2 Mon Sep 17 00:00:00 2001 From: Peter Dave Hello Date: Thu, 17 Apr 2025 03:40:13 +0800 Subject: [PATCH 2/2] Add OpenAI o3 & 4o-mini (#10065) Reference: - https://platform.openai.com/docs/models/o3 - https://platform.openai.com/docs/models/o4-mini --- ...odel_prices_and_context_window_backup.json | 68 +++++++++++++++++++ model_prices_and_context_window.json | 68 +++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/litellm/model_prices_and_context_window_backup.json b/litellm/model_prices_and_context_window_backup.json index d42762355c..9918743bc9 100644 --- a/litellm/model_prices_and_context_window_backup.json +++ b/litellm/model_prices_and_context_window_backup.json @@ -600,6 +600,40 @@ "supports_vision": true, "supports_prompt_caching": true }, + "o3": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o3-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o3-mini": { "max_tokens": 100000, "max_input_tokens": 200000, @@ -634,6 +668,40 @@ "supports_reasoning": true, "supports_tool_choice": true }, + "o4-mini": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o4-mini-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o1-mini-2024-09-12": { "max_tokens": 65536, "max_input_tokens": 128000, diff --git a/model_prices_and_context_window.json b/model_prices_and_context_window.json index d42762355c..9918743bc9 100644 --- a/model_prices_and_context_window.json +++ b/model_prices_and_context_window.json @@ -600,6 +600,40 @@ "supports_vision": true, "supports_prompt_caching": true }, + "o3": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o3-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o3-mini": { "max_tokens": 100000, "max_input_tokens": 200000, @@ -634,6 +668,40 @@ "supports_reasoning": true, "supports_tool_choice": true }, + "o4-mini": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o4-mini-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o1-mini-2024-09-12": { "max_tokens": 65536, "max_input_tokens": 128000,