add readme and keep only stubs

Swapna Lekkala 2025-11-04 10:42:00 -08:00
parent 0a98415e25
commit 9cf349a263
12 changed files with 328 additions and 1109 deletions


@@ -1,47 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .api import JobStatus, Scheduler
from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig


async def scheduler_impl(config: SchedulerConfig) -> Scheduler:
    """
    Factory function to instantiate scheduler implementations.

    Args:
        config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig)

    Returns:
        Scheduler: An initialized scheduler instance

    Raises:
        ValueError: If the config type is unknown
    """
    impl: Scheduler
    if isinstance(config, InlineSchedulerConfig):
        from .inline import InlineSchedulerImpl

        impl = InlineSchedulerImpl(config)
    elif isinstance(config, CelerySchedulerConfig):
        from .celery import CelerySchedulerImpl

        impl = CelerySchedulerImpl(config)
    else:
        raise ValueError(f"Unknown scheduler config type: {type(config)}")

    await impl.initialize()
    return impl


__all__ = [
    "JobStatus",
    "Scheduler",
    "SchedulerConfig",
    "InlineSchedulerConfig",
    "CelerySchedulerConfig",
    "scheduler_impl",
]


@@ -1,135 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable
from enum import StrEnum
from typing import Protocol


class JobStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class Scheduler(Protocol):
    """
    Abstract scheduler protocol for managing async jobs.

    This provides a pluggable backend for job scheduling and execution.
    """

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """
        Schedule a new job for execution.

        Args:
            job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            job_data: Job-specific data and parameters
            metadata: Additional metadata for tracking

        Returns:
            job_id: UUID for tracking the job
        """
        ...

    async def get_job_info(self, job_id: str) -> dict:
        """
        Get complete information about a job.

        Returns:
            {
                "job_id": str,
                "job_type": str,
                "status": JobStatus,
                "created_at": datetime,
                "started_at": datetime | None,
                "completed_at": datetime | None,
                "progress": float,  # 0.0 to 1.0
                "metadata": dict,
                "error": str | None,  # Error message if status == FAILED
                "result": dict | None,  # Job result if status == COMPLETED
            }
        """
        ...

    async def cancel_job(self, job_id: str) -> bool:
        """
        Cancel a pending or running job.

        Args:
            job_id: Job to cancel

        Returns:
            True if job was cancelled, False if not found or already completed
        """
        ...

    async def delete_job(self, job_id: str) -> bool:
        """
        Delete a completed or cancelled job.

        Args:
            job_id: Job to delete

        Returns:
            True if job was deleted, False if not found
        """
        ...

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """
        List jobs with optional filtering.

        Args:
            job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
            status: Filter by status
            limit: Maximum number of jobs to return
            offset: Offset for pagination

        Returns:
            List of job info dictionaries
        """
        ...

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """
        Register a job executor function for a specific job type.

        This allows components to register custom job execution logic with the scheduler.
        When a job of the registered type is executed, the scheduler will call the
        registered executor function.

        Args:
            job_type: The type of job (e.g., "vector_store_file_batch")
            executor: Async function that takes job_data dict and returns result dict
        """
        ...

    async def initialize(self) -> None:
        """Initialize the scheduler (connect to backend, etc.)"""
        ...

    async def shutdown(self) -> None:
        """Gracefully shutdown the scheduler"""
        ...


@@ -1,9 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import CelerySchedulerImpl

__all__ = ["CelerySchedulerImpl"]


@@ -1,81 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from collections.abc import Awaitable, Callable

from ..api import JobStatus, Scheduler
from ..config import CelerySchedulerConfig


class CelerySchedulerImpl(Scheduler):
    """
    Celery-based scheduler implementation for distributed multi-worker deployments.

    This scheduler uses Celery to distribute jobs across multiple worker processes
    or machines. It provides:
    - Persistent job queue (via Redis/RabbitMQ broker)
    - Multi-worker coordination
    - Crash recovery (jobs survive server restarts)
    - Distributed task execution

    This is suitable for:
    - Production deployments
    - Multi-worker scenarios
    - High availability requirements
    """

    def __init__(self, config: CelerySchedulerConfig):
        self.config = config
        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
        # TODO: Initialize Celery app with broker and result backend
        raise NotImplementedError("Celery scheduler not yet implemented")

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """Register a job executor function for a specific job type."""
        self._job_executors[job_type] = executor

    async def initialize(self) -> None:
        """Initialize the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def shutdown(self) -> None:
        """Gracefully shutdown the Celery scheduler."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """Schedule a new job for execution via Celery."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def get_job_info(self, job_id: str) -> dict:
        """Get complete information about a job from Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running Celery job."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def delete_job(self, job_id: str) -> bool:
        """Delete a completed or cancelled job from Celery result backend."""
        raise NotImplementedError("Celery scheduler not yet implemented")

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """List jobs from Celery result backend with optional filtering."""
        raise NotImplementedError("Celery scheduler not yet implemented")


@@ -1,58 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from typing import Annotated, Literal

from pydantic import BaseModel, Field

from llama_stack.core.storage.datatypes import KVStoreReference


class SchedulerConfig(BaseModel):
    """Base class for scheduler configurations."""

    type: str


class InlineSchedulerConfig(SchedulerConfig):
    """
    Configuration for inline (asyncio-based) scheduler.

    This scheduler runs jobs in the same process using asyncio tasks.
    Suitable for development and single-worker deployments.
    """

    type: Literal["inline"] = "inline"
    kvstore: KVStoreReference = Field(
        description="KVStore reference for persisting job state",
    )
    max_concurrent_jobs: int = Field(
        default=10,
        description="Maximum number of jobs that can run concurrently",
    )


class CelerySchedulerConfig(SchedulerConfig):
    """
    Configuration for Celery-based distributed scheduler.

    This scheduler distributes jobs across multiple worker processes/machines.
    Suitable for production and multi-worker deployments.
    """

    type: Literal["celery"] = "celery"
    broker_url: str = Field(
        description="Celery broker URL (e.g., 'redis://localhost:6379/0')",
    )
    result_backend: str = Field(
        description="Celery result backend URL (e.g., 'redis://localhost:6379/1')",
    )


SchedulerConfigUnion = Annotated[
    InlineSchedulerConfig | CelerySchedulerConfig,
    Field(discriminator="type"),
]


@@ -1,9 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from .scheduler import InlineSchedulerImpl

__all__ = ["InlineSchedulerImpl"]


@@ -1,304 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

import asyncio
import json
import traceback
import uuid
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime
from typing import Any

from llama_stack.core.utils.serialize import EnumEncoder
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.api import KVStore

from ..api import JobStatus, Scheduler
from ..config import InlineSchedulerConfig

JOB_PREFIX = "job_scheduler:jobs:"


class InlineSchedulerImpl(Scheduler):
    """
    Inline scheduler implementation using asyncio for single-worker deployments.

    This scheduler runs jobs in the same process using asyncio tasks. Jobs and their
    state are persisted to KVStore for crash recovery.

    This is suitable for:
    - Development and testing
    - Single-worker deployments
    - Scenarios where external dependencies (Redis, RabbitMQ) are not available
    """

    def __init__(self, config: InlineSchedulerConfig):
        self.config = config
        self._jobs: dict[str, dict[str, Any]] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._semaphore: asyncio.Semaphore
        self._shutdown_event = asyncio.Event()
        self._kvstore: KVStore
        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}

    async def initialize(self) -> None:
        """Initialize the scheduler and load persisted jobs."""
        self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs)

        # Initialize KVStore
        self._kvstore = await kvstore_impl(self.config.kvstore)

        # Load persisted jobs from KVStore
        await self._load_jobs_from_storage()

        # Resume incomplete jobs
        await self._resume_incomplete_jobs()

    def register_job_executor(
        self,
        job_type: str,
        executor: Callable[[dict], Awaitable[dict]],
    ) -> None:
        """Register a job executor function for a specific job type."""
        self._job_executors[job_type] = executor

    async def _load_jobs_from_storage(self) -> None:
        """Load all jobs from KVStore into memory."""
        start_key = JOB_PREFIX
        end_key = f"{JOB_PREFIX}\xff"
        stored_values = await self._kvstore.values_in_range(start_key, end_key)
        for value in stored_values:
            job = json.loads(value)
            # Deserialize datetime strings back to datetime objects
            job["created_at"] = datetime.fromisoformat(job["created_at"])
            if job.get("started_at"):
                job["started_at"] = datetime.fromisoformat(job["started_at"])
            if job.get("completed_at"):
                job["completed_at"] = datetime.fromisoformat(job["completed_at"])
            job["status"] = JobStatus(job["status"])
            self._jobs[job["job_id"]] = job

    async def _resume_incomplete_jobs(self) -> None:
        """Resume jobs that were running when server crashed."""
        for job_id, job in self._jobs.items():
            if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
                # Reset running jobs to pending
                if job["status"] == JobStatus.RUNNING:
                    job["status"] = JobStatus.PENDING
                    job["started_at"] = None
                    await self._save_job_to_storage(job)
                # Restart the job
                task = asyncio.create_task(self._run_job(job_id))
                self._tasks[job_id] = task

    async def _save_job_to_storage(self, job: dict[str, Any]) -> None:
        """Persist job to KVStore."""
        key = f"{JOB_PREFIX}{job['job_id']}"
        await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder))

    async def _delete_job_from_storage(self, job_id: str) -> None:
        """Delete job from KVStore."""
        key = f"{JOB_PREFIX}{job_id}"
        await self._kvstore.delete(key)

    async def shutdown(self) -> None:
        """Gracefully shutdown the scheduler."""
        self._shutdown_event.set()

        # Cancel all running tasks
        for task in self._tasks.values():
            if not task.done():
                task.cancel()

        # Wait for all tasks to complete
        if self._tasks:
            await asyncio.gather(*self._tasks.values(), return_exceptions=True)
        self._tasks.clear()

    async def schedule_job(
        self,
        job_type: str,
        job_data: dict,
        metadata: dict | None = None,
    ) -> str:
        """Schedule a new job for execution."""
        job_id = str(uuid.uuid4())
        job_info = {
            "job_id": job_id,
            "job_type": job_type,
            "status": JobStatus.PENDING,
            "created_at": datetime.now(UTC),
            "started_at": None,
            "completed_at": None,
            "progress": 0.0,
            "metadata": metadata or {},
            "job_data": job_data,
            "error": None,
            "result": None,
        }
        self._jobs[job_id] = job_info

        # Persist to KVStore
        await self._save_job_to_storage(job_info)

        # Create and schedule the task
        task = asyncio.create_task(self._run_job(job_id))
        self._tasks[job_id] = task
        return job_id

    async def _run_job(self, job_id: str) -> None:
        """Run a job asynchronously."""
        job = self._jobs[job_id]
        try:
            # Acquire semaphore to limit concurrent jobs
            async with self._semaphore:
                # Update status to RUNNING
                job["status"] = JobStatus.RUNNING
                job["started_at"] = datetime.now(UTC)
                await self._save_job_to_storage(job)

                # Execute the job based on job_type
                result = await self._execute_job(job)

                # Mark as completed
                job["status"] = JobStatus.COMPLETED
                job["completed_at"] = datetime.now(UTC)
                job["progress"] = 1.0
                job["result"] = result
                await self._save_job_to_storage(job)
        except asyncio.CancelledError:
            # Job was cancelled
            job["status"] = JobStatus.CANCELLED
            job["completed_at"] = datetime.now(UTC)
            await self._save_job_to_storage(job)
            raise
        except Exception as e:
            # Job failed
            job["status"] = JobStatus.FAILED
            job["completed_at"] = datetime.now(UTC)
            job["error"] = str(e)
            job["result"] = {"error_details": traceback.format_exc()}
            await self._save_job_to_storage(job)
        finally:
            # Clean up task reference
            if job_id in self._tasks:
                del self._tasks[job_id]

    async def _execute_job(self, job: dict) -> dict:
        """
        Execute a job based on its type.

        If a custom executor is registered for the job type, it will be called.
        Otherwise, raises an error for unknown job types.
        """
        job_type = job["job_type"]
        job_data = job["job_data"]

        # Check if a custom executor is registered for this job type
        if job_type in self._job_executors:
            executor = self._job_executors[job_type]
            return await executor(job_data)

        raise ValueError(f"No executor registered for job type: {job_type}")

    async def get_job_info(self, job_id: str) -> dict:
        """Get complete information about a job."""
        if job_id not in self._jobs:
            raise ValueError(f"Job {job_id} not found")

        job = self._jobs[job_id].copy()
        # Remove internal job_data field from response
        job.pop("job_data", None)
        return job

    async def cancel_job(self, job_id: str) -> bool:
        """Cancel a pending or running job."""
        if job_id not in self._jobs:
            return False

        job = self._jobs[job_id]

        # Can only cancel pending or running jobs
        if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
            return False

        # Cancel the task if it exists
        if job_id in self._tasks:
            task = self._tasks[job_id]
            if not task.done():
                task.cancel()

        # Update job status
        job["status"] = JobStatus.CANCELLED
        job["completed_at"] = datetime.now(UTC)
        await self._save_job_to_storage(job)
        return True

    async def delete_job(self, job_id: str) -> bool:
        """Delete a completed or cancelled job."""
        if job_id not in self._jobs:
            return False

        job = self._jobs[job_id]

        # Can only delete completed, failed, or cancelled jobs
        if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
            return False

        # Remove from memory and storage
        del self._jobs[job_id]
        await self._delete_job_from_storage(job_id)
        return True

    async def list_jobs(
        self,
        job_type: str | None = None,
        status: JobStatus | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> list[dict]:
        """List jobs with optional filtering."""
        jobs = list(self._jobs.values())

        # Filter by job_type
        if job_type is not None:
            jobs = [j for j in jobs if j["job_type"] == job_type]

        # Filter by status
        if status is not None:
            jobs = [j for j in jobs if j["status"] == status]

        # Sort by created_at (newest first)
        jobs.sort(key=lambda j: j["created_at"], reverse=True)

        # Apply pagination
        jobs = jobs[offset : offset + limit]

        # Convert to return format (remove internal fields)
        result = []
        for job in jobs:
            job_copy = job.copy()
            # Remove internal job_data field
            job_copy.pop("job_data", None)
            result.append(job_copy)

        return result


@@ -0,0 +1,237 @@
# Job Scheduler
## Run Config → Adapter Init Flow
```
┌─────────────────────────────────────────────────────────┐
│ 1. Run Config YAML                                      │
├─────────────────────────────────────────────────────────┤
│ providers:                                              │
│   job_scheduler:                                        │
│     - provider_type: inline::scheduler                  │
│       config: { kvstore: {...} }                        │
│   vector_io:                                            │
│     - provider_type: inline::faiss                      │
│       config: { persistence: {...} }                    │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 2. Stack.initialize()                                   │
│    → resolve_impls(run_config, ...)                     │
│    → instantiate_providers(sorted_by_deps)              │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 3. For each provider (in dependency order):             │
│                                                         │
│    A. Job Scheduler (no dependencies)                   │
│       ├─ get_provider_impl(config, deps={})             │
│       ├─ scheduler = InlineSchedulerImpl(config)        │
│       └─ await scheduler.initialize()                   │
│                                                         │
│    B. Vector IO (depends on inference, job_scheduler)   │
│       ├─ deps = {                                       │
│       │     Api.inference: <impl>,                      │
│       │     Api.job_scheduler: <scheduler_impl>         │
│       │  }                                              │
│       ├─ get_provider_impl(config, deps)                │
│       │  ├─ adapter = FaissVectorIOAdapter(             │
│       │  │      config,                                 │
│       │  │      deps[Api.inference],                    │
│       │  │      deps.get(Api.files),                    │
│       │  │      deps.get(Api.job_scheduler)  ← Here!    │
│       │  │  )                                           │
│       │  └─ await adapter.initialize()                  │
│       └─ return adapter                                 │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 4. Adapter.__init__(scheduler)                          │
│    ├─ self.scheduler = scheduler                        │
│    └─ scheduler.register_job_executor(...)              │
└─────────────────────────────────────────────────────────┘
```
**Key Points:**
- **Dependency resolution**: Providers instantiated in topological order
- **Automatic injection**: Framework passes `deps[Api.job_scheduler]`
- **Registry declares it**: `optional_api_dependencies=[Api.job_scheduler]`
- **User configures it**: Run YAML specifies inline vs celery
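
For example, a provider registry entry might declare the optional dependency roughly like this (a sketch; the spec fields shown are assumptions about the registry, not code from this commit):

```python
# Hypothetical registry entry: the framework injects deps[Api.job_scheduler]
# into the provider because the spec declares it as an optional dependency.
InlineProviderSpec(
    api=Api.vector_io,
    provider_type="inline::faiss",
    module="llama_stack.providers.inline.vector_io.faiss",
    config_class="...",  # elided
    api_dependencies=[Api.inference],
    optional_api_dependencies=[Api.files, Api.job_scheduler],
)
```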
---
## Server Startup Flow: Two-Phase Initialization

Scheduler initialization is split into two phases:
- **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them
- **Phase 2 (`start`)**: Resume jobs after all executors are registered
```
┌─────────────────────────────────────────────────────────┐
│ Stack.initialize()                                      │
├─────────────────────────────────────────────────────────┤
│ → resolve_impls()                                       │
│   ├─ Job Scheduler                                      │
│   │   └─ scheduler.initialize()  ← Phase 1              │
│   │       • Connect to KVStore                          │
│   │       • Load jobs into memory                       │
│   │       • DON'T resume jobs yet ❌                    │
│   │                                                     │
│   ├─ Vector IO Provider                                 │
│   │   ├─ __init__():                                    │
│   │   │   └─ scheduler.register_job_executor(...)       │
│   │   └─ provider.initialize()                          │
│   │                                                     │
│   └─ [All providers initialized, executors ✓]           │
│                                                         │
│ → Start schedulers                                      │
│   └─ scheduler.start()  ← Phase 2                       │
│       └─ Resume incomplete jobs                         │
│           • Executors are registered ✓                  │
│           • Jobs execute successfully ✓                 │
└─────────────────────────────────────────────────────────┘
```
## Implementation
### 1. Scheduler Protocol (`api.py`)
```python
async def initialize(self) -> None:
    """
    Initialize the scheduler (connect to backend, load persisted jobs).

    Note: Does NOT start processing jobs. Call start() after all
    executors are registered.
    """
    ...


async def start(self) -> None:
    """
    Start processing jobs after all executors are registered.

    This resumes any incomplete jobs from storage. Must be called
    after initialize() and after all job executors have been
    registered via register_job_executor().
    """
    ...
```
### 2. Scheduler Implementation
```python
class InlineSchedulerImpl(Scheduler):
    async def initialize(self) -> None:
        self._kvstore = await kvstore_impl(self.config.kvstore)
        await self._load_jobs_from_storage()
        # DON'T call _resume_incomplete_jobs() here!

    async def start(self) -> None:
        """Call AFTER all executors are registered."""
        await self._resume_incomplete_jobs()
```
### 3. Stack Integration (`stack.py`)
```python
async def initialize(self):
    # ... existing code ...
    impls = await resolve_impls(...)  # All providers initialized

    # NEW: Start schedulers after all executors registered
    if Api.job_scheduler in impls:
        await impls[Api.job_scheduler].start()

    self.impls = impls
```
### 4. Provider Integration
```python
class VectorIOAdapter:
    def __init__(self, config, inference_api, files_api, scheduler):
        # Register executor during __init__ (before scheduler.start())
        scheduler.register_job_executor(
            "vector_store_file_batch", self._process_file_batch
        )
```
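
Putting steps 1–4 together, the startup order under this design looks roughly like this (a sketch, not the actual stack code; the variable names are illustrative):

```python
# Phase 1: construct the scheduler and load persisted job state only.
scheduler = InlineSchedulerImpl(config)
await scheduler.initialize()

# Providers register their executors as they are constructed.
adapter = VectorIOAdapter(adapter_config, inference_api, files_api, scheduler)

# Phase 2: resume incomplete jobs now that every executor is in place.
await scheduler.start()
```
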
## Behavior
### Case 1: Clean Start (No Jobs)
```python
await scheduler.start()
# _resume_incomplete_jobs() finds no jobs
# Loop completes, nothing happens ✓
```
### Case 2: After Crash (Jobs Exist)
```python
await scheduler.start()
# _resume_incomplete_jobs() finds PENDING/RUNNING jobs
# Executors are registered ✓
# Creates asyncio tasks successfully ✓
# Jobs run in background ✓
```
## Usage: OpenAI Vector Store Mixin
### Current Pattern (Direct asyncio tasks)
```python
class OpenAIVectorStoreMixin:
    def __init__(self, files_api, kvstore):
        self._file_batch_tasks: dict[str, asyncio.Task] = {}

    async def openai_create_vector_store_file_batch(self, params):
        # Create background task directly
        task = asyncio.create_task(self._process_file_batch_async(params))
        self._file_batch_tasks[batch_id] = task
```
### New Pattern (Job Scheduler)
```python
class OpenAIVectorStoreMixin:
    def __init__(self, files_api, kvstore, scheduler):
        self.scheduler = scheduler
        # Register executor in __init__ (before scheduler.start())
        self.scheduler.register_job_executor(
            "vector_store_file_batch", self._execute_file_batch
        )

    async def openai_create_vector_store_file_batch(self, params):
        # Schedule job through scheduler
        job_id = await self.scheduler.schedule_job(
            job_type="vector_store_file_batch",
            job_data={
                "batch_id": batch_id,
                "vector_store_id": vector_store_id,
                "file_ids": params.file_ids,
                "attributes": params.attributes,
                "chunking_strategy": chunking_strategy.model_dump(),
            },
            metadata={"batch_id": batch_id},
        )
        return batch_object

    async def _execute_file_batch(self, job_data: dict) -> dict:
        """Executor called by scheduler."""
        batch_id = job_data["batch_id"]
        vector_store_id = job_data["vector_store_id"]
        file_ids = job_data["file_ids"]

        # Process files (existing logic from _process_file_batch_async)
        # await self._process_files_with_concurrency(...)

        return {"status": "completed", "files_processed": len(file_ids)}
```
### Benefits
- ✅ **Crash recovery**: Jobs survive server restarts
- ✅ **Persistence**: Job state stored in KVStore
- ✅ **Monitoring**: Query job status via `get_job_info(job_id)`
- ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)`
- ✅ **Clean separation**: Job scheduling decoupled from execution
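
Once a batch is scheduled this way, monitoring and cancellation fall out of the protocol directly (a sketch against the Scheduler protocol above; `scheduler` and `job_data` as set up earlier):

```python
# Schedule the batch, then poll its status and optionally cancel it.
job_id = await scheduler.schedule_job("vector_store_file_batch", job_data=job_data)

info = await scheduler.get_job_info(job_id)
print(info["status"], info["progress"])  # e.g. "running", 0.4

if info["status"] in (JobStatus.PENDING, JobStatus.RUNNING):
    cancelled = await scheduler.cancel_job(job_id)  # False if already finished
```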


@@ -127,7 +127,20 @@ class Scheduler(Protocol):
         ...

     async def initialize(self) -> None:
-        """Initialize the scheduler (connect to backend, etc.)"""
+        """
+        Initialize the scheduler (connect to backend, load persisted jobs).
+
+        Note: Does NOT start processing jobs. Call start() after all executors are registered.
+        """
+        ...
+
+    async def start(self) -> None:
+        """
+        Start processing jobs after all executors are registered.
+
+        This resumes any incomplete jobs from storage. Must be called after initialize()
+        and after all job executors have been registered via register_job_executor().
+        """
         ...

     async def shutdown(self) -> None:


@ -30,8 +30,6 @@ class CelerySchedulerImpl(Scheduler):
def __init__(self, config: CelerySchedulerConfig): def __init__(self, config: CelerySchedulerConfig):
self.config = config self.config = config
self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
# TODO: Initialize Celery app with broker and result backend
raise NotImplementedError("Celery scheduler not yet implemented")
def register_job_executor( def register_job_executor(
self, self,
@ -39,15 +37,19 @@ class CelerySchedulerImpl(Scheduler):
executor: Callable[[dict], Awaitable[dict]], executor: Callable[[dict], Awaitable[dict]],
) -> None: ) -> None:
"""Register a job executor function for a specific job type.""" """Register a job executor function for a specific job type."""
self._job_executors[job_type] = executor raise NotImplementedError("Celery scheduler implementation is not yet available")
async def initialize(self) -> None: async def initialize(self) -> None:
"""Initialize the Celery scheduler.""" """Initialize the Celery scheduler."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def start(self) -> None:
"""Start processing jobs after all executors are registered."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def shutdown(self) -> None: async def shutdown(self) -> None:
"""Gracefully shutdown the Celery scheduler.""" """Gracefully shutdown the Celery scheduler."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def schedule_job( async def schedule_job(
self, self,
@ -56,19 +58,19 @@ class CelerySchedulerImpl(Scheduler):
metadata: dict | None = None, metadata: dict | None = None,
) -> str: ) -> str:
"""Schedule a new job for execution via Celery.""" """Schedule a new job for execution via Celery."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def get_job_info(self, job_id: str) -> dict: async def get_job_info(self, job_id: str) -> dict:
"""Get complete information about a job from Celery result backend.""" """Get complete information about a job from Celery result backend."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def cancel_job(self, job_id: str) -> bool: async def cancel_job(self, job_id: str) -> bool:
"""Cancel a pending or running Celery job.""" """Cancel a pending or running Celery job."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def delete_job(self, job_id: str) -> bool: async def delete_job(self, job_id: str) -> bool:
"""Delete a completed or cancelled job from Celery result backend.""" """Delete a completed or cancelled job from Celery result backend."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")
async def list_jobs( async def list_jobs(
self, self,
@ -78,4 +80,4 @@ class CelerySchedulerImpl(Scheduler):
offset: int = 0, offset: int = 0,
) -> list[dict]: ) -> list[dict]:
"""List jobs from Celery result backend with optional filtering.""" """List jobs from Celery result backend with optional filtering."""
raise NotImplementedError("Celery scheduler not yet implemented") raise NotImplementedError("Celery scheduler implementation is not yet available")


@@ -4,23 +4,11 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.

-import asyncio
-import json
-import traceback
-import uuid
 from collections.abc import Awaitable, Callable
-from datetime import UTC, datetime
-from typing import Any
-
-from llama_stack.core.utils.serialize import EnumEncoder
-from llama_stack.providers.utils.kvstore import kvstore_impl
-from llama_stack.providers.utils.kvstore.api import KVStore

 from ..api import JobStatus, Scheduler
 from ..config import InlineSchedulerConfig

-JOB_PREFIX = "job_scheduler:jobs:"
-

 class InlineSchedulerImpl(Scheduler):
     """
@@ -37,25 +25,14 @@ class InlineSchedulerImpl(Scheduler):
     def __init__(self, config: InlineSchedulerConfig):
         self.config = config
-        self._jobs: dict[str, dict[str, Any]] = {}
-        self._tasks: dict[str, asyncio.Task] = {}
-        self._semaphore: asyncio.Semaphore
-        self._shutdown_event = asyncio.Event()
-        self._kvstore: KVStore
-        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}

     async def initialize(self) -> None:
         """Initialize the scheduler and load persisted jobs."""
-        self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs)
-
-        # Initialize KVStore
-        self._kvstore = await kvstore_impl(self.config.kvstore)
-
-        # Load persisted jobs from KVStore
-        await self._load_jobs_from_storage()
-
-        # Resume incomplete jobs
-        await self._resume_incomplete_jobs()
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def start(self) -> None:
+        """Start processing jobs after all executors are registered."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     def register_job_executor(
         self,
@@ -63,64 +40,11 @@ class InlineSchedulerImpl(Scheduler):
         executor: Callable[[dict], Awaitable[dict]],
     ) -> None:
         """Register a job executor function for a specific job type."""
-        self._job_executors[job_type] = executor
-
-    async def _load_jobs_from_storage(self) -> None:
-        """Load all jobs from KVStore into memory."""
-        start_key = JOB_PREFIX
-        end_key = f"{JOB_PREFIX}\xff"
-        stored_values = await self._kvstore.values_in_range(start_key, end_key)
-        for value in stored_values:
-            job = json.loads(value)
-            job["created_at"] = datetime.fromisoformat(job["created_at"])
-            if job.get("started_at"):
-                job["started_at"] = datetime.fromisoformat(job["started_at"])
-            if job.get("completed_at"):
-                job["completed_at"] = datetime.fromisoformat(job["completed_at"])
-            job["status"] = JobStatus(job["status"])
-            self._jobs[job["job_id"]] = job
-
-    async def _resume_incomplete_jobs(self) -> None:
-        """Resume jobs that were running when server crashed."""
-        for job_id, job in self._jobs.items():
-            if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
-                # Reset running jobs to pending
-                if job["status"] == JobStatus.RUNNING:
-                    job["status"] = JobStatus.PENDING
-                    job["started_at"] = None
-                    await self._save_job_to_storage(job)
-                # Restart the job
-                task = asyncio.create_task(self._run_job(job_id))
-                self._tasks[job_id] = task
-
-    async def _save_job_to_storage(self, job: dict[str, Any]) -> None:
-        """Persist job to KVStore."""
-        key = f"{JOB_PREFIX}{job['job_id']}"
-        await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder))
-
-    async def _delete_job_from_storage(self, job_id: str) -> None:
-        """Delete job from KVStore."""
-        key = f"{JOB_PREFIX}{job_id}"
-        await self._kvstore.delete(key)
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def shutdown(self) -> None:
         """Gracefully shutdown the scheduler."""
-        self._shutdown_event.set()
-
-        # Cancel all running tasks
-        for task in self._tasks.values():
-            if not task.done():
-                task.cancel()
-
-        # Wait for all tasks to complete
-        if self._tasks:
-            await asyncio.gather(*self._tasks.values(), return_exceptions=True)
-        self._tasks.clear()
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def schedule_job(
         self,
@@ -129,143 +53,19 @@ class InlineSchedulerImpl(Scheduler):
         metadata: dict | None = None,
     ) -> str:
         """Schedule a new job for execution."""
-        job_id = str(uuid.uuid4())
-        job_info = {
-            "job_id": job_id,
-            "job_type": job_type,
-            "status": JobStatus.PENDING,
-            "created_at": datetime.now(UTC),
-            "started_at": None,
-            "completed_at": None,
-            "progress": 0.0,
-            "metadata": metadata or {},
-            "job_data": job_data,
-            "error": None,
-            "result": None,
-        }
-        self._jobs[job_id] = job_info
-
-        # Persist to KVStore
-        await self._save_job_to_storage(job_info)
-
-        # Create and schedule the task
-        task = asyncio.create_task(self._run_job(job_id))
-        self._tasks[job_id] = task
-        return job_id
-
-    async def _run_job(self, job_id: str) -> None:
-        """Run a job asynchronously."""
-        job = self._jobs[job_id]
-        try:
-            # Acquire semaphore to limit concurrent jobs
-            async with self._semaphore:
-                # Update status to RUNNING
-                job["status"] = JobStatus.RUNNING
-                job["started_at"] = datetime.now(UTC)
-                await self._save_job_to_storage(job)
-
-                # Execute the job based on job_type
-                result = await self._execute_job(job)
-
-                # Mark as completed
-                job["status"] = JobStatus.COMPLETED
-                job["completed_at"] = datetime.now(UTC)
-                job["progress"] = 1.0
-                job["result"] = result
-                await self._save_job_to_storage(job)
-        except asyncio.CancelledError:
-            # Job was cancelled
-            job["status"] = JobStatus.CANCELLED
-            job["completed_at"] = datetime.now(UTC)
-            await self._save_job_to_storage(job)
-            raise
-        except Exception as e:
-            # Job failed
-            job["status"] = JobStatus.FAILED
-            job["completed_at"] = datetime.now(UTC)
-            job["error"] = str(e)
-            job["result"] = {"error_details": traceback.format_exc()}
-            await self._save_job_to_storage(job)
-        finally:
-            # Clean up task reference
-            if job_id in self._tasks:
-                del self._tasks[job_id]
-
-    async def _execute_job(self, job: dict) -> dict:
-        """
-        Execute a job based on its type.
-
-        If a custom executor is registered for the job type, it will be called.
-        Otherwise, raises an error for unknown job types.
-        """
-        job_type = job["job_type"]
-        job_data = job["job_data"]
-
-        if job_type in self._job_executors:
-            executor = self._job_executors[job_type]
-            return await executor(job_data)
-
-        raise ValueError(f"No executor registered for job type: {job_type}")
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def get_job_info(self, job_id: str) -> dict:
         """Get complete information about a job."""
-        if job_id not in self._jobs:
-            raise ValueError(f"Job {job_id} not found")
-
-        job = self._jobs[job_id].copy()
-        # Remove internal job_data field from response
-        job.pop("job_data", None)
-        return job
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def cancel_job(self, job_id: str) -> bool:
         """Cancel a pending or running job."""
-        if job_id not in self._jobs:
-            return False
-
-        job = self._jobs[job_id]
-
-        # Can only cancel pending or running jobs
-        if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]:
-            return False
-
-        # Cancel the task if it exists
-        if job_id in self._tasks:
-            task = self._tasks[job_id]
-            if not task.done():
-                task.cancel()
-
-        # Update job status
-        job["status"] = JobStatus.CANCELLED
-        job["completed_at"] = datetime.now(UTC)
-        await self._save_job_to_storage(job)
-        return True
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def delete_job(self, job_id: str) -> bool:
         """Delete a completed or cancelled job."""
-        if job_id not in self._jobs:
-            return False
-
-        job = self._jobs[job_id]
-
-        # Can only delete completed, failed, or cancelled jobs
-        if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]:
-            return False
-
-        # Remove from memory and storage
-        del self._jobs[job_id]
-        await self._delete_job_from_storage(job_id)
-        return True
+        raise NotImplementedError("Inline scheduler implementation is not yet available")

     async def list_jobs(
         self,
@@ -275,28 +75,4 @@ class InlineSchedulerImpl(Scheduler):
         offset: int = 0,
     ) -> list[dict]:
         """List jobs with optional filtering."""
-        jobs = list(self._jobs.values())
-
-        # Filter by job_type
-        if job_type is not None:
-            jobs = [j for j in jobs if j["job_type"] == job_type]
-
-        # Filter by status
-        if status is not None:
-            jobs = [j for j in jobs if j["status"] == status]
-
-        # Sort by created_at (newest first)
-        jobs.sort(key=lambda j: j["created_at"], reverse=True)
-
-        # Apply pagination
-        jobs = jobs[offset : offset + limit]
-
-        # Convert to return format (remove internal fields)
-        result = []
-        for job in jobs:
-            job_copy = job.copy()
-            # Remove internal job_data field
-            job_copy.pop("job_data", None)
-            result.append(job_copy)
-
-        return result
+        raise NotImplementedError("Inline scheduler implementation is not yet available")


@@ -7,13 +7,13 @@
 import asyncio
 import tempfile
 from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock

 import pytest

 from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig
 from llama_stack.providers.utils.job_scheduler import (
     InlineSchedulerConfig,
-    JobStatus,
     scheduler_impl,
 )
 from llama_stack.providers.utils.kvstore import register_kvstore_backends
@@ -43,238 +43,72 @@ def scheduler_config():
     )


-async def test_inline_scheduler_basic(scheduler_config):
-    """Test basic scheduler functionality."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register default executor
-        scheduler.register_job_executor("test_job", default_test_executor)
-
-        # Schedule a job
-        job_id = await scheduler.schedule_job(
-            job_type="test_job",
-            job_data={"test": "data"},
-            metadata={"user": "test_user"},
-        )
-
-        assert job_id is not None
-        assert isinstance(job_id, str)
-
-        # Wait a bit for the job to complete
-        await asyncio.sleep(0.2)
-
-        # Get job info
-        job_info = await scheduler.get_job_info(job_id)
-        assert job_info["job_id"] == job_id
-        assert job_info["job_type"] == "test_job"
-        assert job_info["status"] == JobStatus.COMPLETED.value
-        assert job_info["metadata"]["user"] == "test_user"
-        assert job_info["progress"] == 1.0
-        assert job_info["result"] is not None
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_list_jobs(scheduler_config):
-    """Test listing jobs with filters."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register executors for different job types
-        scheduler.register_job_executor("batch_processing", default_test_executor)
-        scheduler.register_job_executor("log_aggregation", default_test_executor)
-
-        # Schedule multiple jobs
-        await scheduler.schedule_job(
-            job_type="batch_processing",
-            job_data={"batch": 1},
-        )
-        await scheduler.schedule_job(
-            job_type="log_aggregation",
-            job_data={"logs": []},
-        )
-        await scheduler.schedule_job(
-            job_type="batch_processing",
-            job_data={"batch": 2},
-        )
-
-        # Wait for jobs to complete
-        await asyncio.sleep(0.3)
-
-        # List all jobs
-        all_jobs = await scheduler.list_jobs()
-        assert len(all_jobs) == 3
-
-        # List jobs by type
-        batch_jobs = await scheduler.list_jobs(job_type="batch_processing")
-        assert len(batch_jobs) == 2
-
-        log_jobs = await scheduler.list_jobs(job_type="log_aggregation")
-        assert len(log_jobs) == 1
-
-        # List jobs by status
-        completed_jobs = await scheduler.list_jobs(status=JobStatus.COMPLETED)
-        assert len(completed_jobs) == 3
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_cancel_job(scheduler_config):
-    """Test cancelling a job."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register executor
-        scheduler.register_job_executor("long_running_job", default_test_executor)
-
-        # Schedule a job
-        job_id = await scheduler.schedule_job(
-            job_type="long_running_job",
-            job_data={"duration": 10},
-        )
-
-        # Try to cancel immediately
-        await scheduler.cancel_job(job_id)
-
-        # Wait a bit
-        await asyncio.sleep(0.1)
-
-        # Check job status
-        job_info = await scheduler.get_job_info(job_id)
-        # Job might be CANCELLED or COMPLETED depending on timing
-        assert job_info["status"] in [JobStatus.CANCELLED.value, JobStatus.COMPLETED.value]
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_delete_job(scheduler_config):
-    """Test deleting a completed job."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register executor
-        scheduler.register_job_executor("test_job", default_test_executor)
-
-        # Schedule a job
-        job_id = await scheduler.schedule_job(
-            job_type="test_job",
-            job_data={"test": "data"},
-        )
-
-        # Wait for completion
-        await asyncio.sleep(0.2)
-
-        # Verify job exists
-        job_info = await scheduler.get_job_info(job_id)
-        assert job_info["status"] == JobStatus.COMPLETED.value
-
-        # Delete the job
-        success = await scheduler.delete_job(job_id)
-        assert success is True
-
-        # Verify job is deleted
-        with pytest.raises(ValueError, match="not found"):
-            await scheduler.get_job_info(job_id)
-
-        # Deleting again should return False
-        success = await scheduler.delete_job(job_id)
-        assert success is False
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_concurrent_jobs(scheduler_config):
-    """Test running multiple jobs concurrently."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register executor
-        scheduler.register_job_executor("test_job", default_test_executor)
-
-        # Schedule multiple jobs
-        job_ids = []
-        for i in range(5):
-            job_id = await scheduler.schedule_job(
-                job_type="test_job",
-                job_data={"index": i},
-            )
-            job_ids.append(job_id)
-
-        # Wait for all jobs to complete
-        await asyncio.sleep(0.5)
-
-        # Verify all jobs completed
-        for job_id in job_ids:
-            job_info = await scheduler.get_job_info(job_id)
-            assert job_info["status"] == JobStatus.COMPLETED.value
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_pagination(scheduler_config):
-    """Test job listing with pagination."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Register executor
-        scheduler.register_job_executor("test_job", default_test_executor)
-
-        # Schedule 10 jobs
-        for i in range(10):
-            await scheduler.schedule_job(
-                job_type="test_job",
-                job_data={"index": i},
-            )
-
-        # Wait for completion
-        await asyncio.sleep(0.5)
-
-        # Test pagination
-        page1 = await scheduler.list_jobs(limit=5, offset=0)
-        assert len(page1) == 5
-
-        page2 = await scheduler.list_jobs(limit=5, offset=5)
-        assert len(page2) == 5
-
-        page3 = await scheduler.list_jobs(limit=5, offset=10)
-        assert len(page3) == 0
-    finally:
-        await scheduler.shutdown()
-
-
-async def test_inline_scheduler_register_job_executor(scheduler_config):
-    """Test registering a job executor."""
-    scheduler = await scheduler_impl(scheduler_config)
-    try:
-        # Define a custom job executor
-        async def custom_executor(job_data: dict) -> dict:
-            return {"custom": "result", "input": job_data}
-
-        # Register the executor
-        scheduler.register_job_executor("custom_job_type", custom_executor)
-
-        # Schedule a job with the custom type
-        job_id = await scheduler.schedule_job(
-            job_type="custom_job_type",
-            job_data={"test": "data"},
-        )
-
-        # Wait for job to complete
-        await asyncio.sleep(0.2)
-
-        # Verify the custom executor was called
-        job_info = await scheduler.get_job_info(job_id)
-        assert job_info["status"] == JobStatus.COMPLETED.value
-        assert job_info["result"]["custom"] == "result"
-        assert job_info["result"]["input"]["test"] == "data"
-    finally:
-        await scheduler.shutdown()
+async def test_scheduler_api_exists(scheduler_config):
+    """Test that scheduler API is properly defined."""
+    scheduler = await scheduler_impl(scheduler_config)
+
+    # Verify all required methods exist
+    assert hasattr(scheduler, "initialize")
+    assert hasattr(scheduler, "start")
+    assert hasattr(scheduler, "shutdown")
+    assert hasattr(scheduler, "register_job_executor")
+    assert hasattr(scheduler, "schedule_job")
+    assert hasattr(scheduler, "get_job_info")
+    assert hasattr(scheduler, "cancel_job")
+    assert hasattr(scheduler, "delete_job")
+    assert hasattr(scheduler, "list_jobs")
+
+
+async def test_scheduler_not_implemented(scheduler_config):
+    """Test that scheduler methods raise NotImplementedError."""
+    scheduler = await scheduler_impl(scheduler_config)
+
+    # Test that all methods raise NotImplementedError
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.initialize()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.start()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        scheduler.register_job_executor("test_job", default_test_executor)
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.schedule_job("test_job", {})
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.get_job_info("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.cancel_job("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.delete_job("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.list_jobs()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.shutdown()
+
+
+async def test_two_phase_initialization_pattern(scheduler_config):
+    """Test that the two-phase initialization pattern is supported."""
+    scheduler = await scheduler_impl(scheduler_config)
+
+    # Mock the methods to test the pattern
+    scheduler.initialize = AsyncMock()
+    scheduler.start = AsyncMock()
+    scheduler.register_job_executor = MagicMock()
+
+    # Phase 1: Initialize (loads jobs, doesn't start them)
+    await scheduler.initialize()
+    scheduler.initialize.assert_called_once()
+
+    # Register executors after initialization
+    scheduler.register_job_executor("test_job", default_test_executor)
+    scheduler.register_job_executor.assert_called_once()
+
+    # Phase 2: Start (resumes jobs after executors registered)
+    await scheduler.start()
+    scheduler.start.assert_called_once()