From 0a5db39c89c6d4317e0d13f334d26fb56ccd07ab Mon Sep 17 00:00:00 2001 From: Ishaan Jaff Date: Thu, 27 Mar 2025 23:13:01 -0700 Subject: [PATCH] working cron job updates --- litellm/constants.py | 2 +- litellm/proxy/db/pod_leader_manager.py | 6 +- litellm/proxy/db/redis_update_buffer.py | 92 +++++++++++++++++++------ litellm/proxy/schema.prisma | 2 +- schema.prisma | 2 +- 5 files changed, 78 insertions(+), 26 deletions(-) diff --git a/litellm/constants.py b/litellm/constants.py index 0d5a9dcddf..1276c08fae 100644 --- a/litellm/constants.py +++ b/litellm/constants.py @@ -444,4 +444,4 @@ LITELLM_PROXY_ADMIN_NAME = "default_user_id" ########################### DB CRON JOB NAMES ########################### DB_SPEND_UPDATE_JOB_NAME = "db_spend_update_job" -DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 600 # 5 minutes +DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 60 # 1 minute diff --git a/litellm/proxy/db/pod_leader_manager.py b/litellm/proxy/db/pod_leader_manager.py index c762eabf66..9c055e96f6 100644 --- a/litellm/proxy/db/pod_leader_manager.py +++ b/litellm/proxy/db/pod_leader_manager.py @@ -41,7 +41,7 @@ class PodLockManager: ) # Attempt to acquire the lock by upserting the record in the `cronjob_locks` table - cronjob_lock = await prisma_client.db.cronjob.upsert( + cronjob_lock = await prisma_client.db.litellm_cronjob.upsert( where={"cronjob_id": self.cronjob_id}, data={ "create": { @@ -90,7 +90,7 @@ class PodLockManager: seconds=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS ) - await prisma_client.db.cronjob.update( + await prisma_client.db.litellm_cronjob.update( where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, data={"ttl": ttl_expiry, "last_updated": current_time}, ) @@ -114,7 +114,7 @@ class PodLockManager: verbose_proxy_logger.debug( "releasing lock for cronjob_id=%s", self.cronjob_id ) - await prisma_client.db.cronjob.update( + await prisma_client.db.litellm_cronjob.update( where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, data={"status": "INACTIVE"}, ) diff --git a/litellm/proxy/db/redis_update_buffer.py b/litellm/proxy/db/redis_update_buffer.py index 94cd1e47c7..2207911024 100644 --- a/litellm/proxy/db/redis_update_buffer.py +++ b/litellm/proxy/db/redis_update_buffer.py @@ -111,6 +111,13 @@ class RedisUpdateBuffer: value=transaction_amount, ) + @staticmethod + def _remove_prefix_from_keys(data: Dict[str, Any], prefix: str) -> Dict[str, Any]: + """ + Removes the specified prefix from the keys of a dictionary. + """ + return {key.replace(prefix, "", 1): value for key, value in data.items()} + async def get_all_update_transactions_from_redis( self, ) -> Optional[DBSpendUpdateTransactions]: @@ -119,24 +126,69 @@ class RedisUpdateBuffer: """ if self.redis_cache is None: return None - expected_keys = [ - "user_list_transactions", - "end_user_list_transactions", - "key_list_transactions", - "team_list_transactions", - "team_member_list_transactions", - "org_list_transactions", - ] - result = await self.redis_cache.async_batch_get_cache(expected_keys) - if result is None: - return None - return DBSpendUpdateTransactions( - user_list_transactions=result.get("user_list_transactions", {}), - end_user_list_transactions=result.get("end_user_list_transactions", {}), - key_list_transactions=result.get("key_list_transactions", {}), - team_list_transactions=result.get("team_list_transactions", {}), - team_member_list_transactions=result.get( - "team_member_list_transactions", {} - ), - org_list_transactions=result.get("org_list_transactions", {}), + user_transaction_keys = await self.redis_cache.async_scan_iter( + "user_list_transactions:*" + ) + end_user_transaction_keys = await self.redis_cache.async_scan_iter( + "end_user_list_transactions:*" + ) + key_transaction_keys = await self.redis_cache.async_scan_iter( + "key_list_transactions:*" + ) + team_transaction_keys = await self.redis_cache.async_scan_iter( + "team_list_transactions:*" + ) + team_member_transaction_keys = await self.redis_cache.async_scan_iter( + "team_member_list_transactions:*" + ) + org_transaction_keys = await self.redis_cache.async_scan_iter( + "org_list_transactions:*" + ) + + user_list_transactions = await self.redis_cache.async_batch_get_cache( + user_transaction_keys + ) + end_user_list_transactions = await self.redis_cache.async_batch_get_cache( + end_user_transaction_keys + ) + key_list_transactions = await self.redis_cache.async_batch_get_cache( + key_transaction_keys + ) + team_list_transactions = await self.redis_cache.async_batch_get_cache( + team_transaction_keys + ) + team_member_list_transactions = await self.redis_cache.async_batch_get_cache( + team_member_transaction_keys + ) + org_list_transactions = await self.redis_cache.async_batch_get_cache( + org_transaction_keys + ) + + # filter out the "prefix" from the keys using the helper method + user_list_transactions = self._remove_prefix_from_keys( + user_list_transactions, "user_list_transactions:" + ) + end_user_list_transactions = self._remove_prefix_from_keys( + end_user_list_transactions, "end_user_list_transactions:" + ) + key_list_transactions = self._remove_prefix_from_keys( + key_list_transactions, "key_list_transactions:" + ) + team_list_transactions = self._remove_prefix_from_keys( + team_list_transactions, "team_list_transactions:" + ) + team_member_list_transactions = self._remove_prefix_from_keys( + team_member_list_transactions, "team_member_list_transactions:" + ) + org_list_transactions = self._remove_prefix_from_keys( + org_list_transactions, "org_list_transactions:" + ) + + return DBSpendUpdateTransactions( + user_list_transactions=user_list_transactions, + end_user_list_transactions=end_user_list_transactions, + key_list_transactions=key_list_transactions, + team_list_transactions=team_list_transactions, + team_member_list_transactions=team_member_list_transactions, + org_list_transactions=org_list_transactions, ) diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma index ccc22f8af1..ae7560ead3 100644 --- a/litellm/proxy/schema.prisma +++ b/litellm/proxy/schema.prisma @@ -339,7 +339,7 @@ model LiteLLM_DailyUserSpend { // Track the status of cron jobs running. Only allow one pod to run the job at a time -model CronJob { +model LiteLLM_CronJob { cronjob_id String @id @default(cuid()) // Unique ID for the record pod_id String // Unique identifier for the pod acting as the leader status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive) diff --git a/schema.prisma b/schema.prisma index d0de6e2e27..ad46f96f6d 100644 --- a/schema.prisma +++ b/schema.prisma @@ -338,7 +338,7 @@ model LiteLLM_DailyUserSpend { // Track the status of cron jobs running. Only allow one pod to run the job at a time -model CronJob { +model LiteLLM_CronJob { cronjob_id String @id @default(cuid()) // Unique ID for the record pod_id String // Unique identifier for the pod acting as the leader status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive)