This commit is contained in:
slekkala1 2025-12-02 09:58:57 +01:00 committed by GitHub
commit e5ed33f966
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 790 additions and 0 deletions

View file

@ -0,0 +1,269 @@
# Job Scheduler
## Run Config → Adapter Init Flow
```
┌─────────────────────────────────────────────────────────┐
│ 1. Run Config YAML │
├─────────────────────────────────────────────────────────┤
│ providers: │
│ job_scheduler: │
│ - provider_type: inline::scheduler │
│ config: { kvstore: {...} } │
│ vector_io: │
│ - provider_type: inline::faiss │
│ config: { persistence: {...} } │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 2. Stack.initialize() │
│ → resolve_impls(run_config, ...) │
│ → instantiate_providers(sorted_by_deps) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 3. For each provider (in dependency order): │
│ │
│ A. Job Scheduler (no dependencies) │
│ ├─ get_provider_impl(config, deps={}) │
│ ├─ scheduler = InlineSchedulerImpl(config) │
│ └─ await scheduler.initialize() │
│ │
│ B. Vector IO (depends on inference, job_scheduler) │
│ ├─ deps = { │
│ │ Api.inference: <impl>, │
│ │ Api.job_scheduler: <scheduler>
│ │ } │
│ ├─ get_provider_impl(config, deps) │
│ │ ├─ adapter = FaissVectorIOAdapter( │
│ │ │ config, │
│ │ │ deps[Api.inference], │
│ │ │ deps.get(Api.files), │
│ │ │ deps.get(Api.job_scheduler) ← Here! │
│ │ │ ) │
│ │ └─ await adapter.initialize() │
│ └─ return adapter │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 4. Adapter.__init__(scheduler) │
│ ├─ self.scheduler = scheduler │
│ └─ scheduler.register_job_executor(...) │
└─────────────────────────────────────────────────────────┘
```
**Key Points:**
- **Dependency resolution**: Providers instantiated in topological order
- **Automatic injection**: Framework passes `deps[Api.job_scheduler]`
- **Registry declares it**: `optional_api_dependencies=[Api.job_scheduler]`
- **User configures it**: Run YAML specifies inline vs celery
---
## Server Startup Flow
### Two-Phase Initialization
Separate scheduler initialization into two phases:
- **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them
- **Phase 2 (`start`)**: Resume jobs after all executors are registered
```
┌─────────────────────────────────────────────────────────┐
│ Stack.initialize() │
├─────────────────────────────────────────────────────────┤
│ → resolve_impls() │
│ ├─ Job Scheduler │
│ │ └─ scheduler.initialize() ← Phase 1 │
│ │ • Connect to KVStore │
│ │ • Load jobs into memory │
│ │ • DON'T resume jobs yet ❌ │
│ │ │
│ ├─ Vector IO Provider │
│ │ ├─ __init__(): │
│ │ │ └─ scheduler.register_job_executor(...) │
│ │ └─ provider.initialize() │
│ │ │
│ └─ [All providers initialized, executors ✓] │
│ │
│ → Start schedulers │
│ └─ scheduler.start() ← Phase 2 │
│ └─ Resume incomplete jobs │
│ • Executors are registered ✓ │
│ • Jobs execute successfully ✓ │
└─────────────────────────────────────────────────────────┘
```
## Implementation
### 1. Scheduler Protocol (`api.py`)
```python
async def initialize(self) -> None:
"""
Initialize the scheduler (connect to backend, load persisted jobs).
Note: Does NOT start processing jobs. Call start() after all
executors are registered.
"""
...
async def start(self) -> None:
"""
Start processing jobs after all executors are registered.
This resumes any incomplete jobs from storage. Must be called
after initialize() and after all job executors have been
registered via register_job_executor().
"""
...
```
### 2. Scheduler Implementation
```python
class InlineSchedulerImpl(Scheduler):
async def initialize(self) -> None:
self._kvstore = await kvstore_impl(self.config.kvstore)
await self._load_jobs_from_storage()
# DON'T call _resume_incomplete_jobs() here!
async def start(self) -> None:
"""Call AFTER all executors are registered."""
await self._resume_incomplete_jobs()
```
### 3. Stack Integration (`stack.py`)
```python
async def initialize(self):
# ... existing code ...
impls = await resolve_impls(...) # All providers initialized
# NEW: Start schedulers after all executors registered
if Api.job_scheduler in impls:
await impls[Api.job_scheduler].start()
self.impls = impls
```
### 4. Provider Integration
```python
class VectorIOAdapter:
def __init__(self, config, inference_api, files_api, scheduler):
# Register executor during __init__ (before scheduler.start())
scheduler.register_job_executor(
"vector_store_file_batch", self._process_file_batch
)
```
### Behavior
### Case 1: Clean Start (No Jobs)
```python
await scheduler.start()
# _resume_incomplete_jobs() finds no jobs
# Loop completes, nothing happens ✓
```
### Case 2: After Crash (Jobs Exist)
```python
await scheduler.start()
# _resume_incomplete_jobs() finds PENDING/RUNNING jobs
# Executors are registered ✓
# Creates asyncio tasks successfully ✓
# Jobs run in background ✓
```
## Usage: OpenAI Vector Store Mixin
### Current Pattern (Direct asyncio tasks)
```python
class OpenAIVectorStoreMixin:
def __init__(self, files_api, kvstore):
self._file_batch_tasks: dict[str, asyncio.Task] = {}
async def openai_create_vector_store_file_batch(self, params):
# Create background task directly
task = asyncio.create_task(self._process_file_batch_async(params))
self._file_batch_tasks[batch_id] = task
```
### New Pattern (Job Scheduler)
```python
class OpenAIVectorStoreMixin:
def __init__(self, files_api, kvstore, scheduler):
self.scheduler = scheduler
# Register executor in __init__ (before scheduler.start())
self.scheduler.register_job_executor(
"vector_store_file_batch", self._execute_file_batch
)
async def openai_create_vector_store_file_batch(self, params):
# Schedule job through scheduler
job_id = await self.scheduler.schedule_job(
job_type="vector_store_file_batch",
job_data={
"batch_id": batch_id,
"vector_store_id": vector_store_id,
"file_ids": params.file_ids,
"attributes": params.attributes,
"chunking_strategy": chunking_strategy.model_dump(),
},
metadata={"batch_id": batch_id},
)
return batch_object
async def _execute_file_batch(self, job_data: dict) -> dict:
"""Executor called by scheduler."""
batch_id = job_data["batch_id"]
vector_store_id = job_data["vector_store_id"]
file_ids = job_data["file_ids"]
# Process files (existing logic from _process_file_batch_async)
# await self._process_files_with_concurrency(...)
return {"status": "completed", "files_processed": len(file_ids)}
```
### Benefits
- ✅ **Crash recovery**: Jobs survive server restarts
- ✅ **Persistence**: Job state stored in KVStore
- ✅ **Monitoring**: Query job status via `get_job_info(job_id)`
- ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)`
- ✅ **Clean separation**: Job scheduling decoupled from execution
## Single Worker vs Multi Worker
✅ Single Worker (Inline Scheduler - Not Implemented Yet)
```
providers:
job_scheduler:
- provider_type: inline::scheduler
config:
kvstore: { ... }
max_concurrent_jobs: 10
```
Works because:
- Jobs run in the same process via asyncio.create_task()
- In-memory _jobs dict is shared within the process
- Crash recovery works (jobs persist to KVStore)
---
✅ Multi Worker (Celery Scheduler - Not Implemented Yet)
```
providers:
job_scheduler:
- provider_type: celery::scheduler
config:
broker_url: redis://localhost:6379/0
result_backend: redis://localhost:6379/1
```
Works because:
- Shared message broker (Redis/RabbitMQ)
- Celery handles distributed task queue
- Workers coordinate via broker
- Any worker can execute any job

View file

@ -0,0 +1,16 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .api import JobStatus, Scheduler
from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig
__all__ = [
"JobStatus",
"Scheduler",
"SchedulerConfig",
"InlineSchedulerConfig",
"CelerySchedulerConfig",
]

View file

@ -0,0 +1,151 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from collections.abc import Awaitable, Callable
from enum import StrEnum
from typing import Protocol
class JobStatus(StrEnum):
VALIDATING = "validating"
FAILED = "failed"
IN_PROGRESS = "in_progress"
FINALIZING = "finalizing"
COMPLETED = "completed"
EXPIRED = "expired"
CANCELLING = "cancelling"
CANCELLED = "cancelled"
class Scheduler(Protocol):
"""
Abstract scheduler protocol for managing async jobs.
This provides a pluggable backend for job scheduling and execution.
"""
async def schedule_job(
self,
job_type: str,
job_data: dict,
metadata: dict | None = None,
) -> str:
"""
Schedule a new job for execution.
Args:
job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection")
job_data: Job-specific data and parameters
metadata: Additional metadata for tracking
Returns:
job_id: UUID for tracking the job
"""
...
async def get_job_info(self, job_id: str) -> dict:
"""
Get complete information about a job.
Returns:
{
"job_id": str,
"job_type": str,
"status": JobStatus,
"created_at": datetime,
"started_at": datetime | None,
"completed_at": datetime | None,
"progress": float, # 0.0 to 1.0
"metadata": dict,
"error": str | None, # Error message if status == FAILED
"result": dict | None, # Job result if status == COMPLETED
}
"""
...
async def cancel_job(self, job_id: str) -> bool:
"""
Cancel a pending or running job.
Args:
job_id: Job to cancel
Returns:
True if job was cancelled, False if not found or already completed
"""
...
async def delete_job(self, job_id: str) -> bool:
"""
Delete a completed or cancelled job.
Args:
job_id: Job to delete
Returns:
True if job was deleted, False if not found
"""
...
async def list_jobs(
self,
job_type: str | None = None,
status: JobStatus | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict]:
"""
List jobs with optional filtering.
Args:
job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
status: Filter by status
limit: Maximum number of jobs to return
offset: Offset for pagination
Returns:
List of job info dictionaries
"""
...
def register_job_executor(
self,
job_type: str,
executor: Callable[[dict], Awaitable[dict]],
) -> None:
"""
Register a job executor function for a specific job type.
This allows components to register custom job execution logic with the scheduler.
When a job of the registered type is executed, the scheduler will call the
registered executor function.
Args:
job_type: The type of job (e.g., "vector_store_file_batch")
executor: Async function that takes job_data dict and returns result dict
"""
...
async def initialize(self) -> None:
"""
Initialize the scheduler (connect to backend, load persisted jobs).
Note: Does NOT start processing jobs. Call start() after all executors are registered.
"""
...
async def start(self) -> None:
"""
Start processing jobs after all executors are registered.
This resumes any incomplete jobs from storage. Must be called after initialize()
and after all job executors have been registered via register_job_executor().
"""
...
async def shutdown(self) -> None:
"""Gracefully shutdown the scheduler"""
...

View file

@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .scheduler import CelerySchedulerImpl
__all__ = ["CelerySchedulerImpl"]

View file

@ -0,0 +1,83 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from collections.abc import Awaitable, Callable
from ..api import JobStatus, Scheduler
from ..config import CelerySchedulerConfig
class CelerySchedulerImpl(Scheduler):
"""
Celery-based scheduler implementation for distributed multi-worker deployments.
This scheduler uses Celery to distribute jobs across multiple worker processes
or machines. It provides:
- Persistent job queue (via Redis/RabbitMQ broker)
- Multi-worker coordination
- Crash recovery (jobs survive server restarts)
- Distributed task execution
This is suitable for:
- Production deployments
- Multi-worker scenarios
- High availability requirements
"""
def __init__(self, config: CelerySchedulerConfig):
self.config = config
self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
def register_job_executor(
self,
job_type: str,
executor: Callable[[dict], Awaitable[dict]],
) -> None:
"""Register a job executor function for a specific job type."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def initialize(self) -> None:
"""Initialize the Celery scheduler."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def start(self) -> None:
"""Start processing jobs after all executors are registered."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def shutdown(self) -> None:
"""Gracefully shutdown the Celery scheduler."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def schedule_job(
self,
job_type: str,
job_data: dict,
metadata: dict | None = None,
) -> str:
"""Schedule a new job for execution via Celery."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def get_job_info(self, job_id: str) -> dict:
"""Get complete information about a job from Celery result backend."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def cancel_job(self, job_id: str) -> bool:
"""Cancel a pending or running Celery job."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def delete_job(self, job_id: str) -> bool:
"""Delete a completed or cancelled job from Celery result backend."""
raise NotImplementedError("Celery scheduler implementation is not yet available")
async def list_jobs(
self,
job_type: str | None = None,
status: JobStatus | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict]:
"""List jobs from Celery result backend with optional filtering."""
raise NotImplementedError("Celery scheduler implementation is not yet available")

View file

@ -0,0 +1,58 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Annotated, Literal
from pydantic import BaseModel, Field
from llama_stack.core.storage.datatypes import KVStoreReference
class SchedulerConfig(BaseModel):
"""Base class for scheduler configurations."""
type: str
class InlineSchedulerConfig(SchedulerConfig):
"""
Configuration for inline (asyncio-based) scheduler.
This scheduler runs jobs in the same process using asyncio tasks.
Suitable for development and single-worker deployments.
"""
type: Literal["inline"] = "inline"
kvstore: KVStoreReference = Field(
description="KVStore reference for persisting job state",
)
max_concurrent_jobs: int = Field(
default=10,
description="Maximum number of jobs that can run concurrently",
)
class CelerySchedulerConfig(SchedulerConfig):
"""
Configuration for Celery-based distributed scheduler.
This scheduler distributes jobs across multiple worker processes/machines.
Suitable for production and multi-worker deployments.
"""
type: Literal["celery"] = "celery"
broker_url: str = Field(
description="Celery broker URL (e.g., 'redis://localhost:6379/0')",
)
result_backend: str = Field(
description="Celery result backend URL (e.g., 'redis://localhost:6379/1')",
)
SchedulerConfigUnion = Annotated[
InlineSchedulerConfig | CelerySchedulerConfig,
Field(discriminator="type"),
]

View file

@ -0,0 +1,9 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .scheduler import InlineSchedulerImpl
__all__ = ["InlineSchedulerImpl"]

View file

@ -0,0 +1,78 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from collections.abc import Awaitable, Callable
from ..api import JobStatus, Scheduler
from ..config import InlineSchedulerConfig
class InlineSchedulerImpl(Scheduler):
"""
Inline scheduler implementation using asyncio for single-worker deployments.
This scheduler runs jobs in the same process using asyncio tasks. Jobs and their
state are persisted to KVStore for crash recovery.
This is suitable for:
- Development and testing
- Single-worker deployments
- Scenarios where external dependencies (Redis, RabbitMQ) are not available
"""
def __init__(self, config: InlineSchedulerConfig):
self.config = config
async def initialize(self) -> None:
"""Initialize the scheduler and load persisted jobs."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def start(self) -> None:
"""Start processing jobs after all executors are registered."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
def register_job_executor(
self,
job_type: str,
executor: Callable[[dict], Awaitable[dict]],
) -> None:
"""Register a job executor function for a specific job type."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def shutdown(self) -> None:
"""Gracefully shutdown the scheduler."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def schedule_job(
self,
job_type: str,
job_data: dict,
metadata: dict | None = None,
) -> str:
"""Schedule a new job for execution."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def get_job_info(self, job_id: str) -> dict:
"""Get complete information about a job."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def cancel_job(self, job_id: str) -> bool:
"""Cancel a pending or running job."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def delete_job(self, job_id: str) -> bool:
"""Delete a completed or cancelled job."""
raise NotImplementedError("Inline scheduler implementation is not yet available")
async def list_jobs(
self,
job_type: str | None = None,
status: JobStatus | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict]:
"""List jobs with optional filtering."""
raise NotImplementedError("Inline scheduler implementation is not yet available")

View file

@ -0,0 +1,117 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig
from llama_stack.providers.utils.job_scheduler import InlineSchedulerConfig
from llama_stack.providers.utils.kvstore import register_kvstore_backends
async def default_test_executor(job_data: dict) -> dict:
"""Default test executor that simulates work."""
await asyncio.sleep(0.1)
return {
"message": "Test job completed successfully",
"job_data": job_data,
}
@pytest.fixture
def scheduler_config():
"""Create a test scheduler config with temporary SQLite database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_scheduler.db"
backend_name = "kv_scheduler_test"
kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
register_kvstore_backends({backend_name: kvstore_config})
yield InlineSchedulerConfig(
kvstore=KVStoreReference(backend=backend_name, namespace="job_scheduler"),
max_concurrent_jobs=5,
)
async def test_scheduler_api_exists(scheduler_config):
"""Test that scheduler API is properly defined."""
from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
scheduler = InlineSchedulerImpl(scheduler_config)
# Verify all required methods exist
assert hasattr(scheduler, "initialize")
assert hasattr(scheduler, "start")
assert hasattr(scheduler, "shutdown")
assert hasattr(scheduler, "register_job_executor")
assert hasattr(scheduler, "schedule_job")
assert hasattr(scheduler, "get_job_info")
assert hasattr(scheduler, "cancel_job")
assert hasattr(scheduler, "delete_job")
assert hasattr(scheduler, "list_jobs")
async def test_scheduler_not_implemented(scheduler_config):
"""Test that scheduler methods raise NotImplementedError."""
from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
scheduler = InlineSchedulerImpl(scheduler_config)
# Test that all methods raise NotImplementedError
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.initialize()
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.start()
with pytest.raises(NotImplementedError, match="not yet available"):
scheduler.register_job_executor("test_job", default_test_executor)
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.schedule_job("test_job", {})
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.get_job_info("job_id")
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.cancel_job("job_id")
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.delete_job("job_id")
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.list_jobs()
with pytest.raises(NotImplementedError, match="not yet available"):
await scheduler.shutdown()
async def test_two_phase_initialization_pattern(scheduler_config):
"""Test that the two-phase initialization pattern is supported."""
from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
scheduler = InlineSchedulerImpl(scheduler_config)
# Mock the methods to test the pattern
scheduler.initialize = AsyncMock()
scheduler.start = AsyncMock()
scheduler.register_job_executor = MagicMock()
# Phase 1: Initialize (loads jobs, doesn't start them)
await scheduler.initialize()
scheduler.initialize.assert_called_once()
# Register executors after initialization
scheduler.register_job_executor("test_job", default_test_executor)
scheduler.register_job_executor.assert_called_once()
# Phase 2: Start (resumes jobs after executors registered)
await scheduler.start()
scheduler.start.assert_called_once()