working cron job updates

This commit is contained in:
Ishaan Jaff 2025-03-27 23:13:01 -07:00
parent 666cd8d4e3
commit 1ba73f491a
5 changed files with 78 additions and 26 deletions

View file

@ -444,4 +444,4 @@ LITELLM_PROXY_ADMIN_NAME = "default_user_id"
########################### DB CRON JOB NAMES ###########################
DB_SPEND_UPDATE_JOB_NAME = "db_spend_update_job"
DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 600 # 5 minutes
DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 60 # 1 minute

View file

@ -41,7 +41,7 @@ class PodLockManager:
)
# Attempt to acquire the lock by upserting the record in the `cronjob_locks` table
cronjob_lock = await prisma_client.db.cronjob.upsert(
cronjob_lock = await prisma_client.db.litellm_cronjob.upsert(
where={"cronjob_id": self.cronjob_id},
data={
"create": {
@ -90,7 +90,7 @@ class PodLockManager:
seconds=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS
)
await prisma_client.db.cronjob.update(
await prisma_client.db.litellm_cronjob.update(
where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id},
data={"ttl": ttl_expiry, "last_updated": current_time},
)
@ -114,7 +114,7 @@ class PodLockManager:
verbose_proxy_logger.debug(
"releasing lock for cronjob_id=%s", self.cronjob_id
)
await prisma_client.db.cronjob.update(
await prisma_client.db.litellm_cronjob.update(
where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id},
data={"status": "INACTIVE"},
)

View file

@ -111,6 +111,13 @@ class RedisUpdateBuffer:
value=transaction_amount,
)
@staticmethod
def _remove_prefix_from_keys(data: Dict[str, Any], prefix: str) -> Dict[str, Any]:
"""
Removes the specified prefix from the keys of a dictionary.
"""
return {key.replace(prefix, "", 1): value for key, value in data.items()}
async def get_all_update_transactions_from_redis(
self,
) -> Optional[DBSpendUpdateTransactions]:
@ -119,24 +126,69 @@ class RedisUpdateBuffer:
"""
if self.redis_cache is None:
return None
expected_keys = [
"user_list_transactions",
"end_user_list_transactions",
"key_list_transactions",
"team_list_transactions",
"team_member_list_transactions",
"org_list_transactions",
]
result = await self.redis_cache.async_batch_get_cache(expected_keys)
if result is None:
return None
return DBSpendUpdateTransactions(
user_list_transactions=result.get("user_list_transactions", {}),
end_user_list_transactions=result.get("end_user_list_transactions", {}),
key_list_transactions=result.get("key_list_transactions", {}),
team_list_transactions=result.get("team_list_transactions", {}),
team_member_list_transactions=result.get(
"team_member_list_transactions", {}
),
org_list_transactions=result.get("org_list_transactions", {}),
user_transaction_keys = await self.redis_cache.async_scan_iter(
"user_list_transactions:*"
)
end_user_transaction_keys = await self.redis_cache.async_scan_iter(
"end_user_list_transactions:*"
)
key_transaction_keys = await self.redis_cache.async_scan_iter(
"key_list_transactions:*"
)
team_transaction_keys = await self.redis_cache.async_scan_iter(
"team_list_transactions:*"
)
team_member_transaction_keys = await self.redis_cache.async_scan_iter(
"team_member_list_transactions:*"
)
org_transaction_keys = await self.redis_cache.async_scan_iter(
"org_list_transactions:*"
)
user_list_transactions = await self.redis_cache.async_batch_get_cache(
user_transaction_keys
)
end_user_list_transactions = await self.redis_cache.async_batch_get_cache(
end_user_transaction_keys
)
key_list_transactions = await self.redis_cache.async_batch_get_cache(
key_transaction_keys
)
team_list_transactions = await self.redis_cache.async_batch_get_cache(
team_transaction_keys
)
team_member_list_transactions = await self.redis_cache.async_batch_get_cache(
team_member_transaction_keys
)
org_list_transactions = await self.redis_cache.async_batch_get_cache(
org_transaction_keys
)
# filter out the "prefix" from the keys using the helper method
user_list_transactions = self._remove_prefix_from_keys(
user_list_transactions, "user_list_transactions:"
)
end_user_list_transactions = self._remove_prefix_from_keys(
end_user_list_transactions, "end_user_list_transactions:"
)
key_list_transactions = self._remove_prefix_from_keys(
key_list_transactions, "key_list_transactions:"
)
team_list_transactions = self._remove_prefix_from_keys(
team_list_transactions, "team_list_transactions:"
)
team_member_list_transactions = self._remove_prefix_from_keys(
team_member_list_transactions, "team_member_list_transactions:"
)
org_list_transactions = self._remove_prefix_from_keys(
org_list_transactions, "org_list_transactions:"
)
return DBSpendUpdateTransactions(
user_list_transactions=user_list_transactions,
end_user_list_transactions=end_user_list_transactions,
key_list_transactions=key_list_transactions,
team_list_transactions=team_list_transactions,
team_member_list_transactions=team_member_list_transactions,
org_list_transactions=org_list_transactions,
)

View file

@ -339,7 +339,7 @@ model LiteLLM_DailyUserSpend {
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model CronJob {
model LiteLLM_CronJob {
cronjob_id String @id @default(cuid()) // Unique ID for the record
pod_id String // Unique identifier for the pod acting as the leader
status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive)

View file

@ -338,7 +338,7 @@ model LiteLLM_DailyUserSpend {
// Track the status of cron jobs running. Only allow one pod to run the job at a time
model CronJob {
model LiteLLM_CronJob {
cronjob_id String @id @default(cuid()) // Unique ID for the record
pod_id String // Unique identifier for the pod acting as the leader
status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive)