diff --git a/llama_stack/providers/utils/job_scheduler/__init__.py b/llama_stack/providers/utils/job_scheduler/__init__.py deleted file mode 100644 index e09d9f45d..000000000 --- a/llama_stack/providers/utils/job_scheduler/__init__.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .api import JobStatus, Scheduler -from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig - - -async def scheduler_impl(config: SchedulerConfig) -> Scheduler: - """ - Factory function to instantiate scheduler implementations. - - Args: - config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) - - Returns: - Scheduler: An initialized scheduler instance - - Raises: - ValueError: If the config type is unknown - """ - impl: Scheduler - if isinstance(config, InlineSchedulerConfig): - from .inline import InlineSchedulerImpl - - impl = InlineSchedulerImpl(config) - elif isinstance(config, CelerySchedulerConfig): - from .celery import CelerySchedulerImpl - - impl = CelerySchedulerImpl(config) - else: - raise ValueError(f"Unknown scheduler config type: {type(config)}") - - await impl.initialize() - return impl - - -__all__ = [ - "JobStatus", - "Scheduler", - "SchedulerConfig", - "InlineSchedulerConfig", - "CelerySchedulerConfig", - "scheduler_impl", -] diff --git a/llama_stack/providers/utils/job_scheduler/api.py b/llama_stack/providers/utils/job_scheduler/api.py deleted file mode 100644 index e46da8fcb..000000000 --- a/llama_stack/providers/utils/job_scheduler/api.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from collections.abc import Awaitable, Callable -from enum import StrEnum -from typing import Protocol - - -class JobStatus(StrEnum): - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - - -class Scheduler(Protocol): - """ - Abstract scheduler protocol for managing async jobs. - - This provides a pluggable backend for job scheduling and execution. - """ - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """ - Schedule a new job for execution. - - Args: - job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection") - job_data: Job-specific data and parameters - metadata: Additional metadata for tracking - - Returns: - job_id: UUID for tracking the job - """ - ... - - async def get_job_info(self, job_id: str) -> dict: - """ - Get complete information about a job. - - Returns: - { - "job_id": str, - "job_type": str, - "status": JobStatus, - "created_at": datetime, - "started_at": datetime | None, - "completed_at": datetime | None, - "progress": float, # 0.0 to 1.0 - "metadata": dict, - "error": str | None, # Error message if status == FAILED - "result": dict | None, # Job result if status == COMPLETED - } - """ - ... - - async def cancel_job(self, job_id: str) -> bool: - """ - Cancel a pending or running job. - - Args: - job_id: Job to cancel - - Returns: - True if job was cancelled, False if not found or already completed - """ - ... 
- - async def delete_job(self, job_id: str) -> bool: - """ - Delete a completed or cancelled job. - - Args: - job_id: Job to delete - - Returns: - True if job was deleted, False if not found - """ - ... - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """ - List jobs with optional filtering. - - Args: - job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection") - status: Filter by status - limit: Maximum number of jobs to return - offset: Offset for pagination - - Returns: - List of job info dictionaries - """ - ... - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """ - Register a job executor function for a specific job type. - - This allows components to register custom job execution logic with the scheduler. - When a job of the registered type is executed, the scheduler will call the - registered executor function. - - Args: - job_type: The type of job (e.g., "vector_store_file_batch") - executor: Async function that takes job_data dict and returns result dict - """ - ... - - async def initialize(self) -> None: - """Initialize the scheduler (connect to backend, etc.)""" - ... - - async def shutdown(self) -> None: - """Gracefully shutdown the scheduler""" - ... diff --git a/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/llama_stack/providers/utils/job_scheduler/celery/__init__.py deleted file mode 100644 index 27d9d01cb..000000000 --- a/llama_stack/providers/utils/job_scheduler/celery/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .scheduler import CelerySchedulerImpl - -__all__ = ["CelerySchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/llama_stack/providers/utils/job_scheduler/celery/scheduler.py deleted file mode 100644 index 0a5b6e220..000000000 --- a/llama_stack/providers/utils/job_scheduler/celery/scheduler.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from collections.abc import Awaitable, Callable - -from ..api import JobStatus, Scheduler -from ..config import CelerySchedulerConfig - - -class CelerySchedulerImpl(Scheduler): - """ - Celery-based scheduler implementation for distributed multi-worker deployments. - - This scheduler uses Celery to distribute jobs across multiple worker processes - or machines. 
It provides: - - Persistent job queue (via Redis/RabbitMQ broker) - - Multi-worker coordination - - Crash recovery (jobs survive server restarts) - - Distributed task execution - - This is suitable for: - - Production deployments - - Multi-worker scenarios - - High availability requirements - """ - - def __init__(self, config: CelerySchedulerConfig): - self.config = config - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - # TODO: Initialize Celery app with broker and result backend - raise NotImplementedError("Celery scheduler not yet implemented") - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def initialize(self) -> None: - """Initialize the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def shutdown(self) -> None: - """Gracefully shutdown the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """Schedule a new job for execution via Celery.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def get_job_info(self, job_id: str) -> dict: - """Get complete information about a job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def cancel_job(self, job_id: str) -> bool: - """Cancel a pending or running Celery job.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def delete_job(self, job_id: str) -> bool: - """Delete a completed or cancelled job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """List jobs from Celery result backend with optional filtering.""" - raise NotImplementedError("Celery scheduler not yet implemented") diff --git a/llama_stack/providers/utils/job_scheduler/config.py b/llama_stack/providers/utils/job_scheduler/config.py deleted file mode 100644 index d8eba89fb..000000000 --- a/llama_stack/providers/utils/job_scheduler/config.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from typing import Annotated, Literal - -from pydantic import BaseModel, Field - -from llama_stack.core.storage.datatypes import KVStoreReference - - -class SchedulerConfig(BaseModel): - """Base class for scheduler configurations.""" - - type: str - - -class InlineSchedulerConfig(SchedulerConfig): - """ - Configuration for inline (asyncio-based) scheduler. - - This scheduler runs jobs in the same process using asyncio tasks. - Suitable for development and single-worker deployments. - """ - - type: Literal["inline"] = "inline" - kvstore: KVStoreReference = Field( - description="KVStore reference for persisting job state", - ) - max_concurrent_jobs: int = Field( - default=10, - description="Maximum number of jobs that can run concurrently", - ) - - -class CelerySchedulerConfig(SchedulerConfig): - """ - Configuration for Celery-based distributed scheduler. 
- - This scheduler distributes jobs across multiple worker processes/machines. - Suitable for production and multi-worker deployments. - """ - - type: Literal["celery"] = "celery" - broker_url: str = Field( - description="Celery broker URL (e.g., 'redis://localhost:6379/0')", - ) - result_backend: str = Field( - description="Celery result backend URL (e.g., 'redis://localhost:6379/1')", - ) - - -SchedulerConfigUnion = Annotated[ - InlineSchedulerConfig | CelerySchedulerConfig, - Field(discriminator="type"), -] diff --git a/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/llama_stack/providers/utils/job_scheduler/inline/__init__.py deleted file mode 100644 index 5be56af5e..000000000 --- a/llama_stack/providers/utils/job_scheduler/inline/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .scheduler import InlineSchedulerImpl - -__all__ = ["InlineSchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py deleted file mode 100644 index b253b8a59..000000000 --- a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -import asyncio -import json -import traceback -import uuid -from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from llama_stack.core.utils.serialize import EnumEncoder -from llama_stack.providers.utils.kvstore import kvstore_impl -from llama_stack.providers.utils.kvstore.api import KVStore - -from ..api import JobStatus, Scheduler -from ..config import InlineSchedulerConfig - -JOB_PREFIX = "job_scheduler:jobs:" - - -class InlineSchedulerImpl(Scheduler): - """ - Inline scheduler implementation using asyncio for single-worker deployments. - - This scheduler runs jobs in the same process using asyncio tasks. Jobs and their - state are persisted to KVStore for crash recovery. 
- - This is suitable for: - - Development and testing - - Single-worker deployments - - Scenarios where external dependencies (Redis, RabbitMQ) are not available - """ - - def __init__(self, config: InlineSchedulerConfig): - self.config = config - self._jobs: dict[str, dict[str, Any]] = {} - self._tasks: dict[str, asyncio.Task] = {} - self._semaphore: asyncio.Semaphore - self._shutdown_event = asyncio.Event() - self._kvstore: KVStore - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - - async def initialize(self) -> None: - """Initialize the scheduler and load persisted jobs.""" - self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) - - # Initialize KVStore - self._kvstore = await kvstore_impl(self.config.kvstore) - - # Load persisted jobs from KVStore - await self._load_jobs_from_storage() - - # Resume incomplete jobs - await self._resume_incomplete_jobs() - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def _load_jobs_from_storage(self) -> None: - """Load all jobs from KVStore into memory.""" - start_key = JOB_PREFIX - end_key = f"{JOB_PREFIX}\xff" - - stored_values = await self._kvstore.values_in_range(start_key, end_key) - - for value in stored_values: - job = json.loads(value) - # Deserialize datetime strings back to datetime objects - job["created_at"] = datetime.fromisoformat(job["created_at"]) - if job.get("started_at"): - job["started_at"] = datetime.fromisoformat(job["started_at"]) - if job.get("completed_at"): - job["completed_at"] = datetime.fromisoformat(job["completed_at"]) - job["status"] = JobStatus(job["status"]) - - self._jobs[job["job_id"]] = job - - async def _resume_incomplete_jobs(self) -> None: - """Resume jobs that were running when server crashed.""" - for job_id, job in self._jobs.items(): - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - # Reset running jobs to pending - if job["status"] == JobStatus.RUNNING: - job["status"] = JobStatus.PENDING - job["started_at"] = None - await self._save_job_to_storage(job) - - # Restart the job - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - async def _save_job_to_storage(self, job: dict[str, Any]) -> None: - """Persist job to KVStore.""" - key = f"{JOB_PREFIX}{job['job_id']}" - await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) - - async def _delete_job_from_storage(self, job_id: str) -> None: - """Delete job from KVStore.""" - key = f"{JOB_PREFIX}{job_id}" - await self._kvstore.delete(key) - - async def shutdown(self) -> None: - """Gracefully shutdown the scheduler.""" - self._shutdown_event.set() - - # Cancel all running tasks - for task in self._tasks.values(): - if not task.done(): - task.cancel() - - # Wait for all tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - self._tasks.clear() - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """Schedule a new job for execution.""" - job_id = str(uuid.uuid4()) - - job_info = { - "job_id": job_id, - "job_type": job_type, - "status": JobStatus.PENDING, - "created_at": datetime.now(UTC), - "started_at": None, - "completed_at": None, - "progress": 0.0, - "metadata": metadata or {}, - "job_data": job_data, - "error": None, - "result": None, - } - - self._jobs[job_id] = job_info 
- - # Persist to KVStore - await self._save_job_to_storage(job_info) - - # Create and schedule the task - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - return job_id - - async def _run_job(self, job_id: str) -> None: - """Run a job asynchronously.""" - job = self._jobs[job_id] - - try: - # Acquire semaphore to limit concurrent jobs - async with self._semaphore: - # Update status to RUNNING - job["status"] = JobStatus.RUNNING - job["started_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - # Execute the job based on job_type - result = await self._execute_job(job) - - # Mark as completed - job["status"] = JobStatus.COMPLETED - job["completed_at"] = datetime.now(UTC) - job["progress"] = 1.0 - job["result"] = result - await self._save_job_to_storage(job) - - except asyncio.CancelledError: - # Job was cancelled - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - raise - - except Exception as e: - # Job failed - job["status"] = JobStatus.FAILED - job["completed_at"] = datetime.now(UTC) - job["error"] = str(e) - job["result"] = {"error_details": traceback.format_exc()} - await self._save_job_to_storage(job) - - finally: - # Clean up task reference - if job_id in self._tasks: - del self._tasks[job_id] - - async def _execute_job(self, job: dict) -> dict: - """ - Execute a job based on its type. - - If a custom executor is registered for the job type, it will be called. - Otherwise, raises an error for unknown job types. - """ - job_type = job["job_type"] - job_data = job["job_data"] - - # Check if a custom executor is registered for this job type - if job_type in self._job_executors: - executor = self._job_executors[job_type] - return await executor(job_data) - - raise ValueError(f"No executor registered for job type: {job_type}") - - async def get_job_info(self, job_id: str) -> dict: - """Get complete information about a job.""" - if job_id not in self._jobs: - raise ValueError(f"Job {job_id} not found") - - job = self._jobs[job_id].copy() - - # Remove internal job_data field from response - job.pop("job_data", None) - - return job - - async def cancel_job(self, job_id: str) -> bool: - """Cancel a pending or running job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only cancel pending or running jobs - if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: - return False - - # Cancel the task if it exists - if job_id in self._tasks: - task = self._tasks[job_id] - if not task.done(): - task.cancel() - - # Update job status - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - return True - - async def delete_job(self, job_id: str) -> bool: - """Delete a completed or cancelled job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only delete completed, failed, or cancelled jobs - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - return False - - # Remove from memory and storage - del self._jobs[job_id] - await self._delete_job_from_storage(job_id) - - return True - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """List jobs with optional filtering.""" - jobs = list(self._jobs.values()) - - # Filter by job_type - if job_type is not None: - jobs = [j for j in jobs if j["job_type"] == 
job_type] - - # Filter by status - if status is not None: - jobs = [j for j in jobs if j["status"] == status] - - # Sort by created_at (newest first) - jobs.sort(key=lambda j: j["created_at"], reverse=True) - - # Apply pagination - jobs = jobs[offset : offset + limit] - - # Convert to return format (remove internal fields) - result = [] - for job in jobs: - job_copy = job.copy() - # Remove internal job_data field - job_copy.pop("job_data", None) - result.append(job_copy) - - return result diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md new file mode 100644 index 000000000..a32248ecc --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/README.md @@ -0,0 +1,237 @@ +# Job Scheduler
+
+## Run Config → Adapter Init Flow
+
+```
+┌─────────────────────────────────────────────────────────┐
+│ 1. Run Config YAML                                      │
+├─────────────────────────────────────────────────────────┤
+│ providers:                                              │
+│   job_scheduler:                                        │
+│     - provider_type: inline::scheduler                  │
+│       config: { kvstore: {...} }                        │
+│   vector_io:                                            │
+│     - provider_type: inline::faiss                      │
+│       config: { persistence: {...} }                    │
+└─────────────────────────────────────────────────────────┘
+                          ↓
+┌─────────────────────────────────────────────────────────┐
+│ 2. Stack.initialize()                                   │
+│    → resolve_impls(run_config, ...)                     │
+│    → instantiate_providers(sorted_by_deps)              │
+└─────────────────────────────────────────────────────────┘
+                          ↓
+┌─────────────────────────────────────────────────────────┐
+│ 3. For each provider (in dependency order):             │
+│                                                         │
+│    A. Job Scheduler (no dependencies)                   │
+│       ├─ get_provider_impl(config, deps={})             │
+│       ├─ scheduler = InlineSchedulerImpl(config)        │
+│       └─ await scheduler.initialize()                   │
+│                                                         │
+│    B. Vector IO (depends on inference, job_scheduler)   │
+│       ├─ deps = {                                       │
+│       │    Api.inference: <inference impl>,             │
+│       │    Api.job_scheduler: <scheduler impl>          │
+│       │  }                                              │
+│       ├─ get_provider_impl(config, deps)                │
+│       │  ├─ adapter = FaissVectorIOAdapter(             │
+│       │  │     config,                                  │
+│       │  │     deps[Api.inference],                     │
+│       │  │     deps.get(Api.files),                     │
+│       │  │     deps.get(Api.job_scheduler) ← Here!      │
+│       │  │  )                                           │
+│       │  └─ await adapter.initialize()                  │
+│       └─ return adapter                                 │
+└─────────────────────────────────────────────────────────┘
+                          ↓
+┌─────────────────────────────────────────────────────────┐
+│ 4. Adapter.__init__(scheduler)                          │
+│    ├─ self.scheduler = scheduler                        │
+│    └─ scheduler.register_job_executor(...)              │
+└─────────────────────────────────────────────────────────┘
+```
+
+**Key Points:**
+- **Dependency resolution**: Providers are instantiated in topological order
+- **Automatic injection**: The framework passes `deps[Api.job_scheduler]`
+- **Registry declares it**: `optional_api_dependencies=[Api.job_scheduler]`
+- **User configures it**: The run YAML selects the inline or Celery scheduler
+
+---
+
+## Server Startup Flow: Two-Phase Initialization
+
+Scheduler initialization is split into two phases:
+- **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them
+- **Phase 2 (`start`)**: Resume jobs after all executors are registered
+
+```
+┌─────────────────────────────────────────────────────────┐
+│ Stack.initialize()                                      │
+├─────────────────────────────────────────────────────────┤
+│ → resolve_impls()                                       │
+│   ├─ Job Scheduler                                      │
+│   │  └─ scheduler.initialize()  ← Phase 1               │
+│   │     • Connect to KVStore                            │
+│   │     • Load jobs into memory                         │
+│   │     • DON'T resume jobs yet ❌                      │
+│   │                                                     │
+│   ├─ Vector IO Provider                                 │
+│   │  ├─ __init__():                                     │
+│   │  │  └─ scheduler.register_job_executor(...) 
│ +│ │ └─ provider.initialize() │ +│ │ │ +│ └─ [All providers initialized, executors ✓] │ +│ │ +│ → Start schedulers │ +│ └─ scheduler.start() ← Phase 2 │ +│ └─ Resume incomplete jobs │ +│ • Executors are registered ✓ │ +│ • Jobs execute successfully ✓ │ +└─────────────────────────────────────────────────────────┘ +``` + +## Implementation + +### 1. Scheduler Protocol (`api.py`) + +```python +async def initialize(self) -> None: + """ + Initialize the scheduler (connect to backend, load persisted jobs). + + Note: Does NOT start processing jobs. Call start() after all + executors are registered. + """ + ... + + +async def start(self) -> None: + """ + Start processing jobs after all executors are registered. + + This resumes any incomplete jobs from storage. Must be called + after initialize() and after all job executors have been + registered via register_job_executor(). + """ + ... +``` + +### 2. Scheduler Implementation + +```python +class InlineSchedulerImpl(Scheduler): + async def initialize(self) -> None: + self._kvstore = await kvstore_impl(self.config.kvstore) + await self._load_jobs_from_storage() + # DON'T call _resume_incomplete_jobs() here! + + async def start(self) -> None: + """Call AFTER all executors are registered.""" + await self._resume_incomplete_jobs() +``` + +### 3. Stack Integration (`stack.py`) + +```python +async def initialize(self): + # ... existing code ... + impls = await resolve_impls(...) # All providers initialized + + # NEW: Start schedulers after all executors registered + if Api.job_scheduler in impls: + await impls[Api.job_scheduler].start() + + self.impls = impls +``` + +### 4. Provider Integration + +```python +class VectorIOAdapter: + def __init__(self, config, inference_api, files_api, scheduler): + # Register executor during __init__ (before scheduler.start()) + scheduler.register_job_executor( + "vector_store_file_batch", self._process_file_batch + ) +``` + +## Behavior + +### Case 1: Clean Start (No Jobs) +```python +await scheduler.start() +# _resume_incomplete_jobs() finds no jobs +# Loop completes, nothing happens ✓ +``` + +### Case 2: After Crash (Jobs Exist) +```python +await scheduler.start() +# _resume_incomplete_jobs() finds PENDING/RUNNING jobs +# Executors are registered ✓ +# Creates asyncio tasks successfully ✓ +# Jobs run in background ✓ +``` + + +## Usage: OpenAI Vector Store Mixin + +### Current Pattern (Direct asyncio tasks) +```python +class OpenAIVectorStoreMixin: + def __init__(self, files_api, kvstore): + self._file_batch_tasks: dict[str, asyncio.Task] = {} + + async def openai_create_vector_store_file_batch(self, params): + # Create background task directly + task = asyncio.create_task(self._process_file_batch_async(params)) + self._file_batch_tasks[batch_id] = task +``` + +### New Pattern (Job Scheduler) +```python +class OpenAIVectorStoreMixin: + def __init__(self, files_api, kvstore, scheduler): + self.scheduler = scheduler + # Register executor in __init__ (before scheduler.start()) + self.scheduler.register_job_executor( + "vector_store_file_batch", self._execute_file_batch + ) + + async def openai_create_vector_store_file_batch(self, params): + # Schedule job through scheduler + job_id = await self.scheduler.schedule_job( + job_type="vector_store_file_batch", + job_data={ + "batch_id": batch_id, + "vector_store_id": vector_store_id, + "file_ids": params.file_ids, + "attributes": params.attributes, + "chunking_strategy": chunking_strategy.model_dump(), + }, + metadata={"batch_id": batch_id}, + ) + return batch_object + + 
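# Hypothetical helper, not part of the original mixin sketch: callers can
+    # poll a scheduled batch through the scheduler, assuming the job_id
+    # returned by schedule_job() was stored alongside the batch object.
+    async def get_file_batch_job_info(self, job_id: str) -> dict:
+        return await self.scheduler.get_job_info(job_id)
+
+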
async def _execute_file_batch(self, job_data: dict) -> dict: + """Executor called by scheduler.""" + batch_id = job_data["batch_id"] + vector_store_id = job_data["vector_store_id"] + file_ids = job_data["file_ids"] + + # Process files (existing logic from _process_file_batch_async) + # await self._process_files_with_concurrency(...) + + return {"status": "completed", "files_processed": len(file_ids)} +``` + +### Benefits +- ✅ **Crash recovery**: Jobs survive server restarts +- ✅ **Persistence**: Job state stored in KVStore +- ✅ **Monitoring**: Query job status via `get_job_info(job_id)` +- ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)` +- ✅ **Clean separation**: Job scheduling decoupled from execution diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py index e46da8fcb..023e77430 100644 --- a/src/llama_stack/providers/utils/job_scheduler/api.py +++ b/src/llama_stack/providers/utils/job_scheduler/api.py @@ -127,7 +127,20 @@ class Scheduler(Protocol): ... async def initialize(self) -> None: - """Initialize the scheduler (connect to backend, etc.)""" + """ + Initialize the scheduler (connect to backend, load persisted jobs). + + Note: Does NOT start processing jobs. Call start() after all executors are registered. + """ + ... + + async def start(self) -> None: + """ + Start processing jobs after all executors are registered. + + This resumes any incomplete jobs from storage. Must be called after initialize() + and after all job executors have been registered via register_job_executor(). + """ ... async def shutdown(self) -> None: diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py index 0a5b6e220..6dcaa9e51 100644 --- a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py +++ b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py @@ -30,8 +30,6 @@ class CelerySchedulerImpl(Scheduler): def __init__(self, config: CelerySchedulerConfig): self.config = config self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - # TODO: Initialize Celery app with broker and result backend - raise NotImplementedError("Celery scheduler not yet implemented") def register_job_executor( self, @@ -39,15 +37,19 @@ class CelerySchedulerImpl(Scheduler): executor: Callable[[dict], Awaitable[dict]], ) -> None: """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor + raise NotImplementedError("Celery scheduler implementation is not yet available") async def initialize(self) -> None: """Initialize the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") + + async def start(self) -> None: + """Start processing jobs after all executors are registered.""" + raise NotImplementedError("Celery scheduler implementation is not yet available") async def shutdown(self) -> None: """Gracefully shutdown the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def schedule_job( self, @@ -56,19 +58,19 @@ class CelerySchedulerImpl(Scheduler): metadata: dict | None = None, ) -> str: """Schedule a new job for execution via Celery.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery 
scheduler implementation is not yet available") async def get_job_info(self, job_id: str) -> dict: """Get complete information about a job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def cancel_job(self, job_id: str) -> bool: """Cancel a pending or running Celery job.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def delete_job(self, job_id: str) -> bool: """Delete a completed or cancelled job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def list_jobs( self, @@ -78,4 +80,4 @@ class CelerySchedulerImpl(Scheduler): offset: int = 0, ) -> list[dict]: """List jobs from Celery result backend with optional filtering.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py index 68d201604..3d8c5e744 100644 --- a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -4,23 +4,11 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. -import asyncio -import json -import traceback -import uuid from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from llama_stack.core.utils.serialize import EnumEncoder -from llama_stack.providers.utils.kvstore import kvstore_impl -from llama_stack.providers.utils.kvstore.api import KVStore from ..api import JobStatus, Scheduler from ..config import InlineSchedulerConfig -JOB_PREFIX = "job_scheduler:jobs:" - class InlineSchedulerImpl(Scheduler): """ @@ -37,25 +25,14 @@ class InlineSchedulerImpl(Scheduler): def __init__(self, config: InlineSchedulerConfig): self.config = config - self._jobs: dict[str, dict[str, Any]] = {} - self._tasks: dict[str, asyncio.Task] = {} - self._semaphore: asyncio.Semaphore - self._shutdown_event = asyncio.Event() - self._kvstore: KVStore - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} async def initialize(self) -> None: """Initialize the scheduler and load persisted jobs.""" - self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) + raise NotImplementedError("Inline scheduler implementation is not yet available") - # Initialize KVStore - self._kvstore = await kvstore_impl(self.config.kvstore) - - # Load persisted jobs from KVStore - await self._load_jobs_from_storage() - - # Resume incomplete jobs - await self._resume_incomplete_jobs() + async def start(self) -> None: + """Start processing jobs after all executors are registered.""" + raise NotImplementedError("Inline scheduler implementation is not yet available") def register_job_executor( self, @@ -63,64 +40,11 @@ class InlineSchedulerImpl(Scheduler): executor: Callable[[dict], Awaitable[dict]], ) -> None: """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def _load_jobs_from_storage(self) -> None: - """Load all jobs from KVStore into 
memory.""" - start_key = JOB_PREFIX - end_key = f"{JOB_PREFIX}\xff" - - stored_values = await self._kvstore.values_in_range(start_key, end_key) - - for value in stored_values: - job = json.loads(value) - job["created_at"] = datetime.fromisoformat(job["created_at"]) - if job.get("started_at"): - job["started_at"] = datetime.fromisoformat(job["started_at"]) - if job.get("completed_at"): - job["completed_at"] = datetime.fromisoformat(job["completed_at"]) - job["status"] = JobStatus(job["status"]) - - self._jobs[job["job_id"]] = job - - async def _resume_incomplete_jobs(self) -> None: - """Resume jobs that were running when server crashed.""" - for job_id, job in self._jobs.items(): - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - # Reset running jobs to pending - if job["status"] == JobStatus.RUNNING: - job["status"] = JobStatus.PENDING - job["started_at"] = None - await self._save_job_to_storage(job) - - # Restart the job - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - async def _save_job_to_storage(self, job: dict[str, Any]) -> None: - """Persist job to KVStore.""" - key = f"{JOB_PREFIX}{job['job_id']}" - await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) - - async def _delete_job_from_storage(self, job_id: str) -> None: - """Delete job from KVStore.""" - key = f"{JOB_PREFIX}{job_id}" - await self._kvstore.delete(key) + raise NotImplementedError("Inline scheduler implementation is not yet available") async def shutdown(self) -> None: """Gracefully shutdown the scheduler.""" - self._shutdown_event.set() - - # Cancel all running tasks - for task in self._tasks.values(): - if not task.done(): - task.cancel() - - # Wait for all tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - self._tasks.clear() + raise NotImplementedError("Inline scheduler implementation is not yet available") async def schedule_job( self, @@ -129,143 +53,19 @@ class InlineSchedulerImpl(Scheduler): metadata: dict | None = None, ) -> str: """Schedule a new job for execution.""" - job_id = str(uuid.uuid4()) - - job_info = { - "job_id": job_id, - "job_type": job_type, - "status": JobStatus.PENDING, - "created_at": datetime.now(UTC), - "started_at": None, - "completed_at": None, - "progress": 0.0, - "metadata": metadata or {}, - "job_data": job_data, - "error": None, - "result": None, - } - - self._jobs[job_id] = job_info - - # Persist to KVStore - await self._save_job_to_storage(job_info) - - # Create and schedule the task - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - return job_id - - async def _run_job(self, job_id: str) -> None: - """Run a job asynchronously.""" - job = self._jobs[job_id] - - try: - # Acquire semaphore to limit concurrent jobs - async with self._semaphore: - # Update status to RUNNING - job["status"] = JobStatus.RUNNING - job["started_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - # Execute the job based on job_type - result = await self._execute_job(job) - - # Mark as completed - job["status"] = JobStatus.COMPLETED - job["completed_at"] = datetime.now(UTC) - job["progress"] = 1.0 - job["result"] = result - await self._save_job_to_storage(job) - - except asyncio.CancelledError: - # Job was cancelled - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - raise - - except Exception as e: - # Job failed - job["status"] = JobStatus.FAILED - job["completed_at"] = 
datetime.now(UTC) - job["error"] = str(e) - job["result"] = {"error_details": traceback.format_exc()} - await self._save_job_to_storage(job) - - finally: - # Clean up task reference - if job_id in self._tasks: - del self._tasks[job_id] - - async def _execute_job(self, job: dict) -> dict: - """ - Execute a job based on its type. - - If a custom executor is registered for the job type, it will be called. - Otherwise, raises an error for unknown job types. - """ - job_type = job["job_type"] - job_data = job["job_data"] - - if job_type in self._job_executors: - executor = self._job_executors[job_type] - return await executor(job_data) - - raise ValueError(f"No executor registered for job type: {job_type}") + raise NotImplementedError("Inline scheduler implementation is not yet available") async def get_job_info(self, job_id: str) -> dict: """Get complete information about a job.""" - if job_id not in self._jobs: - raise ValueError(f"Job {job_id} not found") - - job = self._jobs[job_id].copy() - - # Remove internal job_data field from response - job.pop("job_data", None) - - return job + raise NotImplementedError("Inline scheduler implementation is not yet available") async def cancel_job(self, job_id: str) -> bool: """Cancel a pending or running job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only cancel pending or running jobs - if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: - return False - - # Cancel the task if it exists - if job_id in self._tasks: - task = self._tasks[job_id] - if not task.done(): - task.cancel() - - # Update job status - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - return True + raise NotImplementedError("Inline scheduler implementation is not yet available") async def delete_job(self, job_id: str) -> bool: """Delete a completed or cancelled job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only delete completed, failed, or cancelled jobs - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - return False - - # Remove from memory and storage - del self._jobs[job_id] - await self._delete_job_from_storage(job_id) - - return True + raise NotImplementedError("Inline scheduler implementation is not yet available") async def list_jobs( self, @@ -275,28 +75,4 @@ class InlineSchedulerImpl(Scheduler): offset: int = 0, ) -> list[dict]: """List jobs with optional filtering.""" - jobs = list(self._jobs.values()) - - # Filter by job_type - if job_type is not None: - jobs = [j for j in jobs if j["job_type"] == job_type] - - # Filter by status - if status is not None: - jobs = [j for j in jobs if j["status"] == status] - - # Sort by created_at (newest first) - jobs.sort(key=lambda j: j["created_at"], reverse=True) - - # Apply pagination - jobs = jobs[offset : offset + limit] - - # Convert to return format (remove internal fields) - result = [] - for job in jobs: - job_copy = job.copy() - # Remove internal job_data field - job_copy.pop("job_data", None) - result.append(job_copy) - - return result + raise NotImplementedError("Inline scheduler implementation is not yet available") diff --git a/tests/unit/providers/utils/test_job_scheduler.py b/tests/unit/providers/utils/test_job_scheduler.py index 3283d8514..a6833f8df 100644 --- a/tests/unit/providers/utils/test_job_scheduler.py +++ b/tests/unit/providers/utils/test_job_scheduler.py @@ -7,13 +7,13 @@ import asyncio import tempfile 
from pathlib import Path +from unittest.mock import AsyncMock, MagicMock import pytest from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig from llama_stack.providers.utils.job_scheduler import ( InlineSchedulerConfig, - JobStatus, scheduler_impl, ) from llama_stack.providers.utils.kvstore import register_kvstore_backends @@ -43,238 +43,72 @@ def scheduler_config(): ) -async def test_inline_scheduler_basic(scheduler_config): - """Test basic scheduler functionality.""" +async def test_scheduler_api_exists(scheduler_config): + """Test that scheduler API is properly defined.""" scheduler = await scheduler_impl(scheduler_config) - try: - # Register default executor + # Verify all required methods exist + assert hasattr(scheduler, "initialize") + assert hasattr(scheduler, "start") + assert hasattr(scheduler, "shutdown") + assert hasattr(scheduler, "register_job_executor") + assert hasattr(scheduler, "schedule_job") + assert hasattr(scheduler, "get_job_info") + assert hasattr(scheduler, "cancel_job") + assert hasattr(scheduler, "delete_job") + assert hasattr(scheduler, "list_jobs") + + +async def test_scheduler_not_implemented(scheduler_config): + """Test that scheduler methods raise NotImplementedError.""" + scheduler = await scheduler_impl(scheduler_config) + + # Test that all methods raise NotImplementedError + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.initialize() + + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.start() + + with pytest.raises(NotImplementedError, match="not yet available"): scheduler.register_job_executor("test_job", default_test_executor) - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="test_job", - job_data={"test": "data"}, - metadata={"user": "test_user"}, - ) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.schedule_job("test_job", {}) - assert job_id is not None - assert isinstance(job_id, str) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.get_job_info("job_id") - # Wait a bit for the job to complete - await asyncio.sleep(0.2) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.cancel_job("job_id") - # Get job info - job_info = await scheduler.get_job_info(job_id) - assert job_info["job_id"] == job_id - assert job_info["job_type"] == "test_job" - assert job_info["status"] == JobStatus.COMPLETED.value - assert job_info["metadata"]["user"] == "test_user" - assert job_info["progress"] == 1.0 - assert job_info["result"] is not None + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.delete_job("job_id") - finally: + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.list_jobs() + + with pytest.raises(NotImplementedError, match="not yet available"): await scheduler.shutdown() -async def test_inline_scheduler_list_jobs(scheduler_config): - """Test listing jobs with filters.""" +async def test_two_phase_initialization_pattern(scheduler_config): + """Test that the two-phase initialization pattern is supported.""" scheduler = await scheduler_impl(scheduler_config) - try: - # Register executors for different job types - scheduler.register_job_executor("batch_processing", default_test_executor) - scheduler.register_job_executor("log_aggregation", default_test_executor) + # Mock the methods to test the pattern + scheduler.initialize = AsyncMock() + 
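# start() is mocked separately so each startup phase can be asserted on its own.
+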
scheduler.start = AsyncMock() + scheduler.register_job_executor = MagicMock() - # Schedule multiple jobs - await scheduler.schedule_job( - job_type="batch_processing", - job_data={"batch": 1}, - ) - await scheduler.schedule_job( - job_type="log_aggregation", - job_data={"logs": []}, - ) - await scheduler.schedule_job( - job_type="batch_processing", - job_data={"batch": 2}, - ) + # Phase 1: Initialize (loads jobs, doesn't start them) + await scheduler.initialize() + scheduler.initialize.assert_called_once() - # Wait for jobs to complete - await asyncio.sleep(0.3) + # Register executors after initialization + scheduler.register_job_executor("test_job", default_test_executor) + scheduler.register_job_executor.assert_called_once() - # List all jobs - all_jobs = await scheduler.list_jobs() - assert len(all_jobs) == 3 - - # List jobs by type - batch_jobs = await scheduler.list_jobs(job_type="batch_processing") - assert len(batch_jobs) == 2 - - log_jobs = await scheduler.list_jobs(job_type="log_aggregation") - assert len(log_jobs) == 1 - - # List jobs by status - completed_jobs = await scheduler.list_jobs(status=JobStatus.COMPLETED) - assert len(completed_jobs) == 3 - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_cancel_job(scheduler_config): - """Test cancelling a job.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("long_running_job", default_test_executor) - - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="long_running_job", - job_data={"duration": 10}, - ) - - # Try to cancel immediately - await scheduler.cancel_job(job_id) - - # Wait a bit - await asyncio.sleep(0.1) - - # Check job status - job_info = await scheduler.get_job_info(job_id) - # Job might be CANCELLED or COMPLETED depending on timing - assert job_info["status"] in [JobStatus.CANCELLED.value, JobStatus.COMPLETED.value] - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_delete_job(scheduler_config): - """Test deleting a completed job.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="test_job", - job_data={"test": "data"}, - ) - - # Wait for completion - await asyncio.sleep(0.2) - - # Verify job exists - job_info = await scheduler.get_job_info(job_id) - assert job_info["status"] == JobStatus.COMPLETED.value - - # Delete the job - success = await scheduler.delete_job(job_id) - assert success is True - - # Verify job is deleted - with pytest.raises(ValueError, match="not found"): - await scheduler.get_job_info(job_id) - - # Deleting again should return False - success = await scheduler.delete_job(job_id) - assert success is False - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_concurrent_jobs(scheduler_config): - """Test running multiple jobs concurrently.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule multiple jobs - job_ids = [] - for i in range(5): - job_id = await scheduler.schedule_job( - job_type="test_job", - job_data={"index": i}, - ) - job_ids.append(job_id) - - # Wait for all jobs to complete - await asyncio.sleep(0.5) - - # Verify all jobs completed - for job_id in job_ids: - job_info = await scheduler.get_job_info(job_id) - assert 
job_info["status"] == JobStatus.COMPLETED.value - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_pagination(scheduler_config): - """Test job listing with pagination.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule 10 jobs - for i in range(10): - await scheduler.schedule_job( - job_type="test_job", - job_data={"index": i}, - ) - - # Wait for completion - await asyncio.sleep(0.5) - - # Test pagination - page1 = await scheduler.list_jobs(limit=5, offset=0) - assert len(page1) == 5 - - page2 = await scheduler.list_jobs(limit=5, offset=5) - assert len(page2) == 5 - - page3 = await scheduler.list_jobs(limit=5, offset=10) - assert len(page3) == 0 - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_register_job_executor(scheduler_config): - """Test registering a job executor.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Define a custom job executor - async def custom_executor(job_data: dict) -> dict: - return {"custom": "result", "input": job_data} - - # Register the executor - scheduler.register_job_executor("custom_job_type", custom_executor) - - # Schedule a job with the custom type - job_id = await scheduler.schedule_job( - job_type="custom_job_type", - job_data={"test": "data"}, - ) - - # Wait for job to complete - await asyncio.sleep(0.2) - - # Verify the custom executor was called - job_info = await scheduler.get_job_info(job_id) - assert job_info["status"] == JobStatus.COMPLETED.value - assert job_info["result"]["custom"] == "result" - assert job_info["result"]["input"]["test"] == "data" - - finally: - await scheduler.shutdown() + # Phase 2: Start (resumes jobs after executors registered) + await scheduler.start() + scheduler.start.assert_called_once()
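+
+    # Illustrative extra checks (AsyncMock also records awaits): each phase
+    # of the initialize-then-start contract ran exactly once.
+    assert scheduler.initialize.await_count == 1
+    assert scheduler.start.await_count == 1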