fix spend update queue

This commit is contained in:
Ishaan Jaff 2025-04-01 17:49:28 -07:00
parent 9270823d6c
commit f122cabd24
2 changed files with 26 additions and 8 deletions

View file

@ -404,9 +404,6 @@ class DBSpendUpdateWriter:
db_spend_update_transactions = (
await self.redis_update_buffer.get_all_update_transactions_from_redis_buffer()
)
daily_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_spend_update_transactions_from_redis_buffer()
)
if db_spend_update_transactions is not None:
await self._commit_spend_updates_to_db(
prisma_client=prisma_client,
@ -414,6 +411,10 @@ class DBSpendUpdateWriter:
proxy_logging_obj=proxy_logging_obj,
db_spend_update_transactions=db_spend_update_transactions,
)
daily_spend_update_transactions = (
await self.redis_update_buffer.get_all_daily_spend_update_transactions_from_redis_buffer()
)
if daily_spend_update_transactions is not None:
await DBSpendUpdateWriter.update_daily_user_spend(
n_retry_times=n_retry_times,
@ -439,12 +440,12 @@ class DBSpendUpdateWriter:
Note: This flow causes Deadlocks in production (1K RPS+). Use self._commit_spend_updates_to_db_with_redis() instead if you expect 1K+ RPS.
"""
# Aggregate all in memory spend updates (key, user, end_user, team, team_member, org) and commit to db
################## Spend Update Transactions ##################
db_spend_update_transactions = (
await self.spend_update_queue.flush_and_get_aggregated_db_spend_update_transactions()
)
daily_spend_update_transactions = (
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
await self._commit_spend_updates_to_db(
prisma_client=prisma_client,
n_retry_times=n_retry_times,
@ -452,6 +453,12 @@ class DBSpendUpdateWriter:
db_spend_update_transactions=db_spend_update_transactions,
)
################## Daily Spend Update Transactions ##################
# Aggregate all in memory daily spend transactions and commit to db
daily_spend_update_transactions = (
await self.daily_spend_update_queue.flush_and_get_aggregated_daily_spend_update_transactions()
)
await DBSpendUpdateWriter.update_daily_user_spend(
n_retry_times=n_retry_times,
prisma_client=prisma_client,
@ -843,7 +850,7 @@ class DBSpendUpdateWriter:
prisma_client: PrismaClient,
):
"""
Add a spend log transaction to the daily user transaction list
Add a spend log transaction to the `daily_spend_update_queue`
Key = @@unique([user_id, date, api_key, model, custom_llm_provider]) )

View file

@ -142,7 +142,18 @@ class DailySpendUpdateQueue:
] = asyncio.Queue()
async def add_update(self, update: Dict[str, DailyUserSpendTransaction]) -> None:
"""Enqueue an update. Each update might be a dict like {'entity_type': 'user', 'entity_id': '123', 'amount': 1.2}."""
"""Enqueue an update. Each update might be a dict like
{
"user_date_api_key_model_custom_llm_provider": {
"spend": 1.2,
"prompt_tokens": 1000,
"completion_tokens": 1000,
"api_requests": 1000,
"successful_requests": 1000,
"failed_requests": 1000,
}
}
."""
verbose_proxy_logger.debug("Adding update to queue: %s", update)
await self.update_queue.put(update)