diff --git a/litellm/proxy/db/db_spend_update_writer.py b/litellm/proxy/db/db_spend_update_writer.py index 20c33fb240..56f7664c73 100644 --- a/litellm/proxy/db/db_spend_update_writer.py +++ b/litellm/proxy/db/db_spend_update_writer.py @@ -404,9 +404,6 @@ class DBSpendUpdateWriter: db_spend_update_transactions = ( await self.redis_update_buffer.get_all_update_transactions_from_redis_buffer() ) - daily_spend_update_transactions = ( - await self.redis_update_buffer.get_all_daily_spend_update_transactions_from_redis_buffer() - ) if db_spend_update_transactions is not None: await self._commit_spend_updates_to_db( prisma_client=prisma_client, @@ -414,6 +411,10 @@ class DBSpendUpdateWriter: proxy_logging_obj=proxy_logging_obj, db_spend_update_transactions=db_spend_update_transactions, ) + + daily_spend_update_transactions = ( + await self.redis_update_buffer.get_all_daily_spend_update_transactions_from_redis_buffer() + ) if daily_spend_update_transactions is not None: await DBSpendUpdateWriter.update_daily_user_spend( n_retry_times=n_retry_times, @@ -439,12 +440,12 @@ class DBSpendUpdateWriter: Note: This flow causes Deadlocks in production (1K RPS+). Use self._commit_spend_updates_to_db_with_redis() instead if you expect 1K+ RPS. """ + + # Aggregate all in memory spend updates (key, user, end_user, team, team_member, org) and commit to db + ################## Spend Update Transactions ################## db_spend_update_transactions = ( await self.spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions() ) - daily_spend_update_transactions = ( - await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() - ) await self._commit_spend_updates_to_db( prisma_client=prisma_client, n_retry_times=n_retry_times, @@ -452,6 +453,12 @@ class DBSpendUpdateWriter: db_spend_update_transactions=db_spend_update_transactions, ) + ################## Daily Spend Update Transactions ################## + # Aggregate all in memory daily spend transactions and commit to db + daily_spend_update_transactions = ( + await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions() + ) + await DBSpendUpdateWriter.update_daily_user_spend( n_retry_times=n_retry_times, prisma_client=prisma_client, @@ -843,7 +850,7 @@ class DBSpendUpdateWriter: prisma_client: PrismaClient, ): """ - Add a spend log transaction to the daily user transaction list + Add a spend log transaction to the `daily_spend_update_queue` Key = @@unique([user_id, date, api_key, model, custom_llm_provider]) ) diff --git a/litellm/proxy/db/spend_update_queue.py b/litellm/proxy/db/spend_update_queue.py index 47b4b9724a..c17c1cddf9 100644 --- a/litellm/proxy/db/spend_update_queue.py +++ b/litellm/proxy/db/spend_update_queue.py @@ -142,7 +142,18 @@ class DailySpendUpdateQueue: ] = asyncio.Queue() async def add_update(self, update: Dict[str, DailyUserSpendTransaction]) -> None: - """Enqueue an update. Each update might be a dict like {'entity_type': 'user', 'entity_id': '123', 'amount': 1.2}.""" + """Enqueue an update. Each update might be a dict like + { + "user_date_api_key_model_custom_llm_provider": { + "spend": 1.2, + "prompt_tokens": 1000, + "completion_tokens": 1000, + "api_requests": 1000, + "successful_requests": 1000, + "failed_requests": 1000, + } + } + .""" verbose_proxy_logger.debug("Adding update to queue: %s", update) await self.update_queue.put(update)