working cron job updates

This commit is contained in:
Ishaan Jaff 2025-03-27 23:13:01 -07:00
parent 39cc75c36b
commit 0a5db39c89
5 changed files with 78 additions and 26 deletions

View file

@ -444,4 +444,4 @@ LITELLM_PROXY_ADMIN_NAME = "default_user_id"
########################### DB CRON JOB NAMES ########################### ########################### DB CRON JOB NAMES ###########################
DB_SPEND_UPDATE_JOB_NAME = "db_spend_update_job" DB_SPEND_UPDATE_JOB_NAME = "db_spend_update_job"
DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 600 # 5 minutes DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 60 # 1 minute

View file

@ -41,7 +41,7 @@ class PodLockManager:
) )
# Attempt to acquire the lock by upserting the record in the `cronjob_locks` table # Attempt to acquire the lock by upserting the record in the `cronjob_locks` table
cronjob_lock = await prisma_client.db.cronjob.upsert( cronjob_lock = await prisma_client.db.litellm_cronjob.upsert(
where={"cronjob_id": self.cronjob_id}, where={"cronjob_id": self.cronjob_id},
data={ data={
"create": { "create": {
@ -90,7 +90,7 @@ class PodLockManager:
seconds=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS seconds=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS
) )
await prisma_client.db.cronjob.update( await prisma_client.db.litellm_cronjob.update(
where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id},
data={"ttl": ttl_expiry, "last_updated": current_time}, data={"ttl": ttl_expiry, "last_updated": current_time},
) )
@ -114,7 +114,7 @@ class PodLockManager:
verbose_proxy_logger.debug( verbose_proxy_logger.debug(
"releasing lock for cronjob_id=%s", self.cronjob_id "releasing lock for cronjob_id=%s", self.cronjob_id
) )
await prisma_client.db.cronjob.update( await prisma_client.db.litellm_cronjob.update(
where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id},
data={"status": "INACTIVE"}, data={"status": "INACTIVE"},
) )

View file

@ -111,6 +111,13 @@ class RedisUpdateBuffer:
value=transaction_amount, value=transaction_amount,
) )
@staticmethod
def _remove_prefix_from_keys(data: Dict[str, Any], prefix: str) -> Dict[str, Any]:
"""
Removes the specified prefix from the keys of a dictionary.
"""
return {key.replace(prefix, "", 1): value for key, value in data.items()}
async def get_all_update_transactions_from_redis( async def get_all_update_transactions_from_redis(
self, self,
) -> Optional[DBSpendUpdateTransactions]: ) -> Optional[DBSpendUpdateTransactions]:
@ -119,24 +126,69 @@ class RedisUpdateBuffer:
""" """
if self.redis_cache is None: if self.redis_cache is None:
return None return None
expected_keys = [ user_transaction_keys = await self.redis_cache.async_scan_iter(
"user_list_transactions", "user_list_transactions:*"
"end_user_list_transactions", )
"key_list_transactions", end_user_transaction_keys = await self.redis_cache.async_scan_iter(
"team_list_transactions", "end_user_list_transactions:*"
"team_member_list_transactions", )
"org_list_transactions", key_transaction_keys = await self.redis_cache.async_scan_iter(
] "key_list_transactions:*"
result = await self.redis_cache.async_batch_get_cache(expected_keys) )
if result is None: team_transaction_keys = await self.redis_cache.async_scan_iter(
return None "team_list_transactions:*"
return DBSpendUpdateTransactions( )
user_list_transactions=result.get("user_list_transactions", {}), team_member_transaction_keys = await self.redis_cache.async_scan_iter(
end_user_list_transactions=result.get("end_user_list_transactions", {}), "team_member_list_transactions:*"
key_list_transactions=result.get("key_list_transactions", {}), )
team_list_transactions=result.get("team_list_transactions", {}), org_transaction_keys = await self.redis_cache.async_scan_iter(
team_member_list_transactions=result.get( "org_list_transactions:*"
"team_member_list_transactions", {} )
),
org_list_transactions=result.get("org_list_transactions", {}), user_list_transactions = await self.redis_cache.async_batch_get_cache(
user_transaction_keys
)
end_user_list_transactions = await self.redis_cache.async_batch_get_cache(
end_user_transaction_keys
)
key_list_transactions = await self.redis_cache.async_batch_get_cache(
key_transaction_keys
)
team_list_transactions = await self.redis_cache.async_batch_get_cache(
team_transaction_keys
)
team_member_list_transactions = await self.redis_cache.async_batch_get_cache(
team_member_transaction_keys
)
org_list_transactions = await self.redis_cache.async_batch_get_cache(
org_transaction_keys
)
# filter out the "prefix" from the keys using the helper method
user_list_transactions = self._remove_prefix_from_keys(
user_list_transactions, "user_list_transactions:"
)
end_user_list_transactions = self._remove_prefix_from_keys(
end_user_list_transactions, "end_user_list_transactions:"
)
key_list_transactions = self._remove_prefix_from_keys(
key_list_transactions, "key_list_transactions:"
)
team_list_transactions = self._remove_prefix_from_keys(
team_list_transactions, "team_list_transactions:"
)
team_member_list_transactions = self._remove_prefix_from_keys(
team_member_list_transactions, "team_member_list_transactions:"
)
org_list_transactions = self._remove_prefix_from_keys(
org_list_transactions, "org_list_transactions:"
)
return DBSpendUpdateTransactions(
user_list_transactions=user_list_transactions,
end_user_list_transactions=end_user_list_transactions,
key_list_transactions=key_list_transactions,
team_list_transactions=team_list_transactions,
team_member_list_transactions=team_member_list_transactions,
org_list_transactions=org_list_transactions,
) )

View file

@ -339,7 +339,7 @@ model LiteLLM_DailyUserSpend {
// Track the status of cron jobs running. Only allow one pod to run the job at a time // Track the status of cron jobs running. Only allow one pod to run the job at a time
model CronJob { model LiteLLM_CronJob {
cronjob_id String @id @default(cuid()) // Unique ID for the record cronjob_id String @id @default(cuid()) // Unique ID for the record
pod_id String // Unique identifier for the pod acting as the leader pod_id String // Unique identifier for the pod acting as the leader
status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive) status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive)

View file

@ -338,7 +338,7 @@ model LiteLLM_DailyUserSpend {
// Track the status of cron jobs running. Only allow one pod to run the job at a time // Track the status of cron jobs running. Only allow one pod to run the job at a time
model CronJob { model LiteLLM_CronJob {
cronjob_id String @id @default(cuid()) // Unique ID for the record cronjob_id String @id @default(cuid()) // Unique ID for the record
pod_id String // Unique identifier for the pod acting as the leader pod_id String // Unique identifier for the pod acting as the leader
status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive) status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive)