Swapna Lekkala 2025-10-29 11:58:31 -07:00
parent 16c0321482
commit 2daecd34f1
7 changed files with 642 additions and 0 deletions


@@ -0,0 +1,47 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .api import JobStatus, Scheduler
from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig


async def scheduler_impl(config: SchedulerConfig) -> Scheduler:
    """
    Factory function to instantiate scheduler implementations.

    Args:
        config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig)

    Returns:
        Scheduler: An initialized scheduler instance

    Raises:
        ValueError: If the config type is unknown
    """
    impl: Scheduler
    if isinstance(config, InlineSchedulerConfig):
        from .inline import InlineSchedulerImpl

        impl = InlineSchedulerImpl(config)
    elif isinstance(config, CelerySchedulerConfig):
        from .celery import CelerySchedulerImpl

        impl = CelerySchedulerImpl(config)
    else:
        raise ValueError(f"Unknown scheduler config type: {type(config)}")

    await impl.initialize()
    return impl


__all__ = [
    "JobStatus",
    "Scheduler",
    "SchedulerConfig",
    "InlineSchedulerConfig",
    "CelerySchedulerConfig",
    "scheduler_impl",
]
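
A minimal usage sketch for this factory. The package import path is not shown in this commit, and kvstore_ref stands in for a real KVStoreReference, so both are assumptions:

# Hypothetical caller; the import path below is illustrative only.
from scheduler import InlineSchedulerConfig, scheduler_impl

async def make_inline_scheduler(kvstore_ref):
    # kvstore_ref: a KVStoreReference (llama_stack.core.storage.datatypes)
    config = InlineSchedulerConfig(kvstore=kvstore_ref, max_concurrent_jobs=5)
    # scheduler_impl dispatches on the config type and calls initialize()
    return await scheduler_impl(config)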


@@ -0,0 +1,135 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable
from enum import StrEnum
from typing import Protocol


class JobStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class Scheduler(Protocol):
    """
    Abstract scheduler protocol for managing async jobs.

    This provides a pluggable backend for job scheduling and execution.
    """

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """
        Schedule a new job for execution.

        Args:
            job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            job_data: Job-specific data and parameters
            metadata: Additional metadata for tracking

        Returns:
            job_id: UUID for tracking the job
        """
        ...

    async def get_job_info(self, job_id: str) -> dict:
        """
        Get complete information about a job.

        Returns:
            {
                "job_id": str,
                "job_type": str,
                "status": JobStatus,
                "created_at": datetime,
                "started_at": datetime | None,
                "completed_at": datetime | None,
                "progress": float,  # 0.0 to 1.0
                "metadata": dict,
                "error": str | None,  # Error message if status == FAILED
                "result": dict | None,  # Job result if status == COMPLETED
            }
        """
        ...

    async def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a pending or running job.

        Args:
            job_id: Job to cancel

        Returns:
            True if job was cancelled, False if not found or already completed
        """
        ...

    async def delete_job(self, job_id: str) -> bool:
        """
        Delete a completed or cancelled job.

        Args:
            job_id: Job to delete

        Returns:
            True if job was deleted, False if not found
        """
        ...

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """
        List jobs with optional filtering.

        Args:
            job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            status: Filter by status
            limit: Maximum number of jobs to return
            offset: Offset for pagination

        Returns:
            List of job info dictionaries
        """
        ...

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """
        Register a job executor function for a specific job type.

        This allows components to register custom job execution logic with the scheduler.
        When a job of the registered type is executed, the scheduler will call the
        registered executor function.

        Args:
            job_type: The type of job (e.g., "vector_store_file_batch")
            executor: Async function that takes job_data dict and returns result dict
        """
        ...

    async def initialize(self) -> None:
        """Initialize the scheduler (connect to backend, etc.)"""
        ...

    async def shutdown(self) -> None:
        """Gracefully shutdown the scheduler"""
        ...
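
To make the protocol concrete, a short sketch of how a component might drive any Scheduler implementation. The job type and executor body are illustrative, not part of this commit:

import asyncio

async def process_batch(job_data: dict) -> dict:
    # Executor contract: receives the job_data passed to schedule_job and
    # returns the dict that becomes the job's "result".
    return {"processed": len(job_data.get("items", []))}

async def run_one_job(scheduler: Scheduler) -> dict:
    scheduler.register_job_executor("batch_processing", process_batch)
    job_id = await scheduler.schedule_job("batch_processing", {"items": [1, 2, 3]})
    # Poll until the job reaches a terminal status.
    while True:
        info = await scheduler.get_job_info(job_id)
        if info["status"] in (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED):
            return info
        await asyncio.sleep(0.1)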


@@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import CelerySchedulerImpl

__all__ = ["CelerySchedulerImpl"]


@@ -0,0 +1,81 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable

from ..api import JobStatus, Scheduler
from ..config import CelerySchedulerConfig


class CelerySchedulerImpl(Scheduler):
    """
    Celery-based scheduler implementation for distributed multi-worker deployments.

    This scheduler uses Celery to distribute jobs across multiple worker processes
    or machines. It provides:
    - Persistent job queue (via Redis/RabbitMQ broker)
    - Multi-worker coordination
    - Crash recovery (jobs survive server restarts)
    - Distributed task execution

    This is suitable for:
    - Production deployments
    - Multi-worker scenarios
    - High availability requirements
    """

    def __init__(self, config: CelerySchedulerConfig):
        self.config = config
        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
        # TODO: Initialize Celery app with broker and result backend
        raise NotImplementedError("Celery scheduler not yet implemented")

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """Register a job executor function for a specific job type."""
        self._job_executors[job_type] = executor

    async def initialize(self) -> None:
        """Initialize the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def shutdown(self) -> None:
        """Gracefully shutdown the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """Schedule a new job for execution via Celery."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def get_job_info(self, job_id: str) -> dict:
        """Get complete information about a job from the Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running Celery job."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def delete_job(self, job_id: str) -> bool:
        """Delete a completed or cancelled job from the Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """List jobs from the Celery result backend with optional filtering."""
        raise NotImplementedError("Celery scheduler not yet implemented")
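
For orientation, one plausible shape for the TODO above. This is a sketch, not the committed design; the app name, task name, and worker-side registry are all assumptions:

from celery import Celery

def make_celery_app(config: CelerySchedulerConfig) -> Celery:
    # A named Celery app bound to the configured broker and result backend.
    return Celery(
        "llama_stack_scheduler",  # app name is illustrative
        broker=config.broker_url,
        backend=config.result_backend,
    )

# Worker side (sketch): a single generic task could bridge Celery's
# synchronous workers to the async executors registered per job type.
#
#   @app.task(name="scheduler.run_job")        # task name is an assumption
#   def run_job(job_type: str, job_data: dict) -> dict:
#       executor = EXECUTORS[job_type]         # hypothetical worker-side registry
#       return asyncio.run(executor(job_data)) # run the async executor to completion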


@@ -0,0 +1,58 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from typing import Annotated, Literal

from pydantic import BaseModel, Field

from llama_stack.core.storage.datatypes import KVStoreReference


class SchedulerConfig(BaseModel):
    """Base class for scheduler configurations."""

    type: str


class InlineSchedulerConfig(SchedulerConfig):
    """
    Configuration for inline (asyncio-based) scheduler.

    This scheduler runs jobs in the same process using asyncio tasks.
    Suitable for development and single-worker deployments.
    """

    type: Literal["inline"] = "inline"

    kvstore: KVStoreReference = Field(
        description="KVStore reference for persisting job state",
    )
    max_concurrent_jobs: int = Field(
        default=10,
        description="Maximum number of jobs that can run concurrently",
    )


class CelerySchedulerConfig(SchedulerConfig):
    """
    Configuration for Celery-based distributed scheduler.

    This scheduler distributes jobs across multiple worker processes/machines.
    Suitable for production and multi-worker deployments.
    """

    type: Literal["celery"] = "celery"

    broker_url: str = Field(
        description="Celery broker URL (e.g., 'redis://localhost:6379/0')",
    )
    result_backend: str = Field(
        description="Celery result backend URL (e.g., 'redis://localhost:6379/1')",
    )


SchedulerConfigUnion = Annotated[
    InlineSchedulerConfig | CelerySchedulerConfig,
    Field(discriminator="type"),
]
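
A sketch of how the discriminated union resolves raw config data, using pydantic v2's TypeAdapter; the URLs are placeholders:

from pydantic import TypeAdapter

adapter = TypeAdapter(SchedulerConfigUnion)
config = adapter.validate_python(
    {
        "type": "celery",
        "broker_url": "redis://localhost:6379/0",
        "result_backend": "redis://localhost:6379/1",
    }
)
assert isinstance(config, CelerySchedulerConfig)  # "type" selects the concrete model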


@@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import InlineSchedulerImpl

__all__ = ["InlineSchedulerImpl"]


@@ -0,0 +1,303 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import json
import traceback
import uuid
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import Any

from llama_stack.core.utils.serialize import EnumEncoder
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore

from ..api import JobStatus, Scheduler
from ..config import InlineSchedulerConfig

JOB_PREFIX = "job_scheduler:jobs:"


class InlineSchedulerImpl(Scheduler):
    """
    Inline scheduler implementation using asyncio for single-worker deployments.

    This scheduler runs jobs in the same process using asyncio tasks. Jobs and their
    state are persisted to KVStore for crash recovery.

    This is suitable for:
    - Development and testing
    - Single-worker deployments
    - Scenarios where external dependencies (Redis, RabbitMQ) are not available
    """

    def __init__(self, config: InlineSchedulerConfig):
        self.config = config
        self._jobs: dict[str, dict[str, Any]] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._semaphore: asyncio.Semaphore
        self._shutdown_event = asyncio.Event()
        self._kvstore: KVStore
        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}

    async def initialize(self) -> None:
        """Initialize the scheduler and load persisted jobs."""
        self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs)

        # Initialize KVStore
        self._kvstore = await kvstore_impl(self.config.kvstore)

        # Load persisted jobs from KVStore
        await self._load_jobs_from_storage()

        # Resume incomplete jobs
        await self._resume_incomplete_jobs()

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """Register a job executor function for a specific job type."""
        self._job_executors[job_type] = executor

    async def _load_jobs_from_storage(self) -> None:
        """Load all jobs from KVStore into memory."""
        start_key = JOB_PREFIX
        end_key = f"{JOB_PREFIX}\xff"
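        # "\xff" sorts above any character that appears in a job id, so the
        # [start_key, end_key] range covers every key stored under JOB_PREFIX.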
        stored_values = await self._kvstore.values_in_range(start_key, end_key)

        for value in stored_values:
            job = json.loads(value)
            # Deserialize datetime strings back to datetime objects
            job["created_at"] = datetime.fromisoformat(job["created_at"])
            if job.get("started_at"):
                job["started_at"] = datetime.fromisoformat(job["started_at"])
            if job.get("completed_at"):
                job["completed_at"] = datetime.fromisoformat(job["completed_at"])
            job["status"] = JobStatus(job["status"])
            self._jobs[job["job_id"]] = job

    async def _resume_incomplete_jobs(self) -> None:
        """Resume jobs that were running when the server crashed."""
        for job_id, job in self._jobs.items():
            if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
                # Reset running jobs to pending
                if job["status"] == JobStatus.RUNNING:
                    job["status"] = JobStatus.PENDING
                    job["started_at"] = None
                    await self._save_job_to_storage(job)

                # Restart the job
                task = asyncio.create_task(self._run_job(job_id))
                self._tasks[job_id] = task

    async def _save_job_to_storage(self, job: dict[str, Any]) -> None:
        """Persist job to KVStore."""
        key = f"{JOB_PREFIX}{job['job_id']}"
        await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder))

    async def _delete_job_from_storage(self, job_id: str) -> None:
        """Delete job from KVStore."""
        key = f"{JOB_PREFIX}{job_id}"
        await self._kvstore.delete(key)

    async def shutdown(self) -> None:
        """Gracefully shutdown the scheduler."""
        self._shutdown_event.set()

        # Cancel all running tasks
        for task in self._tasks.values():
            if not task.done():
                task.cancel()

        # Wait for all tasks to complete
        if self._tasks:
            await asyncio.gather(*self._tasks.values(), return_exceptions=True)

        self._tasks.clear()

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """Schedule a new job for execution."""
        job_id = str(uuid.uuid4())

        job_info = {
            "job_id": job_id,
            "job_type": job_type,
            "status": JobStatus.PENDING,
            "created_at": datetime.now(UTC),
            "started_at": None,
            "completed_at": None,
            "progress": 0.0,
            "metadata": metadata or {},
            "job_data": job_data,
            "error": None,
            "result": None,
        }

        self._jobs[job_id] = job_info

        # Persist to KVStore
        await self._save_job_to_storage(job_info)

        # Create and schedule the task
        task = asyncio.create_task(self._run_job(job_id))
        self._tasks[job_id] = task

        return job_id

    async def _run_job(self, job_id: str) -> None:
        """Run a job asynchronously."""
        job = self._jobs[job_id]

        try:
            # Acquire semaphore to limit concurrent jobs
            async with self._semaphore:
                # Update status to RUNNING
                job["status"] = JobStatus.RUNNING
                job["started_at"] = datetime.now(UTC)
                await self._save_job_to_storage(job)

                # Execute the job based on job_type
                result = await self._execute_job(job)

                # Mark as completed
                job["status"] = JobStatus.COMPLETED
                job["completed_at"] = datetime.now(UTC)
                job["progress"] = 1.0
                job["result"] = result
                await self._save_job_to_storage(job)

        except asyncio.CancelledError:
            # Job was cancelled
            job["status"] = JobStatus.CANCELLED
            job["completed_at"] = datetime.now(UTC)
            await self._save_job_to_storage(job)
            raise

        except Exception as e:
            # Job failed
            job["status"] = JobStatus.FAILED
            job["completed_at"] = datetime.now(UTC)
            job["error"] = str(e)
            job["result"] = {"error_details": traceback.format_exc()}
            await self._save_job_to_storage(job)

        finally:
            # Clean up task reference
            if job_id in self._tasks:
                del self._tasks[job_id]

    async def _execute_job(self, job: dict) -> dict:
        """
        Execute a job based on its type.

        If a custom executor is registered for the job type, it will be called.
        Otherwise, raises an error for unknown job types.
        """
        job_type = job["job_type"]
        job_data = job["job_data"]

        if job_type in self._job_executors:
            executor = self._job_executors[job_type]
            return await executor(job_data)

        raise ValueError(f"No executor registered for job type: {job_type}")

    async def get_job_info(self, job_id: str) -> dict:
        """Get complete information about a job."""
        if job_id not in self._jobs:
            raise ValueError(f"Job {job_id} not found")

        job = self._jobs[job_id].copy()
        # Remove internal job_data field from response
        job.pop("job_data", None)
        return job

    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running job."""
        if job_id not in self._jobs:
            return False

        job = self._jobs[job_id]

        # Can only cancel pending or running jobs
        if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            return False

        # Cancel the task if it exists
        if job_id in self._tasks:
            task = self._tasks[job_id]
            if not task.done():
                task.cancel()

        # Update job status
        job["status"] = JobStatus.CANCELLED
        job["completed_at"] = datetime.now(UTC)
        await self._save_job_to_storage(job)

        return True

    async def delete_job(self, job_id: str) -> bool:
        """Delete a completed or cancelled job."""
        if job_id not in self._jobs:
            return False

        job = self._jobs[job_id]

        # Can only delete completed, failed, or cancelled jobs
        if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
            return False

        # Remove from memory and storage
        del self._jobs[job_id]
        await self._delete_job_from_storage(job_id)

        return True

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """List jobs with optional filtering."""
        jobs = list(self._jobs.values())

        # Filter by job_type
        if job_type is not None:
            jobs = [j for j in jobs if j["job_type"] == job_type]

        # Filter by status
        if status is not None:
            jobs = [j for j in jobs if j["status"] == status]

        # Sort by created_at (newest first)
        jobs.sort(key=lambda j: j["created_at"], reverse=True)

        # Apply pagination
        jobs = jobs[offset : offset + limit]

        # Convert to return format (remove internal fields)
        result = []
        for job in jobs:
            job_copy = job.copy()
            # Remove internal job_data field
            job_copy.pop("job_data", None)
            result.append(job_copy)

        return result
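
Tying it together, a minimal end-to-end sketch of the inline scheduler, reusing the names imported in this module. Constructing the KVStoreReference is elided because its shape lives in llama_stack.core.storage.datatypes:

async def demo(kvstore_ref) -> None:
    # kvstore_ref: a KVStoreReference for job persistence (construction elided)
    config = InlineSchedulerConfig(kvstore=kvstore_ref, max_concurrent_jobs=2)
    scheduler = InlineSchedulerImpl(config)
    await scheduler.initialize()  # loads persisted jobs, resumes incomplete ones

    async def echo(job_data: dict) -> dict:
        return {"echoed": job_data}

    scheduler.register_job_executor("echo", echo)
    job_id = await scheduler.schedule_job("echo", {"msg": "hi"})

    # Poll until the job reaches a terminal status.
    terminal = (JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED)
    while (await scheduler.get_job_info(job_id))["status"] not in terminal:
        await asyncio.sleep(0.05)

    print(await scheduler.get_job_info(job_id))
    await scheduler.shutdown()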