diff --git a/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql
new file mode 100644
index 0000000000..8c3cea7093
--- /dev/null
+++ b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416115320_add_tag_table_to_db/migration.sql
@@ -0,0 +1,45 @@
+-- AlterTable
+ALTER TABLE "LiteLLM_DailyTeamSpend" ADD COLUMN "cache_creation_input_tokens" INTEGER NOT NULL DEFAULT 0,
+ADD COLUMN "cache_read_input_tokens" INTEGER NOT NULL DEFAULT 0;
+
+-- CreateTable
+CREATE TABLE "LiteLLM_DailyTagSpend" (
+    "id" TEXT NOT NULL,
+    "tag" TEXT NOT NULL,
+    "date" TEXT NOT NULL,
+    "api_key" TEXT NOT NULL,
+    "model" TEXT NOT NULL,
+    "model_group" TEXT,
+    "custom_llm_provider" TEXT,
+    "prompt_tokens" INTEGER NOT NULL DEFAULT 0,
+    "completion_tokens" INTEGER NOT NULL DEFAULT 0,
+    "cache_read_input_tokens" INTEGER NOT NULL DEFAULT 0,
+    "cache_creation_input_tokens" INTEGER NOT NULL DEFAULT 0,
+    "spend" DOUBLE PRECISION NOT NULL DEFAULT 0.0,
+    "api_requests" INTEGER NOT NULL DEFAULT 0,
+    "successful_requests" INTEGER NOT NULL DEFAULT 0,
+    "failed_requests" INTEGER NOT NULL DEFAULT 0,
+    "created_at" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "updated_at" TIMESTAMP(3) NOT NULL,
+
+    CONSTRAINT "LiteLLM_DailyTagSpend_pkey" PRIMARY KEY ("id")
+);
+
+-- CreateIndex
+CREATE UNIQUE INDEX "LiteLLM_DailyTagSpend_tag_key" ON "LiteLLM_DailyTagSpend"("tag");
+
+-- CreateIndex
+CREATE INDEX "LiteLLM_DailyTagSpend_date_idx" ON "LiteLLM_DailyTagSpend"("date");
+
+-- CreateIndex
+CREATE INDEX "LiteLLM_DailyTagSpend_tag_idx" ON "LiteLLM_DailyTagSpend"("tag");
+
+-- CreateIndex
+CREATE INDEX "LiteLLM_DailyTagSpend_api_key_idx" ON "LiteLLM_DailyTagSpend"("api_key");
+
+-- CreateIndex
+CREATE INDEX "LiteLLM_DailyTagSpend_model_idx" ON "LiteLLM_DailyTagSpend"("model");
+
+-- CreateIndex
+CREATE UNIQUE INDEX "LiteLLM_DailyTagSpend_tag_date_api_key_model_custom_llm_pro_key" ON "LiteLLM_DailyTagSpend"("tag", "date", "api_key", "model", "custom_llm_provider");
+
diff --git a/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416151339_drop_tag_uniqueness_requirement/migration.sql b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416151339_drop_tag_uniqueness_requirement/migration.sql
new file mode 100644
index 0000000000..5c27b84efb
--- /dev/null
+++ b/litellm-proxy-extras/litellm_proxy_extras/migrations/20250416151339_drop_tag_uniqueness_requirement/migration.sql
@@ -0,0 +1,3 @@
+-- DropIndex
+DROP INDEX "LiteLLM_DailyTagSpend_tag_key";
+
diff --git a/litellm-proxy-extras/litellm_proxy_extras/schema.prisma b/litellm-proxy-extras/litellm_proxy_extras/schema.prisma
index 976b5e1508..68e9382d75 100644
--- a/litellm-proxy-extras/litellm_proxy_extras/schema.prisma
+++ b/litellm-proxy-extras/litellm_proxy_extras/schema.prisma
@@ -354,6 +354,8 @@ model LiteLLM_DailyTeamSpend {
   custom_llm_provider String?
   prompt_tokens       Int     @default(0)
   completion_tokens   Int     @default(0)
+  cache_read_input_tokens      Int @default(0)
+  cache_creation_input_tokens  Int @default(0)
   spend               Float   @default(0.0)
   api_requests        Int     @default(0)
   successful_requests Int     @default(0)
@@ -368,6 +370,33 @@ model LiteLLM_DailyTeamSpend {
   @@index([model])
 }
 
+// Track daily tag spend metrics per model and key
+model LiteLLM_DailyTagSpend {
+  id                          String   @id @default(uuid())
+  tag                         String
+  date                        String
+  api_key                     String
+  model                       String
+  model_group                 String?
+  custom_llm_provider         String?
+ prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([tag, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([tag]) + @@index([api_key]) + @@index([model]) +} + // Track the status of cron jobs running. Only allow one pod to run the job at a time model LiteLLM_CronJob { diff --git a/litellm/constants.py b/litellm/constants.py index f25c7c71e4..9c30dc06a2 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -28,6 +28,7 @@ _DEFAULT_TTL_FOR_HTTPX_CLIENTS = 3600 # 1 hour, re-use the same httpx client fo REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer" REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_spend_update_buffer" REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_team_spend_update_buffer" +REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY = "litellm_daily_tag_spend_update_buffer" MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100 MAX_SIZE_IN_MEMORY_QUEUE = 10000 MAX_IN_MEMORY_QUEUE_FLUSH_COUNT = 1000 diff --git a/litellm/model_prices_and_context_window_backup.json b/litellm/model_prices_and_context_window_backup.json index d42762355c..9918743bc9 100644 --- a/litellm/model_prices_and_context_window_backup.json +++ b/litellm/model_prices_and_context_window_backup.json @@ -600,6 +600,40 @@ "supports_vision": true, "supports_prompt_caching": true }, + "o3": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o3-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o3-mini": { "max_tokens": 100000, "max_input_tokens": 200000, @@ -634,6 +668,40 @@ "supports_reasoning": true, "supports_tool_choice": true }, + "o4-mini": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o4-mini-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + 
"supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o1-mini-2024-09-12": { "max_tokens": 65536, "max_input_tokens": 128000, diff --git a/litellm/proxy/_experimental/out/onboarding.html b/litellm/proxy/_experimental/out/onboarding.html deleted file mode 100644 index f14d344ab3..0000000000 --- a/litellm/proxy/_experimental/out/onboarding.html +++ /dev/null @@ -1 +0,0 @@ -LiteLLM Dashboard \ No newline at end of file diff --git a/litellm/proxy/_types.py b/litellm/proxy/_types.py index b0ed81af79..6be4153157 100644 --- a/litellm/proxy/_types.py +++ b/litellm/proxy/_types.py @@ -2804,6 +2804,10 @@ class DailyUserSpendTransaction(BaseDailySpendTransaction): user_id: str +class DailyTagSpendTransaction(BaseDailySpendTransaction): + tag: str + + class DBSpendUpdateTransactions(TypedDict): """ Internal Data Structure for buffering spend updates in Redis or in memory before committing them to the database diff --git a/litellm/proxy/db/db_spend_update_writer.py b/litellm/proxy/db/db_spend_update_writer.py index e6bc0c3b7a..61ea930387 100644 --- a/litellm/proxy/db/db_spend_update_writer.py +++ b/litellm/proxy/db/db_spend_update_writer.py @@ -11,7 +11,7 @@ import os import time import traceback from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast, overload import litellm from litellm._logging import verbose_proxy_logger @@ -20,6 +20,7 @@ from litellm.constants import DB_SPEND_UPDATE_JOB_NAME from litellm.proxy._types import ( DB_CONNECTION_ERROR_TYPES, BaseDailySpendTransaction, + DailyTagSpendTransaction, DailyTeamSpendTransaction, DailyUserSpendTransaction, DBSpendUpdateTransactions, @@ -61,6 +62,7 @@ class DBSpendUpdateWriter: self.spend_update_queue = SpendUpdateQueue() self.daily_spend_update_queue = DailySpendUpdateQueue() self.daily_team_spend_update_queue = DailySpendUpdateQueue() + self.daily_tag_spend_update_queue = DailySpendUpdateQueue() async def update_database( # LiteLLM management object fields @@ -170,6 +172,13 @@ class DBSpendUpdateWriter: ) ) + asyncio.create_task( + self.add_spend_log_transaction_to_daily_tag_transaction( + payload=payload, + prisma_client=prisma_client, + ) + ) + verbose_proxy_logger.debug("Runs spend update on all tables") except Exception: verbose_proxy_logger.debug( @@ -394,6 +403,7 @@ class DBSpendUpdateWriter: spend_update_queue=self.spend_update_queue, daily_spend_update_queue=self.daily_spend_update_queue, daily_team_spend_update_queue=self.daily_team_spend_update_queue, + daily_tag_spend_update_queue=self.daily_tag_spend_update_queue, ) # Only commit from redis to db if this pod is the leader @@ -495,6 +505,20 @@ class DBSpendUpdateWriter: daily_spend_transactions=daily_team_spend_update_transactions, ) + ################## Daily Tag Spend Update Transactions ################## + # Aggregate all in memory daily tag spend transactions and commit to db + daily_tag_spend_update_transactions = cast( + Dict[str, DailyTagSpendTransaction], + await self.daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions(), + ) + + await DBSpendUpdateWriter.update_daily_tag_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + 
daily_spend_transactions=daily_tag_spend_update_transactions, + ) + async def _commit_spend_updates_to_db( # noqa: PLR0915 self, prisma_client: PrismaClient, @@ -740,6 +764,208 @@ class DBSpendUpdateWriter: e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj ) + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyUserSpendTransaction], + entity_type: Literal["user"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTeamSpendTransaction], + entity_type: Literal["team"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @overload + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTagSpendTransaction], + entity_type: Literal["tag"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + ... + + @staticmethod + async def _update_daily_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Union[ + Dict[str, DailyUserSpendTransaction], + Dict[str, DailyTeamSpendTransaction], + Dict[str, DailyTagSpendTransaction], + ], + entity_type: Literal["user", "team", "tag"], + entity_id_field: str, + table_name: str, + unique_constraint_name: str, + ) -> None: + """ + Generic function to update daily spend for any entity type (user, team, tag) + """ + from litellm.proxy.utils import _raise_failed_update_spend_exception + + verbose_proxy_logger.debug( + f"Daily {entity_type.capitalize()} Spend transactions: {len(daily_spend_transactions)}" + ) + BATCH_SIZE = 100 + start_time = time.time() + + try: + for i in range(n_retry_times + 1): + try: + transactions_to_process = dict( + list(daily_spend_transactions.items())[:BATCH_SIZE] + ) + + if len(transactions_to_process) == 0: + verbose_proxy_logger.debug( + f"No new transactions to process for daily {entity_type} spend update" + ) + break + + async with prisma_client.db.batch_() as batcher: + for _, transaction in transactions_to_process.items(): + entity_id = transaction.get(entity_id_field) + if not entity_id: + continue + + # Construct the where clause dynamically + where_clause = { + unique_constraint_name: { + entity_id_field: entity_id, + "date": transaction["date"], + "api_key": transaction["api_key"], + "model": transaction["model"], + "custom_llm_provider": transaction.get( + "custom_llm_provider" + ), + } + } + + # Get the table dynamically + table = getattr(batcher, table_name) + + # Common data structure for both create and update + common_data = { + entity_id_field: entity_id, + "date": transaction["date"], + "api_key": transaction["api_key"], + "model": transaction["model"], + "model_group": transaction.get("model_group"), + "custom_llm_provider": transaction.get( + "custom_llm_provider" + ), + "prompt_tokens": transaction["prompt_tokens"], + "completion_tokens": transaction["completion_tokens"], + "spend": transaction["spend"], + "api_requests": transaction["api_requests"], + "successful_requests": transaction[ + "successful_requests" + ], + "failed_requests": transaction["failed_requests"], 
+ } + + # Add cache-related fields if they exist + if "cache_read_input_tokens" in transaction: + common_data[ + "cache_read_input_tokens" + ] = transaction.get("cache_read_input_tokens", 0) + if "cache_creation_input_tokens" in transaction: + common_data[ + "cache_creation_input_tokens" + ] = transaction.get("cache_creation_input_tokens", 0) + + # Create update data structure + update_data = { + "prompt_tokens": { + "increment": transaction["prompt_tokens"] + }, + "completion_tokens": { + "increment": transaction["completion_tokens"] + }, + "spend": {"increment": transaction["spend"]}, + "api_requests": { + "increment": transaction["api_requests"] + }, + "successful_requests": { + "increment": transaction["successful_requests"] + }, + "failed_requests": { + "increment": transaction["failed_requests"] + }, + } + + # Add cache-related fields to update if they exist + if "cache_read_input_tokens" in transaction: + update_data["cache_read_input_tokens"] = { + "increment": transaction.get( + "cache_read_input_tokens", 0 + ) + } + if "cache_creation_input_tokens" in transaction: + update_data["cache_creation_input_tokens"] = { + "increment": transaction.get( + "cache_creation_input_tokens", 0 + ) + } + + table.upsert( + where=where_clause, + data={ + "create": common_data, + "update": update_data, + }, + ) + + verbose_proxy_logger.info( + f"Processed {len(transactions_to_process)} daily {entity_type} transactions in {time.time() - start_time:.2f}s" + ) + + # Remove processed transactions + for key in transactions_to_process.keys(): + daily_spend_transactions.pop(key, None) + + break + + except DB_CONNECTION_ERROR_TYPES as e: + if i >= n_retry_times: + _raise_failed_update_spend_exception( + e=e, + start_time=start_time, + proxy_logging_obj=proxy_logging_obj, + ) + await asyncio.sleep(2**i) + + except Exception as e: + if "transactions_to_process" in locals(): + for key in transactions_to_process.keys(): # type: ignore + daily_spend_transactions.pop(key, None) + _raise_failed_update_spend_exception( + e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj + ) + @staticmethod async def update_daily_user_spend( n_retry_times: int, @@ -750,144 +976,16 @@ class DBSpendUpdateWriter: """ Batch job to update LiteLLM_DailyUserSpend table using in-memory daily_spend_transactions """ - from litellm.proxy.utils import _raise_failed_update_spend_exception - - ### UPDATE DAILY USER SPEND ### - verbose_proxy_logger.debug( - "Daily User Spend transactions: {}".format(len(daily_spend_transactions)) + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="user", + entity_id_field="user_id", + table_name="litellm_dailyuserspend", + unique_constraint_name="user_id_date_api_key_model_custom_llm_provider", ) - BATCH_SIZE = ( - 100 # Number of aggregated records to update in each database operation - ) - start_time = time.time() - - try: - for i in range(n_retry_times + 1): - try: - # Get transactions to process - transactions_to_process = dict( - list(daily_spend_transactions.items())[:BATCH_SIZE] - ) - - if len(transactions_to_process) == 0: - verbose_proxy_logger.debug( - "No new transactions to process for daily spend update" - ) - break - - # Update DailyUserSpend table in batches - async with prisma_client.db.batch_() as batcher: - for _, transaction in transactions_to_process.items(): - user_id = transaction.get("user_id") - if not user_id: # 
Skip if no user_id - continue - - batcher.litellm_dailyuserspend.upsert( - where={ - "user_id_date_api_key_model_custom_llm_provider": { - "user_id": user_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - } - }, - data={ - "create": { - "user_id": user_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "model_group": transaction.get("model_group"), - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - "prompt_tokens": transaction["prompt_tokens"], - "completion_tokens": transaction[ - "completion_tokens" - ], - "cache_read_input_tokens": transaction.get( - "cache_read_input_tokens", 0 - ), - "cache_creation_input_tokens": transaction.get( - "cache_creation_input_tokens", 0 - ), - "spend": transaction["spend"], - "api_requests": transaction["api_requests"], - "successful_requests": transaction[ - "successful_requests" - ], - "failed_requests": transaction[ - "failed_requests" - ], - }, - "update": { - "prompt_tokens": { - "increment": transaction["prompt_tokens"] - }, - "completion_tokens": { - "increment": transaction[ - "completion_tokens" - ] - }, - "cache_read_input_tokens": { - "increment": transaction.get( - "cache_read_input_tokens", 0 - ) - }, - "cache_creation_input_tokens": { - "increment": transaction.get( - "cache_creation_input_tokens", 0 - ) - }, - "spend": {"increment": transaction["spend"]}, - "api_requests": { - "increment": transaction["api_requests"] - }, - "successful_requests": { - "increment": transaction[ - "successful_requests" - ] - }, - "failed_requests": { - "increment": transaction["failed_requests"] - }, - }, - }, - ) - - verbose_proxy_logger.info( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - - # Remove processed transactions - for key in transactions_to_process.keys(): - daily_spend_transactions.pop(key, None) - - verbose_proxy_logger.debug( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - break - - except DB_CONNECTION_ERROR_TYPES as e: - if i >= n_retry_times: - _raise_failed_update_spend_exception( - e=e, - start_time=start_time, - proxy_logging_obj=proxy_logging_obj, - ) - await asyncio.sleep(2**i) # Exponential backoff - - except Exception as e: - # Remove processed transactions even if there was an error - if "transactions_to_process" in locals(): - for key in transactions_to_process.keys(): # type: ignore - daily_spend_transactions.pop(key, None) - _raise_failed_update_spend_exception( - e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj - ) @staticmethod async def update_daily_team_spend( @@ -899,140 +997,53 @@ class DBSpendUpdateWriter: """ Batch job to update LiteLLM_DailyTeamSpend table using in-memory daily_spend_transactions """ - from litellm.proxy.utils import _raise_failed_update_spend_exception - - ### UPDATE DAILY USER SPEND ### - verbose_proxy_logger.debug( - "Daily Team Spend transactions: {}".format(len(daily_spend_transactions)) + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="team", + entity_id_field="team_id", + table_name="litellm_dailyteamspend", + unique_constraint_name="team_id_date_api_key_model_custom_llm_provider", ) - 
BATCH_SIZE = ( - 100 # Number of aggregated records to update in each database operation + + @staticmethod + async def update_daily_tag_spend( + n_retry_times: int, + prisma_client: PrismaClient, + proxy_logging_obj: ProxyLogging, + daily_spend_transactions: Dict[str, DailyTagSpendTransaction], + ): + """ + Batch job to update LiteLLM_DailyTagSpend table using in-memory daily_spend_transactions + """ + await DBSpendUpdateWriter._update_daily_spend( + n_retry_times=n_retry_times, + prisma_client=prisma_client, + proxy_logging_obj=proxy_logging_obj, + daily_spend_transactions=daily_spend_transactions, + entity_type="tag", + entity_id_field="tag", + table_name="litellm_dailytagspend", + unique_constraint_name="tag_date_api_key_model_custom_llm_provider", ) - start_time = time.time() - - try: - for i in range(n_retry_times + 1): - try: - # Get transactions to process - transactions_to_process = dict( - list(daily_spend_transactions.items())[:BATCH_SIZE] - ) - - if len(transactions_to_process) == 0: - verbose_proxy_logger.debug( - "No new transactions to process for daily spend update" - ) - break - - # Update DailyUserSpend table in batches - async with prisma_client.db.batch_() as batcher: - for _, transaction in transactions_to_process.items(): - team_id = transaction.get("team_id") - if not team_id: # Skip if no team_id - continue - - batcher.litellm_dailyteamspend.upsert( - where={ - "team_id_date_api_key_model_custom_llm_provider": { - "team_id": team_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - } - }, - data={ - "create": { - "team_id": team_id, - "date": transaction["date"], - "api_key": transaction["api_key"], - "model": transaction["model"], - "model_group": transaction.get("model_group"), - "custom_llm_provider": transaction.get( - "custom_llm_provider" - ), - "prompt_tokens": transaction["prompt_tokens"], - "completion_tokens": transaction[ - "completion_tokens" - ], - "spend": transaction["spend"], - "api_requests": transaction["api_requests"], - "successful_requests": transaction[ - "successful_requests" - ], - "failed_requests": transaction[ - "failed_requests" - ], - }, - "update": { - "prompt_tokens": { - "increment": transaction["prompt_tokens"] - }, - "completion_tokens": { - "increment": transaction[ - "completion_tokens" - ] - }, - "spend": {"increment": transaction["spend"]}, - "api_requests": { - "increment": transaction["api_requests"] - }, - "successful_requests": { - "increment": transaction[ - "successful_requests" - ] - }, - "failed_requests": { - "increment": transaction["failed_requests"] - }, - }, - }, - ) - - verbose_proxy_logger.info( - f"Processed {len(transactions_to_process)} daily team transactions in {time.time() - start_time:.2f}s" - ) - - # Remove processed transactions - for key in transactions_to_process.keys(): - daily_spend_transactions.pop(key, None) - - verbose_proxy_logger.debug( - f"Processed {len(transactions_to_process)} daily spend transactions in {time.time() - start_time:.2f}s" - ) - break - - except DB_CONNECTION_ERROR_TYPES as e: - if i >= n_retry_times: - _raise_failed_update_spend_exception( - e=e, - start_time=start_time, - proxy_logging_obj=proxy_logging_obj, - ) - await asyncio.sleep(2**i) # Exponential backoff - - except Exception as e: - # Remove processed transactions even if there was an error - if "transactions_to_process" in locals(): - for key in transactions_to_process.keys(): # type: ignore - 
daily_spend_transactions.pop(key, None) - _raise_failed_update_spend_exception( - e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj - ) async def _common_add_spend_log_transaction_to_daily_transaction( self, payload: Union[dict, SpendLogsPayload], prisma_client: PrismaClient, - type: Literal["user", "team"] = "user", + type: Literal["user", "team", "request_tags"] = "user", ) -> Optional[BaseDailySpendTransaction]: common_expected_keys = ["startTime", "api_key", "model", "custom_llm_provider"] if type == "user": expected_keys = ["user", *common_expected_keys] - else: + elif type == "team": expected_keys = ["team_id", *common_expected_keys] + elif type == "request_tags": + expected_keys = ["request_tags", *common_expected_keys] + else: + raise ValueError(f"Invalid type: {type}") if not all(key in payload for key in expected_keys): verbose_proxy_logger.debug( @@ -1143,3 +1154,44 @@ class DBSpendUpdateWriter: await self.daily_team_spend_update_queue.add_update( update={daily_transaction_key: daily_transaction} ) + + async def add_spend_log_transaction_to_daily_tag_transaction( + self, + payload: SpendLogsPayload, + prisma_client: Optional[PrismaClient] = None, + ) -> None: + if prisma_client is None: + verbose_proxy_logger.debug( + "prisma_client is None. Skipping writing spend logs to db." + ) + return + + base_daily_transaction = ( + await self._common_add_spend_log_transaction_to_daily_transaction( + payload, prisma_client, "request_tags" + ) + ) + if base_daily_transaction is None: + return + if payload["request_tags"] is None: + verbose_proxy_logger.debug( + "request_tags is None for request. Skipping incrementing tag spend." + ) + return + + request_tags = [] + if isinstance(payload["request_tags"], str): + request_tags = json.loads(payload["request_tags"]) + elif isinstance(payload["request_tags"], list): + request_tags = payload["request_tags"] + else: + raise ValueError(f"Invalid request_tags: {payload['request_tags']}") + for tag in request_tags: + daily_transaction_key = f"{tag}_{base_daily_transaction['date']}_{payload['api_key']}_{payload['model']}_{payload['custom_llm_provider']}" + daily_transaction = DailyTagSpendTransaction( + tag=tag, **base_daily_transaction + ) + + await self.daily_tag_spend_update_queue.add_update( + update={daily_transaction_key: daily_transaction} + ) diff --git a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py index 828778e288..03bd9dca9e 100644 --- a/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py +++ b/litellm/proxy/db/db_transaction_queue/redis_update_buffer.py @@ -13,6 +13,7 @@ from litellm.caching import RedisCache from litellm.constants import ( MAX_REDIS_BUFFER_DEQUEUE_COUNT, REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, + REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY, REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, REDIS_UPDATE_BUFFER_KEY, ) @@ -68,11 +69,41 @@ class RedisUpdateBuffer: return False return _use_redis_transaction_buffer + async def _store_transactions_in_redis( + self, + transactions: Any, + redis_key: str, + service_type: ServiceTypes, + ) -> None: + """ + Helper method to store transactions in Redis and emit an event + + Args: + transactions: The transactions to store + redis_key: The Redis key to store under + service_type: The service type for event emission + """ + if transactions is None or len(transactions) == 0: + return + + list_of_transactions = [safe_dumps(transactions)] + if self.redis_cache is None: + return + current_redis_buffer_size 
= await self.redis_cache.async_rpush( + key=redis_key, + values=list_of_transactions, + ) + await self._emit_new_item_added_to_redis_buffer_event( + queue_size=current_redis_buffer_size, + service=service_type, + ) + async def store_in_memory_spend_updates_in_redis( self, spend_update_queue: SpendUpdateQueue, daily_spend_update_queue: DailySpendUpdateQueue, daily_team_spend_update_queue: DailySpendUpdateQueue, + daily_tag_spend_update_queue: DailySpendUpdateQueue, ): """ Stores the in-memory spend updates to Redis @@ -124,18 +155,23 @@ class RedisUpdateBuffer: ) return + # Get all transactions db_spend_update_transactions = ( await spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions() ) - verbose_proxy_logger.debug( - "ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions - ) daily_spend_update_transactions = ( await daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() ) daily_team_spend_update_transactions = ( await daily_team_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() ) + daily_tag_spend_update_transactions = ( + await daily_tag_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() + ) + + verbose_proxy_logger.debug( + "ALL DB SPEND UPDATE TRANSACTIONS: %s", db_spend_update_transactions + ) verbose_proxy_logger.debug( "ALL DAILY SPEND UPDATE TRANSACTIONS: %s", daily_spend_update_transactions ) @@ -147,40 +183,29 @@ class RedisUpdateBuffer: ): return - list_of_transactions = [safe_dumps(db_spend_update_transactions)] - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_UPDATE_BUFFER_KEY, - values=list_of_transactions, - ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_SPEND_UPDATE_QUEUE, + # Store all transaction types using the helper method + await self._store_transactions_in_redis( + transactions=db_spend_update_transactions, + redis_key=REDIS_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_SPEND_UPDATE_QUEUE, ) - list_of_daily_spend_update_transactions = [ - safe_dumps(daily_spend_update_transactions) - ] - - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, - values=list_of_daily_spend_update_transactions, - ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE, + await self._store_transactions_in_redis( + transactions=daily_spend_update_transactions, + redis_key=REDIS_DAILY_SPEND_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_DAILY_SPEND_UPDATE_QUEUE, ) - list_of_daily_team_spend_update_transactions = [ - safe_dumps(daily_team_spend_update_transactions) - ] - - current_redis_buffer_size = await self.redis_cache.async_rpush( - key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, - values=list_of_daily_team_spend_update_transactions, + await self._store_transactions_in_redis( + transactions=daily_team_spend_update_transactions, + redis_key=REDIS_DAILY_TEAM_SPEND_UPDATE_BUFFER_KEY, + service_type=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE, ) - await self._emit_new_item_added_to_redis_buffer_event( - queue_size=current_redis_buffer_size, - service=ServiceTypes.REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE, + + await self._store_transactions_in_redis( + transactions=daily_tag_spend_update_transactions, + redis_key=REDIS_DAILY_TAG_SPEND_UPDATE_BUFFER_KEY, + 
service_type=ServiceTypes.REDIS_DAILY_TAG_SPEND_UPDATE_QUEUE, ) @staticmethod diff --git a/litellm/proxy/management_endpoints/common_daily_activity.py b/litellm/proxy/management_endpoints/common_daily_activity.py new file mode 100644 index 0000000000..e5604ed79d --- /dev/null +++ b/litellm/proxy/management_endpoints/common_daily_activity.py @@ -0,0 +1,254 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +from fastapi import HTTPException, status + +from litellm._logging import verbose_proxy_logger +from litellm.proxy._types import CommonProxyErrors +from litellm.proxy.utils import PrismaClient +from litellm.types.proxy.management_endpoints.common_daily_activity import ( + BreakdownMetrics, + DailySpendData, + DailySpendMetadata, + KeyMetadata, + KeyMetricWithMetadata, + MetricWithMetadata, + SpendAnalyticsPaginatedResponse, + SpendMetrics, +) + + +def update_metrics(existing_metrics: SpendMetrics, record: Any) -> SpendMetrics: + """Update metrics with new record data.""" + existing_metrics.spend += record.spend + existing_metrics.prompt_tokens += record.prompt_tokens + existing_metrics.completion_tokens += record.completion_tokens + existing_metrics.total_tokens += record.prompt_tokens + record.completion_tokens + existing_metrics.cache_read_input_tokens += record.cache_read_input_tokens + existing_metrics.cache_creation_input_tokens += record.cache_creation_input_tokens + existing_metrics.api_requests += record.api_requests + existing_metrics.successful_requests += record.successful_requests + existing_metrics.failed_requests += record.failed_requests + return existing_metrics + + +def update_breakdown_metrics( + breakdown: BreakdownMetrics, + record: Any, + model_metadata: Dict[str, Dict[str, Any]], + provider_metadata: Dict[str, Dict[str, Any]], + api_key_metadata: Dict[str, Dict[str, Any]], + entity_id_field: Optional[str] = None, +) -> BreakdownMetrics: + """Updates breakdown metrics for a single record using the existing update_metrics function""" + + # Update model breakdown + if record.model not in breakdown.models: + breakdown.models[record.model] = MetricWithMetadata( + metrics=SpendMetrics(), + metadata=model_metadata.get( + record.model, {} + ), # Add any model-specific metadata here + ) + breakdown.models[record.model].metrics = update_metrics( + breakdown.models[record.model].metrics, record + ) + + # Update provider breakdown + provider = record.custom_llm_provider or "unknown" + if provider not in breakdown.providers: + breakdown.providers[provider] = MetricWithMetadata( + metrics=SpendMetrics(), + metadata=provider_metadata.get( + provider, {} + ), # Add any provider-specific metadata here + ) + breakdown.providers[provider].metrics = update_metrics( + breakdown.providers[provider].metrics, record + ) + + # Update api key breakdown + if record.api_key not in breakdown.api_keys: + breakdown.api_keys[record.api_key] = KeyMetricWithMetadata( + metrics=SpendMetrics(), + metadata=KeyMetadata( + key_alias=api_key_metadata.get(record.api_key, {}).get( + "key_alias", None + ) + ), # Add any api_key-specific metadata here + ) + breakdown.api_keys[record.api_key].metrics = update_metrics( + breakdown.api_keys[record.api_key].metrics, record + ) + + # Update entity-specific metrics if entity_id_field is provided + if entity_id_field: + entity_value = getattr(record, entity_id_field, None) + if entity_value: + if entity_value not in breakdown.entities: + breakdown.entities[entity_value] = MetricWithMetadata( + metrics=SpendMetrics(), metadata={} 
+ ) + breakdown.entities[entity_value].metrics = update_metrics( + breakdown.entities[entity_value].metrics, record + ) + + return breakdown + + +async def get_daily_activity( + prisma_client: Optional[PrismaClient], + table_name: str, + entity_id_field: str, + entity_id: Optional[Union[str, List[str]]], + start_date: Optional[str], + end_date: Optional[str], + model: Optional[str], + api_key: Optional[str], + page: int, + page_size: int, +) -> SpendAnalyticsPaginatedResponse: + """Common function to get daily activity for any entity type.""" + if prisma_client is None: + raise HTTPException( + status_code=500, + detail={"error": CommonProxyErrors.db_not_connected_error.value}, + ) + + if start_date is None or end_date is None: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail={"error": "Please provide start_date and end_date"}, + ) + + try: + # Build filter conditions + where_conditions: Dict[str, Any] = { + "date": { + "gte": start_date, + "lte": end_date, + } + } + + if model: + where_conditions["model"] = model + if api_key: + where_conditions["api_key"] = api_key + if entity_id: + if isinstance(entity_id, list): + where_conditions[entity_id_field] = {"in": entity_id} + else: + where_conditions[entity_id_field] = entity_id + + # Get total count for pagination + total_count = await getattr(prisma_client.db, table_name).count( + where=where_conditions + ) + + # Fetch paginated results + daily_spend_data = await getattr(prisma_client.db, table_name).find_many( + where=where_conditions, + order=[ + {"date": "desc"}, + ], + skip=(page - 1) * page_size, + take=page_size, + ) + + # Get all unique API keys from the spend data + api_keys = set() + for record in daily_spend_data: + if record.api_key: + api_keys.add(record.api_key) + + # Fetch key aliases in bulk + api_key_metadata: Dict[str, Dict[str, Any]] = {} + model_metadata: Dict[str, Dict[str, Any]] = {} + provider_metadata: Dict[str, Dict[str, Any]] = {} + if api_keys: + key_records = await prisma_client.db.litellm_verificationtoken.find_many( + where={"token": {"in": list(api_keys)}} + ) + api_key_metadata.update( + {k.token: {"key_alias": k.key_alias} for k in key_records} + ) + + # Process results + results = [] + total_metrics = SpendMetrics() + grouped_data: Dict[str, Dict[str, Any]] = {} + + for record in daily_spend_data: + date_str = record.date + if date_str not in grouped_data: + grouped_data[date_str] = { + "metrics": SpendMetrics(), + "breakdown": BreakdownMetrics(), + } + + # Update metrics + grouped_data[date_str]["metrics"] = update_metrics( + grouped_data[date_str]["metrics"], record + ) + # Update breakdowns + grouped_data[date_str]["breakdown"] = update_breakdown_metrics( + grouped_data[date_str]["breakdown"], + record, + model_metadata, + provider_metadata, + api_key_metadata, + entity_id_field=entity_id_field, + ) + + # Update total metrics + total_metrics.spend += record.spend + total_metrics.prompt_tokens += record.prompt_tokens + total_metrics.completion_tokens += record.completion_tokens + total_metrics.total_tokens += ( + record.prompt_tokens + record.completion_tokens + ) + total_metrics.cache_read_input_tokens += record.cache_read_input_tokens + total_metrics.cache_creation_input_tokens += ( + record.cache_creation_input_tokens + ) + total_metrics.api_requests += record.api_requests + total_metrics.successful_requests += record.successful_requests + total_metrics.failed_requests += record.failed_requests + + # Convert grouped data to response format + for date_str, data in 
grouped_data.items(): + results.append( + DailySpendData( + date=datetime.strptime(date_str, "%Y-%m-%d").date(), + metrics=data["metrics"], + breakdown=data["breakdown"], + ) + ) + + # Sort results by date + results.sort(key=lambda x: x.date, reverse=True) + + return SpendAnalyticsPaginatedResponse( + results=results, + metadata=DailySpendMetadata( + total_spend=total_metrics.spend, + total_prompt_tokens=total_metrics.prompt_tokens, + total_completion_tokens=total_metrics.completion_tokens, + total_tokens=total_metrics.total_tokens, + total_api_requests=total_metrics.api_requests, + total_successful_requests=total_metrics.successful_requests, + total_failed_requests=total_metrics.failed_requests, + total_cache_read_input_tokens=total_metrics.cache_read_input_tokens, + total_cache_creation_input_tokens=total_metrics.cache_creation_input_tokens, + page=page, + total_pages=-(-total_count // page_size), # Ceiling division + has_more=(page * page_size) < total_count, + ), + ) + + except Exception as e: + verbose_proxy_logger.exception(f"Error fetching daily activity: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail={"error": f"Failed to fetch analytics: {str(e)}"}, + ) diff --git a/litellm/proxy/management_endpoints/internal_user_endpoints.py b/litellm/proxy/management_endpoints/internal_user_endpoints.py index 4a8f4edea3..a91bc2dc62 100644 --- a/litellm/proxy/management_endpoints/internal_user_endpoints.py +++ b/litellm/proxy/management_endpoints/internal_user_endpoints.py @@ -14,9 +14,8 @@ These are members of a Team on LiteLLM import asyncio import traceback import uuid -from datetime import date, datetime, timedelta, timezone -from enum import Enum -from typing import Any, Dict, List, Optional, TypedDict, Union, cast +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List, Optional, Union, cast import fastapi from fastapi import APIRouter, Depends, Header, HTTPException, Request, status @@ -33,6 +32,17 @@ from litellm.proxy.management_endpoints.key_management_endpoints import ( from litellm.proxy.management_helpers.audit_logs import create_audit_log_for_update from litellm.proxy.management_helpers.utils import management_endpoint_wrapper from litellm.proxy.utils import handle_exception_on_proxy +from litellm.types.proxy.management_endpoints.common_daily_activity import ( + BreakdownMetrics, + DailySpendData, + DailySpendMetadata, + KeyMetadata, + KeyMetricWithMetadata, + LiteLLM_DailyUserSpend, + MetricWithMetadata, + SpendAnalyticsPaginatedResponse, + SpendMetrics, +) router = APIRouter() @@ -82,9 +92,9 @@ def _update_internal_new_user_params(data_json: dict, data: NewUserRequest) -> d data_json["user_id"] = str(uuid.uuid4()) auto_create_key = data_json.pop("auto_create_key", True) if auto_create_key is False: - data_json["table_name"] = ( - "user" # only create a user, don't create key if 'auto_create_key' set to False - ) + data_json[ + "table_name" + ] = "user" # only create a user, don't create key if 'auto_create_key' set to False is_internal_user = False if data.user_role and data.user_role.is_internal_user_role: @@ -651,9 +661,9 @@ def _update_internal_user_params(data_json: dict, data: UpdateUserRequest) -> di "budget_duration" not in non_default_values ): # applies internal user limits, if user role updated if is_internal_user and litellm.internal_user_budget_duration is not None: - non_default_values["budget_duration"] = ( - litellm.internal_user_budget_duration - ) + non_default_values[ + 
"budget_duration" + ] = litellm.internal_user_budget_duration duration_s = duration_in_seconds( duration=non_default_values["budget_duration"] ) @@ -964,13 +974,13 @@ async def get_users( "in": user_id_list, # Now passing a list of strings as required by Prisma } - users: Optional[List[LiteLLM_UserTable]] = ( - await prisma_client.db.litellm_usertable.find_many( - where=where_conditions, - skip=skip, - take=page_size, - order={"created_at": "desc"}, - ) + users: Optional[ + List[LiteLLM_UserTable] + ] = await prisma_client.db.litellm_usertable.find_many( + where=where_conditions, + skip=skip, + take=page_size, + order={"created_at": "desc"}, ) # Get total count of user rows @@ -1225,13 +1235,13 @@ async def ui_view_users( } # Query users with pagination and filters - users: Optional[List[BaseModel]] = ( - await prisma_client.db.litellm_usertable.find_many( - where=where_conditions, - skip=skip, - take=page_size, - order={"created_at": "desc"}, - ) + users: Optional[ + List[BaseModel] + ] = await prisma_client.db.litellm_usertable.find_many( + where=where_conditions, + skip=skip, + take=page_size, + order={"created_at": "desc"}, ) if not users: @@ -1244,111 +1254,6 @@ async def ui_view_users( raise HTTPException(status_code=500, detail=f"Error searching users: {str(e)}") -class GroupByDimension(str, Enum): - DATE = "date" - MODEL = "model" - API_KEY = "api_key" - TEAM = "team" - ORGANIZATION = "organization" - MODEL_GROUP = "model_group" - PROVIDER = "custom_llm_provider" - - -class SpendMetrics(BaseModel): - spend: float = Field(default=0.0) - prompt_tokens: int = Field(default=0) - completion_tokens: int = Field(default=0) - cache_read_input_tokens: int = Field(default=0) - cache_creation_input_tokens: int = Field(default=0) - total_tokens: int = Field(default=0) - successful_requests: int = Field(default=0) - failed_requests: int = Field(default=0) - api_requests: int = Field(default=0) - - -class MetricBase(BaseModel): - metrics: SpendMetrics - - -class MetricWithMetadata(MetricBase): - metadata: Dict[str, Any] = Field(default_factory=dict) - - -class KeyMetadata(BaseModel): - """Metadata for a key""" - - key_alias: Optional[str] = None - - -class KeyMetricWithMetadata(MetricBase): - """Base class for metrics with additional metadata""" - - metadata: KeyMetadata = Field(default_factory=KeyMetadata) - - -class BreakdownMetrics(BaseModel): - """Breakdown of spend by different dimensions""" - - models: Dict[str, MetricWithMetadata] = Field( - default_factory=dict - ) # model -> {metrics, metadata} - providers: Dict[str, MetricWithMetadata] = Field( - default_factory=dict - ) # provider -> {metrics, metadata} - api_keys: Dict[str, KeyMetricWithMetadata] = Field( - default_factory=dict - ) # api_key -> {metrics, metadata} - - -class DailySpendData(BaseModel): - date: date - metrics: SpendMetrics - breakdown: BreakdownMetrics = Field(default_factory=BreakdownMetrics) - - -class DailySpendMetadata(BaseModel): - total_spend: float = Field(default=0.0) - total_prompt_tokens: int = Field(default=0) - total_completion_tokens: int = Field(default=0) - total_tokens: int = Field(default=0) - total_api_requests: int = Field(default=0) - total_successful_requests: int = Field(default=0) - total_failed_requests: int = Field(default=0) - total_cache_read_input_tokens: int = Field(default=0) - total_cache_creation_input_tokens: int = Field(default=0) - page: int = Field(default=1) - total_pages: int = Field(default=1) - has_more: bool = Field(default=False) - - -class 
SpendAnalyticsPaginatedResponse(BaseModel): - results: List[DailySpendData] - metadata: DailySpendMetadata = Field(default_factory=DailySpendMetadata) - - -class LiteLLM_DailyUserSpend(BaseModel): - id: str - user_id: str - date: str - api_key: str - model: str - model_group: Optional[str] = None - custom_llm_provider: Optional[str] = None - prompt_tokens: int = 0 - completion_tokens: int = 0 - cache_read_input_tokens: int = 0 - cache_creation_input_tokens: int = 0 - spend: float = 0.0 - api_requests: int = 0 - successful_requests: int = 0 - failed_requests: int = 0 - - -class GroupedData(TypedDict): - metrics: SpendMetrics - breakdown: BreakdownMetrics - - def update_metrics( group_metrics: SpendMetrics, record: LiteLLM_DailyUserSpend ) -> SpendMetrics: @@ -1494,9 +1399,9 @@ async def get_user_daily_activity( user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN and user_api_key_dict.user_role != LitellmUserRoles.PROXY_ADMIN_VIEW_ONLY ): - where_conditions["user_id"] = ( - user_api_key_dict.user_id - ) # only allow access to own data + where_conditions[ + "user_id" + ] = user_api_key_dict.user_id # only allow access to own data # Get total count for pagination total_count = await prisma_client.db.litellm_dailyuserspend.count( diff --git a/litellm/proxy/management_endpoints/tag_management_endpoints.py b/litellm/proxy/management_endpoints/tag_management_endpoints.py index 014a1f3c57..79a69a16c1 100644 --- a/litellm/proxy/management_endpoints/tag_management_endpoints.py +++ b/litellm/proxy/management_endpoints/tag_management_endpoints.py @@ -12,7 +12,7 @@ All /tag management endpoints import datetime import json -from typing import Dict +from typing import Dict, Optional from fastapi import APIRouter, Depends, HTTPException @@ -20,6 +20,10 @@ from litellm._logging import verbose_proxy_logger from litellm.litellm_core_utils.safe_json_dumps import safe_dumps from litellm.proxy._types import UserAPIKeyAuth from litellm.proxy.auth.user_api_key_auth import user_api_key_auth +from litellm.proxy.management_endpoints.common_daily_activity import ( + SpendAnalyticsPaginatedResponse, + get_daily_activity, +) from litellm.types.tag_management import ( TagConfig, TagDeleteRequest, @@ -354,3 +358,52 @@ async def delete_tag( return {"message": f"Tag {data.name} deleted successfully"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) + + +@router.get( + "/tag/daily/activity", + response_model=SpendAnalyticsPaginatedResponse, + tags=["tag management"], + dependencies=[Depends(user_api_key_auth)], +) +async def get_tag_daily_activity( + tags: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + model: Optional[str] = None, + api_key: Optional[str] = None, + page: int = 1, + page_size: int = 10, +): + """ + Get daily activity for specific tags or all tags. + + Args: + tags (Optional[str]): Comma-separated list of tags to filter by. If not provided, returns data for all tags. + start_date (Optional[str]): Start date for the activity period (YYYY-MM-DD). + end_date (Optional[str]): End date for the activity period (YYYY-MM-DD). + model (Optional[str]): Filter by model name. + api_key (Optional[str]): Filter by API key. + page (int): Page number for pagination. + page_size (int): Number of items per page. + + Returns: + SpendAnalyticsPaginatedResponse: Paginated response containing daily activity data. 
+ """ + from litellm.proxy.proxy_server import prisma_client + + # Convert comma-separated tags string to list if provided + tag_list = tags.split(",") if tags else None + + return await get_daily_activity( + prisma_client=prisma_client, + table_name="litellm_dailytagspend", + entity_id_field="tag", + entity_id=tag_list, + start_date=start_date, + end_date=end_date, + model=model, + api_key=api_key, + page=page, + page_size=page_size, + ) diff --git a/litellm/proxy/management_endpoints/team_endpoints.py b/litellm/proxy/management_endpoints/team_endpoints.py index c8200f7fed..706f7d2c2f 100644 --- a/litellm/proxy/management_endpoints/team_endpoints.py +++ b/litellm/proxy/management_endpoints/team_endpoints.py @@ -62,6 +62,9 @@ from litellm.proxy.management_endpoints.common_utils import ( _is_user_team_admin, _set_object_metadata_field, ) +from litellm.proxy.management_endpoints.tag_management_endpoints import ( + get_daily_activity, +) from litellm.proxy.management_helpers.team_member_permission_checks import ( TeamMemberPermissionChecks, ) @@ -75,6 +78,9 @@ from litellm.proxy.utils import ( handle_exception_on_proxy, ) from litellm.router import Router +from litellm.types.proxy.management_endpoints.common_daily_activity import ( + SpendAnalyticsPaginatedResponse, +) from litellm.types.proxy.management_endpoints.team_endpoints import ( GetTeamMemberPermissionsResponse, UpdateTeamMemberPermissionsRequest, @@ -515,12 +521,12 @@ async def update_team( updated_kv["model_id"] = _model_id updated_kv = prisma_client.jsonify_team_object(db_data=updated_kv) - team_row: Optional[LiteLLM_TeamTable] = ( - await prisma_client.db.litellm_teamtable.update( - where={"team_id": data.team_id}, - data=updated_kv, - include={"litellm_model_table": True}, # type: ignore - ) + team_row: Optional[ + LiteLLM_TeamTable + ] = await prisma_client.db.litellm_teamtable.update( + where={"team_id": data.team_id}, + data=updated_kv, + include={"litellm_model_table": True}, # type: ignore ) if team_row is None or team_row.team_id is None: @@ -1146,10 +1152,10 @@ async def delete_team( team_rows: List[LiteLLM_TeamTable] = [] for team_id in data.team_ids: try: - team_row_base: Optional[BaseModel] = ( - await prisma_client.db.litellm_teamtable.find_unique( - where={"team_id": team_id} - ) + team_row_base: Optional[ + BaseModel + ] = await prisma_client.db.litellm_teamtable.find_unique( + where={"team_id": team_id} ) if team_row_base is None: raise Exception @@ -1307,10 +1313,10 @@ async def team_info( ) try: - team_info: Optional[BaseModel] = ( - await prisma_client.db.litellm_teamtable.find_unique( - where={"team_id": team_id} - ) + team_info: Optional[ + BaseModel + ] = await prisma_client.db.litellm_teamtable.find_unique( + where={"team_id": team_id} ) if team_info is None: raise Exception @@ -2079,3 +2085,52 @@ async def update_team_member_permissions( ) return updated_team + + +@router.get( + "/team/daily/activity", + response_model=SpendAnalyticsPaginatedResponse, + tags=["team management"], + dependencies=[Depends(user_api_key_auth)], +) +async def get_team_daily_activity( + team_ids: Optional[str] = None, + start_date: Optional[str] = None, + end_date: Optional[str] = None, + model: Optional[str] = None, + api_key: Optional[str] = None, + page: int = 1, + page_size: int = 10, +): + """ + Get daily activity for specific teams or all teams. + + Args: + team_ids (Optional[str]): Comma-separated list of team IDs to filter by. If not provided, returns data for all teams. 
+        start_date (Optional[str]): Start date for the activity period (YYYY-MM-DD).
+        end_date (Optional[str]): End date for the activity period (YYYY-MM-DD).
+        model (Optional[str]): Filter by model name.
+        api_key (Optional[str]): Filter by API key.
+        page (int): Page number for pagination.
+        page_size (int): Number of items per page.
+
+    Returns:
+        SpendAnalyticsPaginatedResponse: Paginated response containing daily activity data.
+    """
+    from litellm.proxy.proxy_server import prisma_client
+
+    # Convert comma-separated team IDs string to a list if provided
+    team_ids_list = team_ids.split(",") if team_ids else None
+
+    return await get_daily_activity(
+        prisma_client=prisma_client,
+        table_name="litellm_dailyteamspend",
+        entity_id_field="team_id",
+        entity_id=team_ids_list,
+        start_date=start_date,
+        end_date=end_date,
+        model=model,
+        api_key=api_key,
+        page=page,
+        page_size=page_size,
+    )
diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma
index 976b5e1508..68e9382d75 100644
--- a/litellm/proxy/schema.prisma
+++ b/litellm/proxy/schema.prisma
@@ -354,6 +354,8 @@ model LiteLLM_DailyTeamSpend {
   custom_llm_provider String?
   prompt_tokens       Int     @default(0)
   completion_tokens   Int     @default(0)
+  cache_read_input_tokens      Int @default(0)
+  cache_creation_input_tokens  Int @default(0)
   spend               Float   @default(0.0)
   api_requests        Int     @default(0)
   successful_requests Int     @default(0)
@@ -368,6 +370,33 @@ model LiteLLM_DailyTeamSpend {
   @@index([model])
 }
 
+// Track daily tag spend metrics per model and key
+model LiteLLM_DailyTagSpend {
+  id                          String   @id @default(uuid())
+  tag                         String
+  date                        String
+  api_key                     String
+  model                       String
+  model_group                 String?
+  custom_llm_provider         String?
+  prompt_tokens               Int      @default(0)
+  completion_tokens           Int      @default(0)
+  cache_read_input_tokens     Int      @default(0)
+  cache_creation_input_tokens Int      @default(0)
+  spend                       Float    @default(0.0)
+  api_requests                Int      @default(0)
+  successful_requests         Int      @default(0)
+  failed_requests             Int      @default(0)
+  created_at                  DateTime @default(now())
+  updated_at                  DateTime @updatedAt
+
+  @@unique([tag, date, api_key, model, custom_llm_provider])
+  @@index([date])
+  @@index([tag])
+  @@index([api_key])
+  @@index([model])
+}
+
 // Track the status of cron jobs running. Only allow one pod to run the job at a time
 model LiteLLM_CronJob {
diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py
index c722a92cf7..6e8c65710d 100644
--- a/litellm/proxy/utils.py
+++ b/litellm/proxy/utils.py
@@ -2796,50 +2796,3 @@ def _premium_user_check():
                 "error": f"This feature is only available for LiteLLM Enterprise users.
{CommonProxyErrors.not_premium_user.value}" }, ) - - -async def _update_daily_spend_batch(prisma_client, spend_aggregates): - """Helper function to update daily spend in batches""" - async with prisma_client.db.batch_() as batcher: - for ( - user_id, - date, - api_key, - model, - model_group, - provider, - ), metrics in spend_aggregates.items(): - if not user_id: # Skip if no user_id - continue - - batcher.litellm_dailyuserspend.upsert( - where={ - "user_id_date_api_key_model_custom_llm_provider": { - "user_id": user_id, - "date": date, - "api_key": api_key, - "model": model, - "custom_llm_provider": provider, - } - }, - data={ - "create": { - "user_id": user_id, - "date": date, - "api_key": api_key, - "model": model, - "model_group": model_group, - "custom_llm_provider": provider, - "prompt_tokens": metrics["prompt_tokens"], - "completion_tokens": metrics["completion_tokens"], - "spend": metrics["spend"], - }, - "update": { - "prompt_tokens": {"increment": metrics["prompt_tokens"]}, - "completion_tokens": { - "increment": metrics["completion_tokens"] - }, - "spend": {"increment": metrics["spend"]}, - }, - }, - ) diff --git a/litellm/types/proxy/management_endpoints/common_daily_activity.py b/litellm/types/proxy/management_endpoints/common_daily_activity.py new file mode 100644 index 0000000000..9408035746 --- /dev/null +++ b/litellm/types/proxy/management_endpoints/common_daily_activity.py @@ -0,0 +1,113 @@ +from datetime import date +from enum import Enum +from typing import Any, Dict, List, Optional, TypedDict + +from pydantic import BaseModel, Field + + +class GroupByDimension(str, Enum): + DATE = "date" + MODEL = "model" + API_KEY = "api_key" + TEAM = "team" + ORGANIZATION = "organization" + MODEL_GROUP = "model_group" + PROVIDER = "custom_llm_provider" + + +class SpendMetrics(BaseModel): + spend: float = Field(default=0.0) + prompt_tokens: int = Field(default=0) + completion_tokens: int = Field(default=0) + cache_read_input_tokens: int = Field(default=0) + cache_creation_input_tokens: int = Field(default=0) + total_tokens: int = Field(default=0) + successful_requests: int = Field(default=0) + failed_requests: int = Field(default=0) + api_requests: int = Field(default=0) + + +class MetricBase(BaseModel): + metrics: SpendMetrics + + +class MetricWithMetadata(MetricBase): + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class KeyMetadata(BaseModel): + """Metadata for a key""" + + key_alias: Optional[str] = None + + +class KeyMetricWithMetadata(MetricBase): + """Base class for metrics with additional metadata""" + + metadata: KeyMetadata = Field(default_factory=KeyMetadata) + + +class BreakdownMetrics(BaseModel): + """Breakdown of spend by different dimensions""" + + models: Dict[str, MetricWithMetadata] = Field( + default_factory=dict + ) # model -> {metrics, metadata} + providers: Dict[str, MetricWithMetadata] = Field( + default_factory=dict + ) # provider -> {metrics, metadata} + api_keys: Dict[str, KeyMetricWithMetadata] = Field( + default_factory=dict + ) # api_key -> {metrics, metadata} + entities: Dict[str, MetricWithMetadata] = Field( + default_factory=dict + ) # entity -> {metrics, metadata} + + +class DailySpendData(BaseModel): + date: date + metrics: SpendMetrics + breakdown: BreakdownMetrics = Field(default_factory=BreakdownMetrics) + + +class DailySpendMetadata(BaseModel): + total_spend: float = Field(default=0.0) + total_prompt_tokens: int = Field(default=0) + total_completion_tokens: int = Field(default=0) + total_tokens: int = Field(default=0) + 
total_api_requests: int = Field(default=0) + total_successful_requests: int = Field(default=0) + total_failed_requests: int = Field(default=0) + total_cache_read_input_tokens: int = Field(default=0) + total_cache_creation_input_tokens: int = Field(default=0) + page: int = Field(default=1) + total_pages: int = Field(default=1) + has_more: bool = Field(default=False) + + +class SpendAnalyticsPaginatedResponse(BaseModel): + results: List[DailySpendData] + metadata: DailySpendMetadata = Field(default_factory=DailySpendMetadata) + + +class LiteLLM_DailyUserSpend(BaseModel): + id: str + user_id: str + date: str + api_key: str + model: str + model_group: Optional[str] = None + custom_llm_provider: Optional[str] = None + prompt_tokens: int = 0 + completion_tokens: int = 0 + cache_read_input_tokens: int = 0 + cache_creation_input_tokens: int = 0 + spend: float = 0.0 + api_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + + +class GroupedData(TypedDict): + metrics: SpendMetrics + breakdown: BreakdownMetrics diff --git a/litellm/types/services.py b/litellm/types/services.py index 6c788c8956..e038100e27 100644 --- a/litellm/types/services.py +++ b/litellm/types/services.py @@ -34,6 +34,7 @@ class ServiceTypes(str, enum.Enum): IN_MEMORY_DAILY_SPEND_UPDATE_QUEUE = "in_memory_daily_spend_update_queue" REDIS_DAILY_SPEND_UPDATE_QUEUE = "redis_daily_spend_update_queue" REDIS_DAILY_TEAM_SPEND_UPDATE_QUEUE = "redis_daily_team_spend_update_queue" + REDIS_DAILY_TAG_SPEND_UPDATE_QUEUE = "redis_daily_tag_spend_update_queue" # spend update queue - current spend of key, user, team IN_MEMORY_SPEND_UPDATE_QUEUE = "in_memory_spend_update_queue" REDIS_SPEND_UPDATE_QUEUE = "redis_spend_update_queue" diff --git a/model_prices_and_context_window.json b/model_prices_and_context_window.json index d42762355c..9918743bc9 100644 --- a/model_prices_and_context_window.json +++ b/model_prices_and_context_window.json @@ -600,6 +600,40 @@ "supports_vision": true, "supports_prompt_caching": true }, + "o3": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o3-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1e-5, + "output_cost_per_token": 4e-5, + "cache_read_input_token_cost": 2.5e-6, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o3-mini": { "max_tokens": 100000, "max_input_tokens": 200000, @@ -634,6 +668,40 @@ "supports_reasoning": true, "supports_tool_choice": true }, + "o4-mini": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + 
"supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, + "o4-mini-2025-04-16": { + "max_tokens": 100000, + "max_input_tokens": 200000, + "max_output_tokens": 100000, + "input_cost_per_token": 1.1e-6, + "output_cost_per_token": 4.4e-6, + "cache_read_input_token_cost": 2.75e-7, + "litellm_provider": "openai", + "mode": "chat", + "supports_function_calling": true, + "supports_parallel_function_calling": false, + "supports_vision": true, + "supports_prompt_caching": true, + "supports_response_schema": true, + "supports_reasoning": true, + "supports_tool_choice": true + }, "o1-mini-2024-09-12": { "max_tokens": 65536, "max_input_tokens": 128000, diff --git a/schema.prisma b/schema.prisma index 976b5e1508..68e9382d75 100644 --- a/schema.prisma +++ b/schema.prisma @@ -354,6 +354,8 @@ model LiteLLM_DailyTeamSpend { custom_llm_provider String? prompt_tokens Int @default(0) completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) spend Float @default(0.0) api_requests Int @default(0) successful_requests Int @default(0) @@ -368,6 +370,33 @@ model LiteLLM_DailyTeamSpend { @@index([model]) } +// Track daily team spend metrics per model and key +model LiteLLM_DailyTagSpend { + id String @id @default(uuid()) + tag String + date String + api_key String + model String + model_group String? + custom_llm_provider String? + prompt_tokens Int @default(0) + completion_tokens Int @default(0) + cache_read_input_tokens Int @default(0) + cache_creation_input_tokens Int @default(0) + spend Float @default(0.0) + api_requests Int @default(0) + successful_requests Int @default(0) + failed_requests Int @default(0) + created_at DateTime @default(now()) + updated_at DateTime @updatedAt + + @@unique([tag, date, api_key, model, custom_llm_provider]) + @@index([date]) + @@index([tag]) + @@index([api_key]) + @@index([model]) +} + // Track the status of cron jobs running. 
Only allow one pod to run the job at a time model LiteLLM_CronJob { diff --git a/ui/litellm-dashboard/src/components/entity_usage.tsx b/ui/litellm-dashboard/src/components/entity_usage.tsx new file mode 100644 index 0000000000..e4cfb9bfae --- /dev/null +++ b/ui/litellm-dashboard/src/components/entity_usage.tsx @@ -0,0 +1,503 @@ +import React, { useState, useEffect } from "react"; +import { + BarChart, Card, Title, Text, + Grid, Col, DateRangePicker, DateRangePickerValue, + Table, TableHead, TableRow, TableHeaderCell, TableBody, TableCell, + DonutChart, + TabPanel, TabGroup, TabList, Tab, TabPanels +} from "@tremor/react"; +import { Select } from 'antd'; +import { ActivityMetrics, processActivityData } from './activity_metrics'; +import { SpendMetrics, DailyData } from './usage/types'; +import { tagDailyActivityCall, teamDailyActivityCall } from './networking'; + +interface EntityMetrics { + metrics: { + spend: number; + prompt_tokens: number; + completion_tokens: number; + cache_read_input_tokens: number; + cache_creation_input_tokens: number; + total_tokens: number; + successful_requests: number; + failed_requests: number; + api_requests: number; + }; + metadata: Record<string, any>; +} + +interface BreakdownMetrics { + models: Record<string, EntityMetrics>; + providers: Record<string, EntityMetrics>; + api_keys: Record<string, EntityMetrics>; + entities: Record<string, EntityMetrics>; +} + +interface ExtendedDailyData extends DailyData { + breakdown: BreakdownMetrics; +} + +interface EntitySpendData { + results: ExtendedDailyData[]; + metadata: { + total_spend: number; + total_api_requests: number; + total_successful_requests: number; + total_failed_requests: number; + total_tokens: number; + }; +} + +interface EntityUsageProps { + accessToken: string | null; + entityType: 'tag' | 'team'; + entityId?: string | null; +} + +const EntityUsage: React.FC<EntityUsageProps> = ({ + accessToken, + entityType, + entityId +}) => { + const [spendData, setSpendData] = useState<EntitySpendData>({ + results: [], + metadata: { + total_spend: 0, + total_api_requests: 0, + total_successful_requests: 0, + total_failed_requests: 0, + total_tokens: 0 + } + }); + + const modelMetrics = processActivityData(spendData); + + const [selectedTags, setSelectedTags] = useState<string[]>([]); + const [dateValue, setDateValue] = useState<DateRangePickerValue>({ + from: new Date(Date.now() - 28 * 24 * 60 * 60 * 1000), + to: new Date(), + }); + + const fetchSpendData = async () => { + if (!accessToken || !dateValue.from || !dateValue.to) return; + const startTime = dateValue.from; + const endTime = dateValue.to; + + if (entityType === 'tag') { + const data = await tagDailyActivityCall( + accessToken, + startTime, + endTime, + 1, + selectedTags.length > 0 ? selectedTags : null + ); + setSpendData(data); + } else if (entityType === 'team') { + const data = await teamDailyActivityCall( + accessToken, + startTime, + endTime, + 1, + selectedTags.length > 0 ? 
selectedTags : null + ); + setSpendData(data); + } else { + throw new Error("Invalid entity type"); + } + }; + + useEffect(() => { + fetchSpendData(); + }, [accessToken, dateValue, entityId, selectedTags]); + + const getTopModels = () => { + const modelSpend: { [key: string]: any } = {}; + spendData.results.forEach(day => { + Object.entries(day.breakdown.models || {}).forEach(([model, metrics]) => { + if (!modelSpend[model]) { + modelSpend[model] = { + spend: 0, + requests: 0, + successful_requests: 0, + failed_requests: 0, + tokens: 0 + }; + } + try { + modelSpend[model].spend += metrics.metrics.spend; + } catch (e) { + console.log(`Error adding spend for ${model}: ${e}, got metrics: ${JSON.stringify(metrics)}`); + } + modelSpend[model].requests += metrics.metrics.api_requests; + modelSpend[model].successful_requests += metrics.metrics.successful_requests; + modelSpend[model].failed_requests += metrics.metrics.failed_requests; + modelSpend[model].tokens += metrics.metrics.total_tokens; + }); + }); + + return Object.entries(modelSpend) + .map(([model, metrics]) => ({ + key: model, + ...metrics + })) + .sort((a, b) => b.spend - a.spend) + .slice(0, 5); + }; + + const getTopApiKeys = () => { + const apiKeySpend: { [key: string]: any } = {}; + spendData.results.forEach(day => { + Object.entries(day.breakdown.api_keys || {}).forEach(([key, metrics]) => { + if (!apiKeySpend[key]) { + apiKeySpend[key] = { + key: key, + spend: 0, + requests: 0, + successful_requests: 0, + failed_requests: 0, + tokens: 0 + }; + } + apiKeySpend[key].spend += metrics.metrics.spend; + apiKeySpend[key].requests += metrics.metrics.api_requests; + apiKeySpend[key].successful_requests += metrics.metrics.successful_requests; + apiKeySpend[key].failed_requests += metrics.metrics.failed_requests; + apiKeySpend[key].tokens += metrics.metrics.total_tokens; + }); + }); + + return Object.values(apiKeySpend) + .sort((a, b) => b.spend - a.spend) + .slice(0, 5); + }; + + const getProviderSpend = () => { + const providerSpend: { [key: string]: any } = {}; + spendData.results.forEach(day => { + Object.entries(day.breakdown.providers || {}).forEach(([provider, metrics]) => { + if (!providerSpend[provider]) { + providerSpend[provider] = { + provider, + spend: 0, + requests: 0, + successful_requests: 0, + failed_requests: 0, + tokens: 0 + }; + } + try { + providerSpend[provider].spend += metrics.metrics.spend; + providerSpend[provider].requests += metrics.metrics.api_requests; + providerSpend[provider].successful_requests += metrics.metrics.successful_requests; + providerSpend[provider].failed_requests += metrics.metrics.failed_requests; + providerSpend[provider].tokens += metrics.metrics.total_tokens; + } catch (e) { + console.log(`Error processing provider ${provider}: ${e}`); + } + }); + }); + + return Object.values(providerSpend) + .filter(provider => provider.spend > 0) + .sort((a, b) => b.spend - a.spend); + }; + + const getAllTags = () => { + const tags = new Set(); + spendData.results.forEach(day => { + Object.keys(day.breakdown.entities || {}).forEach(tag => { + tags.add(tag); + }); + }); + return Array.from(tags).map(tag => ({ + label: tag, + value: tag + })); + }; + + const filterDataByTags = (data: any[]) => { + if (selectedTags.length === 0) return data; + return data.filter(item => selectedTags.includes(item.entity)); + }; + + const getEntityBreakdown = () => { + const entitySpend: { [key: string]: any } = {}; + spendData.results.forEach(day => { + Object.entries(day.breakdown.entities || {}).forEach(([entity, data]) => { 
+ if (!entitySpend[entity]) { + entitySpend[entity] = { + entity, + spend: 0, + requests: 0, + successful_requests: 0, + failed_requests: 0, + tokens: 0 + }; + } + entitySpend[entity].spend += data.metrics.spend; + entitySpend[entity].requests += data.metrics.api_requests; + entitySpend[entity].successful_requests += data.metrics.successful_requests; + entitySpend[entity].failed_requests += data.metrics.failed_requests; + entitySpend[entity].tokens += data.metrics.total_tokens; + }); + }); + + const result = Object.values(entitySpend) + .sort((a, b) => b.spend - a.spend); + + return filterDataByTags(result); + }; + + + + return ( +
+ + + Select Time Range + + + + Filter by {entityType === 'tag' ? 'Tags' : 'Teams'} +