diff --git a/src/llama_stack/providers/utils/job_scheduler/__init__.py b/src/llama_stack/providers/utils/job_scheduler/__init__.py new file mode 100644 index 000000000..e09d9f45d --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/__init__.py @@ -0,0 +1,47 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .api import JobStatus, Scheduler +from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig + + +async def scheduler_impl(config: SchedulerConfig) -> Scheduler: + """ + Factory function to instantiate scheduler implementations. + + Args: + config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) + + Returns: + Scheduler: An initialized scheduler instance + + Raises: + ValueError: If the config type is unknown + """ + impl: Scheduler + if isinstance(config, InlineSchedulerConfig): + from .inline import InlineSchedulerImpl + + impl = InlineSchedulerImpl(config) + elif isinstance(config, CelerySchedulerConfig): + from .celery import CelerySchedulerImpl + + impl = CelerySchedulerImpl(config) + else: + raise ValueError(f"Unknown scheduler config type: {type(config)}") + + await impl.initialize() + return impl + + +__all__ = [ + "JobStatus", + "Scheduler", + "SchedulerConfig", + "InlineSchedulerConfig", + "CelerySchedulerConfig", + "scheduler_impl", +] diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py new file mode 100644 index 000000000..e46da8fcb --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/api.py @@ -0,0 +1,135 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from collections.abc import Awaitable, Callable +from enum import StrEnum +from typing import Protocol + + +class JobStatus(StrEnum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class Scheduler(Protocol): + """ + Abstract scheduler protocol for managing async jobs. + + This provides a pluggable backend for job scheduling and execution. + """ + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """ + Schedule a new job for execution. + + Args: + job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection") + job_data: Job-specific data and parameters + metadata: Additional metadata for tracking + + Returns: + job_id: UUID for tracking the job + """ + ... + + async def get_job_info(self, job_id: str) -> dict: + """ + Get complete information about a job. + + Returns: + { + "job_id": str, + "job_type": str, + "status": JobStatus, + "created_at": datetime, + "started_at": datetime | None, + "completed_at": datetime | None, + "progress": float, # 0.0 to 1.0 + "metadata": dict, + "error": str | None, # Error message if status == FAILED + "result": dict | None, # Job result if status == COMPLETED + } + """ + ... + + async def cancel_job(self, job_id: str) -> bool: + """ + Cancel a pending or running job. + + Args: + job_id: Job to cancel + + Returns: + True if job was cancelled, False if not found or already completed + """ + ... 
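+
+    # Typical caller flow (an illustrative sketch, not part of the protocol;
+    # "my_job" and my_executor are hypothetical names):
+    #
+    #     scheduler.register_job_executor("my_job", my_executor)
+    #     job_id = await scheduler.schedule_job("my_job", {"input": "x"})
+    #     info = await scheduler.get_job_info(job_id)
+    #     if info["status"] in (JobStatus.PENDING, JobStatus.RUNNING):
+    #         await scheduler.cancel_job(job_id)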
+
+    async def delete_job(self, job_id: str) -> bool:
+        """
+        Delete a completed, failed, or cancelled job.
+
+        Args:
+            job_id: Job to delete
+
+        Returns:
+            True if job was deleted, False if not found
+        """
+        ...
+
+    async def list_jobs(
+        self,
+        job_type: str | None = None,
+        status: JobStatus | None = None,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> list[dict]:
+        """
+        List jobs with optional filtering.
+
+        Args:
+            job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
+            status: Filter by status
+            limit: Maximum number of jobs to return
+            offset: Offset for pagination
+
+        Returns:
+            List of job info dictionaries
+        """
+        ...
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """
+        Register a job executor function for a specific job type.
+
+        This allows components to register custom job execution logic with the scheduler.
+        When a job of the registered type is executed, the scheduler will call the
+        registered executor function.
+
+        Args:
+            job_type: The type of job (e.g., "vector_store_file_batch")
+            executor: Async function that takes job_data dict and returns result dict
+        """
+        ...
+
+    async def initialize(self) -> None:
+        """Initialize the scheduler (connect to backend, etc.)"""
+        ...
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the scheduler."""
+        ...
diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py
new file mode 100644
index 000000000..27d9d01cb
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py
@@ -0,0 +1,9 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .scheduler import CelerySchedulerImpl
+
+__all__ = ["CelerySchedulerImpl"]
diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py
new file mode 100644
index 000000000..0a5b6e220
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py
@@ -0,0 +1,81 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from collections.abc import Awaitable, Callable
+
+from ..api import JobStatus, Scheduler
+from ..config import CelerySchedulerConfig
+
+
+class CelerySchedulerImpl(Scheduler):
+    """
+    Celery-based scheduler implementation for distributed multi-worker deployments.
+
+    This scheduler uses Celery to distribute jobs across multiple worker processes
+    or machines.
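+    Workers consume tasks from a shared broker queue, so any available worker
+    can pick up a scheduled job; the executor registry is per-process, so each
+    worker is assumed to register its executors at startup. It provides: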
+    - Persistent job queue (via Redis/RabbitMQ broker)
+    - Multi-worker coordination
+    - Crash recovery (jobs survive server restarts)
+    - Distributed task execution
+
+    This is suitable for:
+    - Production deployments
+    - Multi-worker scenarios
+    - High availability requirements
+    """
+
+    def __init__(self, config: CelerySchedulerConfig):
+        self.config = config
+        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
+        # TODO: Initialize Celery app with broker and result backend
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """Register a job executor function for a specific job type."""
+        self._job_executors[job_type] = executor
+
+    async def initialize(self) -> None:
+        """Initialize the Celery scheduler."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the Celery scheduler."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def schedule_job(
+        self,
+        job_type: str,
+        job_data: dict,
+        metadata: dict | None = None,
+    ) -> str:
+        """Schedule a new job for execution via Celery."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def get_job_info(self, job_id: str) -> dict:
+        """Get complete information about a job from the Celery result backend."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def cancel_job(self, job_id: str) -> bool:
+        """Cancel a pending or running Celery job."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def delete_job(self, job_id: str) -> bool:
+        """Delete a completed, failed, or cancelled job from the Celery result backend."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
+
+    async def list_jobs(
+        self,
+        job_type: str | None = None,
+        status: JobStatus | None = None,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> list[dict]:
+        """List jobs from the Celery result backend with optional filtering."""
+        raise NotImplementedError("Celery scheduler not yet implemented")
diff --git a/src/llama_stack/providers/utils/job_scheduler/config.py b/src/llama_stack/providers/utils/job_scheduler/config.py
new file mode 100644
index 000000000..d8eba89fb
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/config.py
@@ -0,0 +1,58 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from typing import Annotated, Literal
+
+from pydantic import BaseModel, Field
+
+from llama_stack.core.storage.datatypes import KVStoreReference
+
+
+class SchedulerConfig(BaseModel):
+    """Base class for scheduler configurations."""
+
+    type: str
+
+
+class InlineSchedulerConfig(SchedulerConfig):
+    """
+    Configuration for inline (asyncio-based) scheduler.
+
+    This scheduler runs jobs in the same process using asyncio tasks.
+    Suitable for development and single-worker deployments.
+    """
+
+    type: Literal["inline"] = "inline"
+    kvstore: KVStoreReference = Field(
+        description="KVStore reference for persisting job state",
+    )
+    max_concurrent_jobs: int = Field(
+        default=10,
+        description="Maximum number of jobs that can run concurrently",
+    )
+
+
+class CelerySchedulerConfig(SchedulerConfig):
+    """
+    Configuration for Celery-based distributed scheduler.
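+
+    Example (a sketch; the URLs are placeholders for a real deployment):
+
+        CelerySchedulerConfig(
+            broker_url="redis://localhost:6379/0",
+            result_backend="redis://localhost:6379/1",
+        )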
+ + This scheduler distributes jobs across multiple worker processes/machines. + Suitable for production and multi-worker deployments. + """ + + type: Literal["celery"] = "celery" + broker_url: str = Field( + description="Celery broker URL (e.g., 'redis://localhost:6379/0')", + ) + result_backend: str = Field( + description="Celery result backend URL (e.g., 'redis://localhost:6379/1')", + ) + + +SchedulerConfigUnion = Annotated[ + InlineSchedulerConfig | CelerySchedulerConfig, + Field(discriminator="type"), +] diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py new file mode 100644 index 000000000..5be56af5e --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .scheduler import InlineSchedulerImpl + +__all__ = ["InlineSchedulerImpl"] diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py new file mode 100644 index 000000000..401bb0d27 --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -0,0 +1,303 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +import asyncio +import json +import traceback +import uuid +from collections.abc import Awaitable, Callable +from datetime import UTC, datetime +from typing import Any + +from llama_stack.core.utils.serialize import EnumEncoder +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.api import KVStore + +from ..api import JobStatus, Scheduler +from ..config import InlineSchedulerConfig + +JOB_PREFIX = "job_scheduler:jobs:" + + +class InlineSchedulerImpl(Scheduler): + """ + Inline scheduler implementation using asyncio for single-worker deployments. + + This scheduler runs jobs in the same process using asyncio tasks. Jobs and their + state are persisted to KVStore for crash recovery. 
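+
+    Example (a sketch; `kvstore_ref` is a hypothetical KVStoreReference):
+
+        scheduler = InlineSchedulerImpl(
+            InlineSchedulerConfig(kvstore=kvstore_ref, max_concurrent_jobs=4)
+        )
+        await scheduler.initialize()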
+
+    This is suitable for:
+    - Development and testing
+    - Single-worker deployments
+    - Scenarios where external dependencies (Redis, RabbitMQ) are not available
+    """
+
+    def __init__(self, config: InlineSchedulerConfig):
+        self.config = config
+        self._jobs: dict[str, dict[str, Any]] = {}
+        self._tasks: dict[str, asyncio.Task] = {}
+        self._semaphore: asyncio.Semaphore  # set in initialize()
+        self._shutdown_event = asyncio.Event()
+        self._kvstore: KVStore  # set in initialize()
+        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
+
+    async def initialize(self) -> None:
+        """Initialize the scheduler and load persisted jobs."""
+        self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs)
+
+        # Initialize KVStore
+        self._kvstore = await kvstore_impl(self.config.kvstore)
+
+        # Load persisted jobs from KVStore
+        await self._load_jobs_from_storage()
+
+        # Resume incomplete jobs
+        await self._resume_incomplete_jobs()
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """Register a job executor function for a specific job type."""
+        self._job_executors[job_type] = executor
+
+    async def _load_jobs_from_storage(self) -> None:
+        """Load all jobs from KVStore into memory."""
+        start_key = JOB_PREFIX
+        end_key = f"{JOB_PREFIX}\xff"
+
+        stored_values = await self._kvstore.values_in_range(start_key, end_key)
+
+        for value in stored_values:
+            job = json.loads(value)
+            # Deserialize datetime strings back to datetime objects
+            job["created_at"] = datetime.fromisoformat(job["created_at"])
+            if job.get("started_at"):
+                job["started_at"] = datetime.fromisoformat(job["started_at"])
+            if job.get("completed_at"):
+                job["completed_at"] = datetime.fromisoformat(job["completed_at"])
+            job["status"] = JobStatus(job["status"])
+
+            self._jobs[job["job_id"]] = job
+
+    async def _resume_incomplete_jobs(self) -> None:
+        """Resume jobs that were pending or running when the server crashed."""
+        for job_id, job in self._jobs.items():
+            if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
+                # Reset running jobs to pending
+                if job["status"] == JobStatus.RUNNING:
+                    job["status"] = JobStatus.PENDING
+                    job["started_at"] = None
+                    await self._save_job_to_storage(job)
+
+                # Restart the job
+                task = asyncio.create_task(self._run_job(job_id))
+                self._tasks[job_id] = task
+
+    async def _save_job_to_storage(self, job: dict[str, Any]) -> None:
+        """Persist job to KVStore."""
+        key = f"{JOB_PREFIX}{job['job_id']}"
+        await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder))
+
+    async def _delete_job_from_storage(self, job_id: str) -> None:
+        """Delete job from KVStore."""
+        key = f"{JOB_PREFIX}{job_id}"
+        await self._kvstore.delete(key)
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the scheduler."""
+        self._shutdown_event.set()
+
+        # Cancel all running tasks
+        for task in self._tasks.values():
+            if not task.done():
+                task.cancel()
+
+        # Wait for all tasks to complete
+        if self._tasks:
+            await asyncio.gather(*self._tasks.values(), return_exceptions=True)
+
+        self._tasks.clear()
+
+    async def schedule_job(
+        self,
+        job_type: str,
+        job_data: dict,
+        metadata: dict | None = None,
+    ) -> str:
+        """Schedule a new job for execution."""
+        job_id = str(uuid.uuid4())
+
+        job_info = {
+            "job_id": job_id,
+            "job_type": job_type,
+            "status": JobStatus.PENDING,
+            "created_at": datetime.now(UTC),
+            "started_at": None,
+            "completed_at": None,
+            "progress": 0.0,
+            "metadata": metadata or {},
+            "job_data": job_data,
+            "error": None,
+            "result": None,
+        }
+
+        self._jobs[job_id] = job_info
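+        # The in-memory dict is the authoritative working copy; the KVStore
+        # write below exists so _load_jobs_from_storage can rebuild it after
+        # a process restart.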
+ + # Persist to KVStore + await self._save_job_to_storage(job_info) + + # Create and schedule the task + task = asyncio.create_task(self._run_job(job_id)) + self._tasks[job_id] = task + + return job_id + + async def _run_job(self, job_id: str) -> None: + """Run a job asynchronously.""" + job = self._jobs[job_id] + + try: + # Acquire semaphore to limit concurrent jobs + async with self._semaphore: + # Update status to RUNNING + job["status"] = JobStatus.RUNNING + job["started_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + # Execute the job based on job_type + result = await self._execute_job(job) + + # Mark as completed + job["status"] = JobStatus.COMPLETED + job["completed_at"] = datetime.now(UTC) + job["progress"] = 1.0 + job["result"] = result + await self._save_job_to_storage(job) + + except asyncio.CancelledError: + # Job was cancelled + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + raise + + except Exception as e: + # Job failed + job["status"] = JobStatus.FAILED + job["completed_at"] = datetime.now(UTC) + job["error"] = str(e) + job["result"] = {"error_details": traceback.format_exc()} + await self._save_job_to_storage(job) + + finally: + # Clean up task reference + if job_id in self._tasks: + del self._tasks[job_id] + + async def _execute_job(self, job: dict) -> dict: + """ + Execute a job based on its type. + + If a custom executor is registered for the job type, it will be called. + Otherwise, raises an error for unknown job types. + """ + job_type = job["job_type"] + job_data = job["job_data"] + + if job_type in self._job_executors: + executor = self._job_executors[job_type] + return await executor(job_data) + + raise ValueError(f"No executor registered for job type: {job_type}") + + async def get_job_info(self, job_id: str) -> dict: + """Get complete information about a job.""" + if job_id not in self._jobs: + raise ValueError(f"Job {job_id} not found") + + job = self._jobs[job_id].copy() + + # Remove internal job_data field from response + job.pop("job_data", None) + + return job + + async def cancel_job(self, job_id: str) -> bool: + """Cancel a pending or running job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only cancel pending or running jobs + if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + return False + + # Cancel the task if it exists + if job_id in self._tasks: + task = self._tasks[job_id] + if not task.done(): + task.cancel() + + # Update job status + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + return True + + async def delete_job(self, job_id: str) -> bool: + """Delete a completed or cancelled job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only delete completed, failed, or cancelled jobs + if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: + return False + + # Remove from memory and storage + del self._jobs[job_id] + await self._delete_job_from_storage(job_id) + + return True + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """List jobs with optional filtering.""" + jobs = list(self._jobs.values()) + + # Filter by job_type + if job_type is not None: + jobs = [j for j in jobs if j["job_type"] == job_type] + + # Filter by status + if status is not None: + 
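+            # JobStatus is a StrEnum, so this equality check also matches plain
+            # string values passed by callers.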
jobs = [j for j in jobs if j["status"] == status] + + # Sort by created_at (newest first) + jobs.sort(key=lambda j: j["created_at"], reverse=True) + + # Apply pagination + jobs = jobs[offset : offset + limit] + + # Convert to return format (remove internal fields) + result = [] + for job in jobs: + job_copy = job.copy() + # Remove internal job_data field + job_copy.pop("job_data", None) + result.append(job_copy) + + return result
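+
+
+# End-to-end usage sketch (illustrative only; process_items is a hypothetical
+# executor, and kvstore_ref stands in for the deployment's storage reference):
+#
+#     async def process_items(job_data: dict) -> dict:
+#         return {"count": len(job_data["items"])}
+#
+#     scheduler = await scheduler_impl(InlineSchedulerConfig(kvstore=kvstore_ref))
+#     scheduler.register_job_executor("batch_processing", process_items)
+#     job_id = await scheduler.schedule_job("batch_processing", {"items": [1, 2, 3]})
+#     info = await scheduler.get_job_info(job_id)  # status, progress, result, ...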