diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md
new file mode 100644
index 000000000..7b80a2c47
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/README.md
@@ -0,0 +1,269 @@
+# Job Scheduler
+
+## Run Config → Adapter Init Flow
+
+```
+┌─────────────────────────────────────────────────────────┐
+│ 1. Run Config YAML                                      │
+├─────────────────────────────────────────────────────────┤
+│ providers:                                              │
+│   job_scheduler:                                        │
+│     - provider_type: inline::scheduler                  │
+│       config: { kvstore: {...} }                        │
+│   vector_io:                                            │
+│     - provider_type: inline::faiss                      │
+│       config: { persistence: {...} }                    │
+└─────────────────────────────────────────────────────────┘
+                            ↓
+┌─────────────────────────────────────────────────────────┐
+│ 2. Stack.initialize()                                   │
+│    → resolve_impls(run_config, ...)                     │
+│    → instantiate_providers(sorted_by_deps)              │
+└─────────────────────────────────────────────────────────┘
+                            ↓
+┌─────────────────────────────────────────────────────────┐
+│ 3. For each provider (in dependency order):             │
+│                                                         │
+│    A. Job Scheduler (no dependencies)                   │
+│       ├─ get_provider_impl(config, deps={})             │
+│       ├─ scheduler = InlineSchedulerImpl(config)        │
+│       └─ await scheduler.initialize()                   │
+│                                                         │
+│    B. Vector IO (depends on inference, job_scheduler)   │
+│       ├─ deps = {                                       │
+│       │     Api.inference: <inference impl>,            │
+│       │     Api.job_scheduler: <scheduler impl>         │
+│       │  }                                              │
+│       ├─ get_provider_impl(config, deps)                │
+│       │  ├─ adapter = FaissVectorIOAdapter(             │
+│       │  │      config,                                 │
+│       │  │      deps[Api.inference],                    │
+│       │  │      deps.get(Api.files),                    │
+│       │  │      deps.get(Api.job_scheduler) ← Here!     │
+│       │  │  )                                           │
+│       │  └─ await adapter.initialize()                  │
+│       └─ return adapter                                 │
+└─────────────────────────────────────────────────────────┘
+                            ↓
+┌─────────────────────────────────────────────────────────┐
+│ 4. Adapter.__init__(scheduler)                          │
+│    ├─ self.scheduler = scheduler                        │
+│    └─ scheduler.register_job_executor(...)              │
+└─────────────────────────────────────────────────────────┘
+```
+
+**Key Points:**
+- **Dependency resolution**: Providers instantiated in topological order
+- **Automatic injection**: Framework passes `deps[Api.job_scheduler]`
+- **Registry declares it**: `optional_api_dependencies=[Api.job_scheduler]` (see the registry sketch at the end of this README)
+- **User configures it**: Run YAML specifies inline vs celery
+
+---
+
+## Server Startup Flow
+
+### Two-Phase Initialization
+
+Scheduler initialization is split into two phases:
+- **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them
+- **Phase 2 (`start`)**: Resume jobs after all executors are registered
+
+```
+┌─────────────────────────────────────────────────────────┐
+│ Stack.initialize()                                      │
+├─────────────────────────────────────────────────────────┤
+│ → resolve_impls()                                       │
+│   ├─ Job Scheduler                                      │
+│   │    └─ scheduler.initialize()  ← Phase 1             │
+│   │       • Connect to KVStore                          │
+│   │       • Load jobs into memory                       │
+│   │       • DON'T resume jobs yet ❌                    │
+│   │                                                     │
+│   ├─ Vector IO Provider                                 │
+│   │    ├─ __init__():                                   │
+│   │    │    └─ scheduler.register_job_executor(...)     │
+│   │    └─ provider.initialize()                         │
+│   │                                                     │
+│   └─ [All providers initialized, executors ✓]           │
+│                                                         │
+│ → Start schedulers                                      │
+│   └─ scheduler.start()  ← Phase 2                       │
+│      └─ Resume incomplete jobs                          │
+│         • Executors are registered ✓                    │
+│         • Jobs execute successfully ✓                   │
+└─────────────────────────────────────────────────────────┘
+```
+
+## Implementation
+
+### 1. Scheduler Protocol (`api.py`)
+
+```python
+async def initialize(self) -> None:
+    """
+    Initialize the scheduler (connect to backend, load persisted jobs).
+
+    Note: Does NOT start processing jobs. Call start() after all
+    executors are registered.
+    """
+    ...
+
+
+async def start(self) -> None:
+    """
+    Start processing jobs after all executors are registered.
+
+    This resumes any incomplete jobs from storage. Must be called
+    after initialize() and after all job executors have been
+    registered via register_job_executor().
+    """
+    ...
+```
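+
+For illustration, here is a minimal sketch of how a caller drives this protocol
+end to end (the `example_job` type and the pre-registered executor are
+hypothetical, not part of this change):
+
+```python
+async def run_example(scheduler) -> None:
+    # Assumes initialize()/start() have already run and an "example_job"
+    # executor was registered via register_job_executor().
+    job_id = await scheduler.schedule_job(
+        job_type="example_job",
+        job_data={"input": "hello"},
+        metadata={"requested_by": "readme-example"},
+    )
+    info = await scheduler.get_job_info(job_id)
+    print(info["status"], info["progress"])
+```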
+
+### 2. Scheduler Implementation
+
+```python
+class InlineSchedulerImpl(Scheduler):
+    async def initialize(self) -> None:
+        self._kvstore = await kvstore_impl(self.config.kvstore)
+        await self._load_jobs_from_storage()
+        # DON'T call _resume_incomplete_jobs() here!
+
+    async def start(self) -> None:
+        """Call AFTER all executors are registered."""
+        await self._resume_incomplete_jobs()
+```
+
+### 3. Stack Integration (`stack.py`)
+
+```python
+async def initialize(self):
+    # ... existing code ...
+    impls = await resolve_impls(...)  # All providers initialized
+
+    # NEW: Start schedulers after all executors registered
+    if Api.job_scheduler in impls:
+        await impls[Api.job_scheduler].start()
+
+    self.impls = impls
+```
+
+### 4. Provider Integration
+
+```python
+class VectorIOAdapter:
+    def __init__(self, config, inference_api, files_api, scheduler):
+        # Register executor during __init__ (before scheduler.start())
+        scheduler.register_job_executor(
+            "vector_store_file_batch", self._process_file_batch
+        )
+```
+
+## Behavior
+
+### Case 1: Clean Start (No Jobs)
+```python
+await scheduler.start()
+# _resume_incomplete_jobs() finds no jobs
+# Loop completes, nothing happens ✓
+```
+
+### Case 2: After Crash (Jobs Exist)
+```python
+await scheduler.start()
+# _resume_incomplete_jobs() finds incomplete jobs
+# (e.g., VALIDATING / IN_PROGRESS / FINALIZING)
+# Executors are registered ✓
+# Creates asyncio tasks successfully ✓
+# Jobs run in background ✓
+```
+
+
+## Usage: OpenAI Vector Store Mixin
+
+### Current Pattern (Direct asyncio tasks)
+```python
+class OpenAIVectorStoreMixin:
+    def __init__(self, files_api, kvstore):
+        self._file_batch_tasks: dict[str, asyncio.Task] = {}
+
+    async def openai_create_vector_store_file_batch(self, params):
+        # Create background task directly
+        task = asyncio.create_task(self._process_file_batch_async(params))
+        self._file_batch_tasks[batch_id] = task
+```
+
+### New Pattern (Job Scheduler)
+```python
+class OpenAIVectorStoreMixin:
+    def __init__(self, files_api, kvstore, scheduler):
+        self.scheduler = scheduler
+        # Register executor in __init__ (before scheduler.start())
+        self.scheduler.register_job_executor(
+            "vector_store_file_batch", self._execute_file_batch
+        )
+
+    async def openai_create_vector_store_file_batch(self, params):
+        # Schedule the job through the scheduler. (batch_id, vector_store_id,
+        # and chunking_strategy are derived from params; derivation elided here.)
+        job_id = await self.scheduler.schedule_job(
+            job_type="vector_store_file_batch",
+            job_data={
+                "batch_id": batch_id,
+                "vector_store_id": vector_store_id,
+                "file_ids": params.file_ids,
+                "attributes": params.attributes,
+                "chunking_strategy": chunking_strategy.model_dump(),
+            },
+            metadata={"batch_id": batch_id},
+        )
+        return batch_object  # constructed by existing code (elided here)
+
+    async def _execute_file_batch(self, job_data: dict) -> dict:
+        """Executor called by scheduler."""
+        batch_id = job_data["batch_id"]
+        vector_store_id = job_data["vector_store_id"]
+        file_ids = job_data["file_ids"]
+
+        # Process files (existing logic from _process_file_batch_async)
+        # await self._process_files_with_concurrency(...)
+
+        return {"status": "completed", "files_processed": len(file_ids)}
+```
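+
+With this pattern, batch progress can be observed through the scheduler instead
+of ad-hoc task bookkeeping. A minimal polling helper (illustrative, not part of
+the mixin) built only on `get_job_info()`:
+
+```python
+import asyncio
+
+
+async def wait_for_job(scheduler, job_id: str, poll_seconds: float = 1.0) -> dict:
+    """Poll a job until it reaches a terminal state."""
+    terminal = {"completed", "failed", "cancelled", "expired"}
+    while True:
+        info = await scheduler.get_job_info(job_id)
+        if str(info["status"]) in terminal:  # JobStatus is a StrEnum
+            return info
+        await asyncio.sleep(poll_seconds)
+```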
+
+### Benefits
+- ✅ **Crash recovery**: Jobs survive server restarts
+- ✅ **Persistence**: Job state stored in KVStore
+- ✅ **Monitoring**: Query job status via `get_job_info(job_id)`
+- ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)`
+- ✅ **Clean separation**: Job scheduling decoupled from execution
+
+## Single Worker vs Multi Worker
+
+### ✅ Single Worker (Inline Scheduler - Not Implemented Yet)
+
+```yaml
+providers:
+  job_scheduler:
+    - provider_type: inline::scheduler
+      config:
+        kvstore: { ... }
+        max_concurrent_jobs: 10
+```
+
+Works because:
+- Jobs run in the same process via `asyncio.create_task()`
+- In-memory `_jobs` dict is shared within the process
+- Crash recovery works (jobs persist to KVStore)
+
+---
+
+### ✅ Multi Worker (Celery Scheduler - Not Implemented Yet)
+
+```yaml
+providers:
+  job_scheduler:
+    - provider_type: celery::scheduler
+      config:
+        broker_url: redis://localhost:6379/0
+        result_backend: redis://localhost:6379/1
+```
+
+Works because:
+- Shared message broker (Redis/RabbitMQ)
+- Celery handles distributed task queue
+- Workers coordinate via broker
+- Any worker can execute any job
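+
+## Registry Declaration (Sketch)
+
+As noted in Key Points above, a provider opts in through its registry entry.
+Roughly (abbreviated sketch; only `optional_api_dependencies` is the
+load-bearing field here, the rest follows the existing registry entries):
+
+```python
+InlineProviderSpec(
+    api=Api.vector_io,
+    provider_type="inline::faiss",
+    # module / config_class / pip_packages as in the existing registry entry
+    api_dependencies=[Api.inference],
+    optional_api_dependencies=[Api.files, Api.job_scheduler],
+)
+```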
diff --git a/src/llama_stack/providers/utils/job_scheduler/__init__.py b/src/llama_stack/providers/utils/job_scheduler/__init__.py
new file mode 100644
index 000000000..f86019f48
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/__init__.py
@@ -0,0 +1,16 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .api import JobStatus, Scheduler
+from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig
+
+__all__ = [
+    "JobStatus",
+    "Scheduler",
+    "SchedulerConfig",
+    "InlineSchedulerConfig",
+    "CelerySchedulerConfig",
+]
diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py
new file mode 100644
index 000000000..5b3e289e1
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/api.py
@@ -0,0 +1,151 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from collections.abc import Awaitable, Callable
+from enum import StrEnum
+from typing import Protocol
+
+
+class JobStatus(StrEnum):
+    VALIDATING = "validating"
+    FAILED = "failed"
+    IN_PROGRESS = "in_progress"
+    FINALIZING = "finalizing"
+    COMPLETED = "completed"
+    EXPIRED = "expired"
+    CANCELLING = "cancelling"
+    CANCELLED = "cancelled"
+
+
+class Scheduler(Protocol):
+    """
+    Abstract scheduler protocol for managing async jobs.
+
+    This provides a pluggable backend for job scheduling and execution.
+    """
+
+    async def schedule_job(
+        self,
+        job_type: str,
+        job_data: dict,
+        metadata: dict | None = None,
+    ) -> str:
+        """
+        Schedule a new job for execution.
+
+        Args:
+            job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection")
+            job_data: Job-specific data and parameters
+            metadata: Additional metadata for tracking
+
+        Returns:
+            job_id: UUID for tracking the job
+        """
+        ...
+
+    async def get_job_info(self, job_id: str) -> dict:
+        """
+        Get complete information about a job.
+
+        Returns:
+            {
+                "job_id": str,
+                "job_type": str,
+                "status": JobStatus,
+                "created_at": datetime,
+                "started_at": datetime | None,
+                "completed_at": datetime | None,
+                "progress": float,  # 0.0 to 1.0
+                "metadata": dict,
+                "error": str | None,  # Error message if status == FAILED
+                "result": dict | None,  # Job result if status == COMPLETED
+            }
+        """
+        ...
+
+    async def cancel_job(self, job_id: str) -> bool:
+        """
+        Cancel a pending or running job.
+
+        Args:
+            job_id: Job to cancel
+
+        Returns:
+            True if job was cancelled, False if not found or already completed
+        """
+        ...
+
+    async def delete_job(self, job_id: str) -> bool:
+        """
+        Delete a completed or cancelled job.
+
+        Args:
+            job_id: Job to delete
+
+        Returns:
+            True if job was deleted, False if not found
+        """
+        ...
+
+    async def list_jobs(
+        self,
+        job_type: str | None = None,
+        status: JobStatus | None = None,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> list[dict]:
+        """
+        List jobs with optional filtering.
+
+        Args:
+            job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection")
+            status: Filter by status
+            limit: Maximum number of jobs to return
+            offset: Offset for pagination
+
+        Returns:
+            List of job info dictionaries
+        """
+        ...
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """
+        Register a job executor function for a specific job type.
+
+        This allows components to register custom job execution logic with the scheduler.
+        When a job of the registered type is executed, the scheduler will call the
+        registered executor function.
+
+        Args:
+            job_type: The type of job (e.g., "vector_store_file_batch")
+            executor: Async function that takes job_data dict and returns result dict
+        """
+        ...
+
+    async def initialize(self) -> None:
+        """
+        Initialize the scheduler (connect to backend, load persisted jobs).
+
+        Note: Does NOT start processing jobs. Call start() after all executors are registered.
+        """
+        ...
+
+    async def start(self) -> None:
+        """
+        Start processing jobs after all executors are registered.
+
+        This resumes any incomplete jobs from storage. Must be called after initialize()
+        and after all job executors have been registered via register_job_executor().
+        """
+        ...
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the scheduler."""
+        ...
diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py
new file mode 100644
index 000000000..27d9d01cb
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py
@@ -0,0 +1,9 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .scheduler import CelerySchedulerImpl
+
+__all__ = ["CelerySchedulerImpl"]
diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py
new file mode 100644
index 000000000..6dcaa9e51
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py
@@ -0,0 +1,83 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from collections.abc import Awaitable, Callable
+
+from ..api import JobStatus, Scheduler
+from ..config import CelerySchedulerConfig
+
+
+class CelerySchedulerImpl(Scheduler):
+    """
+    Celery-based scheduler implementation for distributed multi-worker deployments.
+
+    This scheduler uses Celery to distribute jobs across multiple worker processes
+    or machines. It provides:
+    - Persistent job queue (via Redis/RabbitMQ broker)
+    - Multi-worker coordination
+    - Crash recovery (jobs survive server restarts)
+    - Distributed task execution
+
+    This is suitable for:
+    - Production deployments
+    - Multi-worker scenarios
+    - High availability requirements
+    """
+
+    def __init__(self, config: CelerySchedulerConfig):
+        self.config = config
+        self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {}
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """Register a job executor function for a specific job type."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def initialize(self) -> None:
+        """Initialize the Celery scheduler."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def start(self) -> None:
+        """Start processing jobs after all executors are registered."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the Celery scheduler."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def schedule_job(
+        self,
+        job_type: str,
+        job_data: dict,
+        metadata: dict | None = None,
+    ) -> str:
+        """Schedule a new job for execution via Celery."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def get_job_info(self, job_id: str) -> dict:
+        """Get complete information about a job from the Celery result backend."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def cancel_job(self, job_id: str) -> bool:
+        """Cancel a pending or running Celery job."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def delete_job(self, job_id: str) -> bool:
+        """Delete a completed or cancelled job from the Celery result backend."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
+
+    async def list_jobs(
+        self,
+        job_type: str | None = None,
+        status: JobStatus | None = None,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> list[dict]:
+        """List jobs from the Celery result backend with optional filtering."""
+        raise NotImplementedError("Celery scheduler implementation is not yet available")
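+
+
+# Illustrative sketch of the eventual wiring (not part of this change; exact
+# Celery usage to be confirmed when implemented):
+#
+#   app = Celery(broker=config.broker_url, backend=config.result_backend)
+#   result = app.send_task("llama_stack.job_scheduler.run", args=[job_type, job_data])
+#   job_id = result.id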
diff --git a/src/llama_stack/providers/utils/job_scheduler/config.py b/src/llama_stack/providers/utils/job_scheduler/config.py
new file mode 100644
index 000000000..d8eba89fb
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/config.py
@@ -0,0 +1,58 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from typing import Annotated, Literal
+
+from pydantic import BaseModel, Field
+
+from llama_stack.core.storage.datatypes import KVStoreReference
+
+
+class SchedulerConfig(BaseModel):
+    """Base class for scheduler configurations."""
+
+    type: str
+
+
+class InlineSchedulerConfig(SchedulerConfig):
+    """
+    Configuration for the inline (asyncio-based) scheduler.
+
+    This scheduler runs jobs in the same process using asyncio tasks.
+    Suitable for development and single-worker deployments.
+    """
+
+    type: Literal["inline"] = "inline"
+    kvstore: KVStoreReference = Field(
+        description="KVStore reference for persisting job state",
+    )
+    max_concurrent_jobs: int = Field(
+        default=10,
+        description="Maximum number of jobs that can run concurrently",
+    )
+
+
+class CelerySchedulerConfig(SchedulerConfig):
+    """
+    Configuration for the Celery-based distributed scheduler.
+
+    This scheduler distributes jobs across multiple worker processes/machines.
+    Suitable for production and multi-worker deployments.
+    """
+
+    type: Literal["celery"] = "celery"
+    broker_url: str = Field(
+        description="Celery broker URL (e.g., 'redis://localhost:6379/0')",
+    )
+    result_backend: str = Field(
+        description="Celery result backend URL (e.g., 'redis://localhost:6379/1')",
+    )
+
+
+SchedulerConfigUnion = Annotated[
+    InlineSchedulerConfig | CelerySchedulerConfig,
+    Field(discriminator="type"),
+]
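+
+
+# Example (illustrative): a run config maps onto the discriminated union via
+# Pydantic; the backend/namespace values below are placeholders.
+#
+#   TypeAdapter(SchedulerConfigUnion).validate_python(
+#       {"type": "inline", "kvstore": {"backend": "kv_default", "namespace": "job_scheduler"}}
+#   )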
diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py
new file mode 100644
index 000000000..5be56af5e
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py
@@ -0,0 +1,9 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from .scheduler import InlineSchedulerImpl
+
+__all__ = ["InlineSchedulerImpl"]
diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py
new file mode 100644
index 000000000..3d8c5e744
--- /dev/null
+++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py
@@ -0,0 +1,78 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+from collections.abc import Awaitable, Callable
+
+from ..api import JobStatus, Scheduler
+from ..config import InlineSchedulerConfig
+
+
+class InlineSchedulerImpl(Scheduler):
+    """
+    Inline scheduler implementation using asyncio for single-worker deployments.
+
+    This scheduler runs jobs in the same process using asyncio tasks. Jobs and their
+    state are persisted to KVStore for crash recovery.
+
+    This is suitable for:
+    - Development and testing
+    - Single-worker deployments
+    - Scenarios where external dependencies (Redis, RabbitMQ) are not available
+    """
+
+    def __init__(self, config: InlineSchedulerConfig):
+        self.config = config
+
+    async def initialize(self) -> None:
+        """Initialize the scheduler and load persisted jobs."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def start(self) -> None:
+        """Start processing jobs after all executors are registered."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    def register_job_executor(
+        self,
+        job_type: str,
+        executor: Callable[[dict], Awaitable[dict]],
+    ) -> None:
+        """Register a job executor function for a specific job type."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def shutdown(self) -> None:
+        """Gracefully shut down the scheduler."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def schedule_job(
+        self,
+        job_type: str,
+        job_data: dict,
+        metadata: dict | None = None,
+    ) -> str:
+        """Schedule a new job for execution."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def get_job_info(self, job_id: str) -> dict:
+        """Get complete information about a job."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def cancel_job(self, job_id: str) -> bool:
+        """Cancel a pending or running job."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def delete_job(self, job_id: str) -> bool:
+        """Delete a completed or cancelled job."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
+
+    async def list_jobs(
+        self,
+        job_type: str | None = None,
+        status: JobStatus | None = None,
+        limit: int = 100,
+        offset: int = 0,
+    ) -> list[dict]:
+        """List jobs with optional filtering."""
+        raise NotImplementedError("Inline scheduler implementation is not yet available")
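+
+
+# Illustrative sketch of the planned execution path (per the README: jobs run
+# in-process via asyncio tasks and state persists to KVStore; the semaphore
+# and helper names below are assumptions, not implemented API):
+#
+#   async with self._semaphore:  # bounds max_concurrent_jobs
+#       self._jobs[job_id] = asyncio.create_task(executor(job_data))
+#       await self._persist_job_state(job_id)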
diff --git a/tests/unit/providers/utils/test_job_scheduler.py b/tests/unit/providers/utils/test_job_scheduler.py
new file mode 100644
index 000000000..9b63de5d6
--- /dev/null
+++ b/tests/unit/providers/utils/test_job_scheduler.py
@@ -0,0 +1,117 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the terms described in the LICENSE file in
+# the root directory of this source tree.
+
+import asyncio
+import tempfile
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig
+from llama_stack.providers.utils.job_scheduler import InlineSchedulerConfig
+from llama_stack.providers.utils.kvstore import register_kvstore_backends
+
+
+async def default_test_executor(job_data: dict) -> dict:
+    """Default test executor that simulates work."""
+    await asyncio.sleep(0.1)
+    return {
+        "message": "Test job completed successfully",
+        "job_data": job_data,
+    }
+
+
+@pytest.fixture
+def scheduler_config():
+    """Create a test scheduler config with a temporary SQLite database."""
+    with tempfile.TemporaryDirectory() as tmpdir:
+        db_path = Path(tmpdir) / "test_scheduler.db"
+        backend_name = "kv_scheduler_test"
+        kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
+        register_kvstore_backends({backend_name: kvstore_config})
+
+        yield InlineSchedulerConfig(
+            kvstore=KVStoreReference(backend=backend_name, namespace="job_scheduler"),
+            max_concurrent_jobs=5,
+        )
+
+
+async def test_scheduler_api_exists(scheduler_config):
+    """Test that the scheduler API is properly defined."""
+    from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
+
+    scheduler = InlineSchedulerImpl(scheduler_config)
+
+    # Verify all required methods exist
+    assert hasattr(scheduler, "initialize")
+    assert hasattr(scheduler, "start")
+    assert hasattr(scheduler, "shutdown")
+    assert hasattr(scheduler, "register_job_executor")
+    assert hasattr(scheduler, "schedule_job")
+    assert hasattr(scheduler, "get_job_info")
+    assert hasattr(scheduler, "cancel_job")
+    assert hasattr(scheduler, "delete_job")
+    assert hasattr(scheduler, "list_jobs")
+
+
+async def test_scheduler_not_implemented(scheduler_config):
+    """Test that scheduler methods raise NotImplementedError."""
+    from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
+
+    scheduler = InlineSchedulerImpl(scheduler_config)
+
+    # Test that all methods raise NotImplementedError
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.initialize()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.start()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        scheduler.register_job_executor("test_job", default_test_executor)
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.schedule_job("test_job", {})
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.get_job_info("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.cancel_job("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.delete_job("job_id")
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.list_jobs()
+
+    with pytest.raises(NotImplementedError, match="not yet available"):
+        await scheduler.shutdown()
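+
+
+# (Illustrative addition, not in the original change: a config sanity check
+# that touches only fields defined by this module.)
+def test_inline_scheduler_config_values(scheduler_config):
+    """The fixture's config should carry the values it was built with."""
+    assert scheduler_config.type == "inline"
+    assert scheduler_config.max_concurrent_jobs == 5
+    assert scheduler_config.kvstore.namespace == "job_scheduler"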
+
+
+async def test_two_phase_initialization_pattern(scheduler_config):
+    """Test that the two-phase initialization pattern is supported."""
+    from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl
+
+    scheduler = InlineSchedulerImpl(scheduler_config)
+
+    # Mock the methods to test the pattern
+    scheduler.initialize = AsyncMock()
+    scheduler.start = AsyncMock()
+    scheduler.register_job_executor = MagicMock()
+
+    # Phase 1: Initialize (loads jobs, doesn't start them)
+    await scheduler.initialize()
+    scheduler.initialize.assert_called_once()
+
+    # Register executors after initialization
+    scheduler.register_job_executor("test_job", default_test_executor)
+    scheduler.register_job_executor.assert_called_once()
+
+    # Phase 2: Start (resumes jobs after executors registered)
+    await scheduler.start()
+    scheduler.start.assert_called_once()