diff --git a/litellm/proxy/db/pod_leader_manager.py b/litellm/proxy/db/pod_leader_manager.py index 73ecd68b1b..c762eabf66 100644 --- a/litellm/proxy/db/pod_leader_manager.py +++ b/litellm/proxy/db/pod_leader_manager.py @@ -19,16 +19,19 @@ class PodLockManager: Ensures that only one pod can run a cron job at a time. """ - def __init__(self, prisma_client: Optional[PrismaClient], cronjob_id: str): + def __init__(self, cronjob_id: str): self.pod_id = str(uuid.uuid4()) - self.prisma = prisma_client self.cronjob_id = cronjob_id async def acquire_lock(self) -> bool: """ Attempt to acquire the lock for a specific cron job. """ - if not self.prisma: + from litellm.proxy.proxy_server import prisma_client + + verbose_proxy_logger.debug("acquiring lock for cronjob_id=%s", self.cronjob_id) + if not prisma_client: + verbose_proxy_logger.debug("prisma is None, returning False") return False try: current_time = datetime.now(timezone.utc) @@ -38,21 +41,24 @@ class PodLockManager: ) # Attempt to acquire the lock by upserting the record in the `cronjob_locks` table - cronjob_lock = await self.prisma.db.cronJob.upsert( + cronjob_lock = await prisma_client.db.cronjob.upsert( where={"cronjob_id": self.cronjob_id}, - create={ - "cronjob_id": self.cronjob_id, - "pod_id": self.pod_id, - "status": "ACTIVE", - "last_updated": current_time, - "ttl": ttl_expiry, - }, - update={ - "status": "ACTIVE", - "last_updated": current_time, - "ttl": ttl_expiry, + data={ + "create": { + "cronjob_id": self.cronjob_id, + "pod_id": self.pod_id, + "status": "ACTIVE", + "last_updated": current_time, + "ttl": ttl_expiry, + }, + "update": { + "status": "ACTIVE", + "last_updated": current_time, + "ttl": ttl_expiry, + }, }, ) + verbose_proxy_logger.debug("cronjob_lock=%s", cronjob_lock) if cronjob_lock.status == "ACTIVE" and cronjob_lock.pod_id == self.pod_id: verbose_proxy_logger.debug( @@ -70,16 +76,21 @@ class PodLockManager: """ Renew the lock (update the TTL) for the pod holding the lock. """ - if not self.prisma: + from litellm.proxy.proxy_server import prisma_client + + if not prisma_client: return False try: + verbose_proxy_logger.debug( + "renewing lock for cronjob_id=%s", self.cronjob_id + ) current_time = datetime.now(timezone.utc) # Extend the TTL for another DEFAULT_CRON_JOB_LOCK_TTL_SECONDS ttl_expiry = current_time + timedelta( seconds=DEFAULT_CRON_JOB_LOCK_TTL_SECONDS ) - await self.prisma.db.cronJob.update( + await prisma_client.db.cronjob.update( where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, data={"ttl": ttl_expiry, "last_updated": current_time}, ) @@ -95,10 +106,15 @@ class PodLockManager: """ Release the lock and mark the pod as inactive. """ - if not self.prisma: + from litellm.proxy.proxy_server import prisma_client + + if not prisma_client: return False try: - await self.prisma.db.cronJob.update( + verbose_proxy_logger.debug( + "releasing lock for cronjob_id=%s", self.cronjob_id + ) + await prisma_client.db.cronjob.update( where={"cronjob_id": self.cronjob_id, "pod_id": self.pod_id}, data={"status": "INACTIVE"}, ) diff --git a/litellm/proxy/schema.prisma b/litellm/proxy/schema.prisma index 6e4937390a..ccc22f8af1 100644 --- a/litellm/proxy/schema.prisma +++ b/litellm/proxy/schema.prisma @@ -340,10 +340,10 @@ model LiteLLM_DailyUserSpend { // Track the status of cron jobs running. Only allow one pod to run the job at a time model CronJob { - id String @id @default(cuid()) // Unique ID for the record - podId String // Unique identifier for the pod acting as the leader + cronjob_id String @id @default(cuid()) // Unique ID for the record + pod_id String // Unique identifier for the pod acting as the leader status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive) - lastUpdated DateTime @default(now()) // Timestamp for the last update of the cron job record + last_updated DateTime @default(now()) // Timestamp for the last update of the cron job record ttl DateTime // Time when the leader's lease expires } @@ -352,3 +352,4 @@ enum JobStatus { INACTIVE } + diff --git a/schema.prisma b/schema.prisma index 5d1535d2ff..d0de6e2e27 100644 --- a/schema.prisma +++ b/schema.prisma @@ -339,10 +339,10 @@ model LiteLLM_DailyUserSpend { // Track the status of cron jobs running. Only allow one pod to run the job at a time model CronJob { - id String @id @default(cuid()) // Unique ID for the record - podId String // Unique identifier for the pod acting as the leader + cronjob_id String @id @default(cuid()) // Unique ID for the record + pod_id String // Unique identifier for the pod acting as the leader status JobStatus @default(INACTIVE) // Status of the cron job (active or inactive) - lastUpdated DateTime @default(now()) // Timestamp for the last update of the cron job record + last_updated DateTime @default(now()) // Timestamp for the last update of the cron job record ttl DateTime // Time when the leader's lease expires }