From 16c0321482dc0fc1256aee7403f6df374ab9c70f Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Wed, 29 Oct 2025 11:12:42 -0700 Subject: [PATCH 01/10] feat: Add Async Job Scheduler --- .../providers/utils/job_scheduler/__init__.py | 47 +++ .../providers/utils/job_scheduler/api.py | 135 ++++++++ .../utils/job_scheduler/celery/__init__.py | 9 + .../utils/job_scheduler/celery/scheduler.py | 81 +++++ .../providers/utils/job_scheduler/config.py | 59 ++++ .../utils/job_scheduler/inline/__init__.py | 9 + .../utils/job_scheduler/inline/scheduler.py | 308 ++++++++++++++++++ .../providers/utils/test_job_scheduler.py | 280 ++++++++++++++++ 8 files changed, 928 insertions(+) create mode 100644 llama_stack/providers/utils/job_scheduler/__init__.py create mode 100644 llama_stack/providers/utils/job_scheduler/api.py create mode 100644 llama_stack/providers/utils/job_scheduler/celery/__init__.py create mode 100644 llama_stack/providers/utils/job_scheduler/celery/scheduler.py create mode 100644 llama_stack/providers/utils/job_scheduler/config.py create mode 100644 llama_stack/providers/utils/job_scheduler/inline/__init__.py create mode 100644 llama_stack/providers/utils/job_scheduler/inline/scheduler.py create mode 100644 tests/unit/providers/utils/test_job_scheduler.py diff --git a/llama_stack/providers/utils/job_scheduler/__init__.py b/llama_stack/providers/utils/job_scheduler/__init__.py new file mode 100644 index 000000000..e09d9f45d --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/__init__.py @@ -0,0 +1,47 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +from .api import JobStatus, Scheduler +from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig + + +async def scheduler_impl(config: SchedulerConfig) -> Scheduler: + """ + Factory function to instantiate scheduler implementations. + + Args: + config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) + + Returns: + Scheduler: An initialized scheduler instance + + Raises: + ValueError: If the config type is unknown + """ + impl: Scheduler + if isinstance(config, InlineSchedulerConfig): + from .inline import InlineSchedulerImpl + + impl = InlineSchedulerImpl(config) + elif isinstance(config, CelerySchedulerConfig): + from .celery import CelerySchedulerImpl + + impl = CelerySchedulerImpl(config) + else: + raise ValueError(f"Unknown scheduler config type: {type(config)}") + + await impl.initialize() + return impl + + +__all__ = [ + "JobStatus", + "Scheduler", + "SchedulerConfig", + "InlineSchedulerConfig", + "CelerySchedulerConfig", + "scheduler_impl", +] diff --git a/llama_stack/providers/utils/job_scheduler/api.py b/llama_stack/providers/utils/job_scheduler/api.py new file mode 100644 index 000000000..e46da8fcb --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/api.py @@ -0,0 +1,135 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from collections.abc import Awaitable, Callable +from enum import StrEnum +from typing import Protocol + + +class JobStatus(StrEnum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class Scheduler(Protocol): + """ + Abstract scheduler protocol for managing async jobs. + + This provides a pluggable backend for job scheduling and execution. 
+ """ + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """ + Schedule a new job for execution. + + Args: + job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection") + job_data: Job-specific data and parameters + metadata: Additional metadata for tracking + + Returns: + job_id: UUID for tracking the job + """ + ... + + async def get_job_info(self, job_id: str) -> dict: + """ + Get complete information about a job. + + Returns: + { + "job_id": str, + "job_type": str, + "status": JobStatus, + "created_at": datetime, + "started_at": datetime | None, + "completed_at": datetime | None, + "progress": float, # 0.0 to 1.0 + "metadata": dict, + "error": str | None, # Error message if status == FAILED + "result": dict | None, # Job result if status == COMPLETED + } + """ + ... + + async def cancel_job(self, job_id: str) -> bool: + """ + Cancel a pending or running job. + + Args: + job_id: Job to cancel + + Returns: + True if job was cancelled, False if not found or already completed + """ + ... + + async def delete_job(self, job_id: str) -> bool: + """ + Delete a completed or cancelled job. + + Args: + job_id: Job to delete + + Returns: + True if job was deleted, False if not found + """ + ... + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """ + List jobs with optional filtering. + + Args: + job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection") + status: Filter by status + limit: Maximum number of jobs to return + offset: Offset for pagination + + Returns: + List of job info dictionaries + """ + ... + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """ + Register a job executor function for a specific job type. 
+ + This allows components to register custom job execution logic with the scheduler. + When a job of the registered type is executed, the scheduler will call the + registered executor function. + + Args: + job_type: The type of job (e.g., "vector_store_file_batch") + executor: Async function that takes job_data dict and returns result dict + """ + ... + + async def initialize(self) -> None: + """Initialize the scheduler (connect to backend, etc.)""" + ... + + async def shutdown(self) -> None: + """Gracefully shutdown the scheduler""" + ... diff --git a/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/llama_stack/providers/utils/job_scheduler/celery/__init__.py new file mode 100644 index 000000000..27d9d01cb --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/celery/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .scheduler import CelerySchedulerImpl + +__all__ = ["CelerySchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/llama_stack/providers/utils/job_scheduler/celery/scheduler.py new file mode 100644 index 000000000..0a5b6e220 --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/celery/scheduler.py @@ -0,0 +1,81 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from collections.abc import Awaitable, Callable + +from ..api import JobStatus, Scheduler +from ..config import CelerySchedulerConfig + + +class CelerySchedulerImpl(Scheduler): + """ + Celery-based scheduler implementation for distributed multi-worker deployments. + + This scheduler uses Celery to distribute jobs across multiple worker processes + or machines. 
It provides: + - Persistent job queue (via Redis/RabbitMQ broker) + - Multi-worker coordination + - Crash recovery (jobs survive server restarts) + - Distributed task execution + + This is suitable for: + - Production deployments + - Multi-worker scenarios + - High availability requirements + """ + + def __init__(self, config: CelerySchedulerConfig): + self.config = config + self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} + # TODO: Initialize Celery app with broker and result backend + raise NotImplementedError("Celery scheduler not yet implemented") + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """Register a job executor function for a specific job type.""" + self._job_executors[job_type] = executor + + async def initialize(self) -> None: + """Initialize the Celery scheduler.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def shutdown(self) -> None: + """Gracefully shutdown the Celery scheduler.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """Schedule a new job for execution via Celery.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def get_job_info(self, job_id: str) -> dict: + """Get complete information about a job from Celery result backend.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def cancel_job(self, job_id: str) -> bool: + """Cancel a pending or running Celery job.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def delete_job(self, job_id: str) -> bool: + """Delete a completed or cancelled job from Celery result backend.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = 
None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """List jobs from Celery result backend with optional filtering.""" + raise NotImplementedError("Celery scheduler not yet implemented") diff --git a/llama_stack/providers/utils/job_scheduler/config.py b/llama_stack/providers/utils/job_scheduler/config.py new file mode 100644 index 000000000..325387cfd --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/config.py @@ -0,0 +1,59 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from typing import Annotated, Literal + +from pydantic import BaseModel, Field + +from llama_stack.core.storage.datatypes import KVStoreReference + + +class SchedulerConfig(BaseModel): + """Base class for scheduler configurations.""" + + type: str + + +class InlineSchedulerConfig(SchedulerConfig): + """ + Configuration for inline (asyncio-based) scheduler. + + This scheduler runs jobs in the same process using asyncio tasks. + Suitable for development and single-worker deployments. + """ + + type: Literal["inline"] = "inline" + kvstore: KVStoreReference = Field( + description="KVStore reference for persisting job state", + ) + max_concurrent_jobs: int = Field( + default=10, + description="Maximum number of jobs that can run concurrently", + ) + + +class CelerySchedulerConfig(SchedulerConfig): + """ + Configuration for Celery-based distributed scheduler. + + This scheduler distributes jobs across multiple worker processes/machines. + Suitable for production and multi-worker deployments. 
+ """ + + type: Literal["celery"] = "celery" + broker_url: str = Field( + description="Celery broker URL (e.g., 'redis://localhost:6379/0')", + ) + result_backend: str = Field( + description="Celery result backend URL (e.g., 'redis://localhost:6379/1')", + ) + + +# Union type for all scheduler configs with discriminator +SchedulerConfigUnion = Annotated[ + InlineSchedulerConfig | CelerySchedulerConfig, + Field(discriminator="type"), +] diff --git a/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/llama_stack/providers/utils/job_scheduler/inline/__init__.py new file mode 100644 index 000000000..5be56af5e --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/inline/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .scheduler import InlineSchedulerImpl + +__all__ = ["InlineSchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py new file mode 100644 index 000000000..b53621128 --- /dev/null +++ b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -0,0 +1,308 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +import asyncio +import json +import traceback +import uuid +from collections.abc import Awaitable, Callable +from datetime import UTC, datetime +from typing import Any + +from llama_stack.core.utils.serialize import EnumEncoder +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.api import KVStore + +from ..api import JobStatus, Scheduler +from ..config import InlineSchedulerConfig + +JOB_PREFIX = "job_scheduler:jobs:" + + +class InlineSchedulerImpl(Scheduler): + """ + Inline scheduler implementation using asyncio for single-worker deployments. + + This scheduler runs jobs in the same process using asyncio tasks. Jobs and their + state are persisted to KVStore for crash recovery. + + This is suitable for: + - Development and testing + - Single-worker deployments + - Scenarios where external dependencies (Redis, RabbitMQ) are not available + """ + + def __init__(self, config: InlineSchedulerConfig): + self.config = config + self._jobs: dict[str, dict[str, Any]] = {} + self._tasks: dict[str, asyncio.Task] = {} + self._semaphore: asyncio.Semaphore + self._shutdown_event = asyncio.Event() + self._kvstore: KVStore + self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} + + async def initialize(self) -> None: + """Initialize the scheduler and load persisted jobs.""" + self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) + + # Initialize KVStore + self._kvstore = await kvstore_impl(self.config.kvstore) + + # Load persisted jobs from KVStore + await self._load_jobs_from_storage() + + # Resume incomplete jobs + await self._resume_incomplete_jobs() + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """Register a job executor function for a specific job type.""" + self._job_executors[job_type] = executor + + async def _load_jobs_from_storage(self) -> None: + """Load all jobs from KVStore into memory.""" + start_key = 
JOB_PREFIX + end_key = f"{JOB_PREFIX}\xff" + + stored_values = await self._kvstore.values_in_range(start_key, end_key) + + for value in stored_values: + job = json.loads(value) + # Deserialize datetime strings back to datetime objects + job["created_at"] = datetime.fromisoformat(job["created_at"]) + if job.get("started_at"): + job["started_at"] = datetime.fromisoformat(job["started_at"]) + if job.get("completed_at"): + job["completed_at"] = datetime.fromisoformat(job["completed_at"]) + # Deserialize JobStatus enum + job["status"] = JobStatus(job["status"]) + + self._jobs[job["job_id"]] = job + + async def _resume_incomplete_jobs(self) -> None: + """Resume jobs that were running when server crashed.""" + for job_id, job in self._jobs.items(): + if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: + # Reset running jobs to pending + if job["status"] == JobStatus.RUNNING: + job["status"] = JobStatus.PENDING + job["started_at"] = None + await self._save_job_to_storage(job) + + # Restart the job + task = asyncio.create_task(self._run_job(job_id)) + self._tasks[job_id] = task + + async def _save_job_to_storage(self, job: dict[str, Any]) -> None: + """Persist job to KVStore.""" + key = f"{JOB_PREFIX}{job['job_id']}" + await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) + + async def _delete_job_from_storage(self, job_id: str) -> None: + """Delete job from KVStore.""" + key = f"{JOB_PREFIX}{job_id}" + await self._kvstore.delete(key) + + async def shutdown(self) -> None: + """Gracefully shutdown the scheduler.""" + self._shutdown_event.set() + + # Cancel all running tasks + for task in self._tasks.values(): + if not task.done(): + task.cancel() + + # Wait for all tasks to complete + if self._tasks: + await asyncio.gather(*self._tasks.values(), return_exceptions=True) + + self._tasks.clear() + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """Schedule a new job for execution.""" + job_id = 
str(uuid.uuid4()) + + job_info = { + "job_id": job_id, + "job_type": job_type, + "status": JobStatus.PENDING, + "created_at": datetime.now(UTC), + "started_at": None, + "completed_at": None, + "progress": 0.0, + "metadata": metadata or {}, + "job_data": job_data, + "error": None, + "result": None, + } + + self._jobs[job_id] = job_info + + # Persist to KVStore + await self._save_job_to_storage(job_info) + + # Create and schedule the task + task = asyncio.create_task(self._run_job(job_id)) + self._tasks[job_id] = task + + return job_id + + async def _run_job(self, job_id: str) -> None: + """Run a job asynchronously.""" + job = self._jobs[job_id] + + try: + # Acquire semaphore to limit concurrent jobs + async with self._semaphore: + # Update status to RUNNING + job["status"] = JobStatus.RUNNING + job["started_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + # Execute the job based on job_type + # This is where job-specific logic would be called + # For now, we'll simulate a simple job execution + result = await self._execute_job(job) + + # Mark as completed + job["status"] = JobStatus.COMPLETED + job["completed_at"] = datetime.now(UTC) + job["progress"] = 1.0 + job["result"] = result + await self._save_job_to_storage(job) + + except asyncio.CancelledError: + # Job was cancelled + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + raise + + except Exception as e: + # Job failed + job["status"] = JobStatus.FAILED + job["completed_at"] = datetime.now(UTC) + job["error"] = str(e) + job["result"] = {"error_details": traceback.format_exc()} + await self._save_job_to_storage(job) + + finally: + # Clean up task reference + if job_id in self._tasks: + del self._tasks[job_id] + + async def _execute_job(self, job: dict) -> dict: + """ + Execute a job based on its type. + + If a custom executor is registered for the job type, it will be called. 
+ Otherwise, raises an error for unknown job types. + """ + job_type = job["job_type"] + job_data = job["job_data"] + + # Check if a custom executor is registered for this job type + if job_type in self._job_executors: + executor = self._job_executors[job_type] + return await executor(job_data) # Call the registered executor + + # No executor registered for this job type + raise ValueError(f"No executor registered for job type: {job_type}") + + async def get_job_info(self, job_id: str) -> dict: + """Get complete information about a job.""" + if job_id not in self._jobs: + raise ValueError(f"Job {job_id} not found") + + job = self._jobs[job_id].copy() + + # Remove internal job_data field from response + job.pop("job_data", None) + + return job + + async def cancel_job(self, job_id: str) -> bool: + """Cancel a pending or running job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only cancel pending or running jobs + if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + return False + + # Cancel the task if it exists + if job_id in self._tasks: + task = self._tasks[job_id] + if not task.done(): + task.cancel() + + # Update job status + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + return True + + async def delete_job(self, job_id: str) -> bool: + """Delete a completed or cancelled job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only delete completed, failed, or cancelled jobs + if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: + return False + + # Remove from memory and storage + del self._jobs[job_id] + await self._delete_job_from_storage(job_id) + + return True + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """List jobs with optional filtering.""" + jobs = 
list(self._jobs.values()) + + # Filter by job_type + if job_type is not None: + jobs = [j for j in jobs if j["job_type"] == job_type] + + # Filter by status + if status is not None: + jobs = [j for j in jobs if j["status"] == status] + + # Sort by created_at (newest first) + jobs.sort(key=lambda j: j["created_at"], reverse=True) + + # Apply pagination + jobs = jobs[offset : offset + limit] + + # Convert to return format (remove internal fields) + result = [] + for job in jobs: + job_copy = job.copy() + # Remove internal job_data field + job_copy.pop("job_data", None) + result.append(job_copy) + + return result diff --git a/tests/unit/providers/utils/test_job_scheduler.py b/tests/unit/providers/utils/test_job_scheduler.py new file mode 100644 index 000000000..3283d8514 --- /dev/null +++ b/tests/unit/providers/utils/test_job_scheduler.py @@ -0,0 +1,280 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +import asyncio +import tempfile +from pathlib import Path + +import pytest + +from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig +from llama_stack.providers.utils.job_scheduler import ( + InlineSchedulerConfig, + JobStatus, + scheduler_impl, +) +from llama_stack.providers.utils.kvstore import register_kvstore_backends + + +async def default_test_executor(job_data: dict) -> dict: + """Default test executor that simulates work.""" + await asyncio.sleep(0.1) + return { + "message": "Test job completed successfully", + "job_data": job_data, + } + + +@pytest.fixture +def scheduler_config(): + """Create a test scheduler config with temporary SQLite database.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_scheduler.db" + backend_name = "kv_scheduler_test" + kvstore_config = SqliteKVStoreConfig(db_path=str(db_path)) + register_kvstore_backends({backend_name: kvstore_config}) + + yield InlineSchedulerConfig( + kvstore=KVStoreReference(backend=backend_name, namespace="job_scheduler"), + max_concurrent_jobs=5, + ) + + +async def test_inline_scheduler_basic(scheduler_config): + """Test basic scheduler functionality.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register default executor + scheduler.register_job_executor("test_job", default_test_executor) + + # Schedule a job + job_id = await scheduler.schedule_job( + job_type="test_job", + job_data={"test": "data"}, + metadata={"user": "test_user"}, + ) + + assert job_id is not None + assert isinstance(job_id, str) + + # Wait a bit for the job to complete + await asyncio.sleep(0.2) + + # Get job info + job_info = await scheduler.get_job_info(job_id) + assert job_info["job_id"] == job_id + assert job_info["job_type"] == "test_job" + assert job_info["status"] == JobStatus.COMPLETED.value + assert job_info["metadata"]["user"] == "test_user" + assert job_info["progress"] == 1.0 + assert job_info["result"] is not None + + finally: + 
await scheduler.shutdown() + + +async def test_inline_scheduler_list_jobs(scheduler_config): + """Test listing jobs with filters.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register executors for different job types + scheduler.register_job_executor("batch_processing", default_test_executor) + scheduler.register_job_executor("log_aggregation", default_test_executor) + + # Schedule multiple jobs + await scheduler.schedule_job( + job_type="batch_processing", + job_data={"batch": 1}, + ) + await scheduler.schedule_job( + job_type="log_aggregation", + job_data={"logs": []}, + ) + await scheduler.schedule_job( + job_type="batch_processing", + job_data={"batch": 2}, + ) + + # Wait for jobs to complete + await asyncio.sleep(0.3) + + # List all jobs + all_jobs = await scheduler.list_jobs() + assert len(all_jobs) == 3 + + # List jobs by type + batch_jobs = await scheduler.list_jobs(job_type="batch_processing") + assert len(batch_jobs) == 2 + + log_jobs = await scheduler.list_jobs(job_type="log_aggregation") + assert len(log_jobs) == 1 + + # List jobs by status + completed_jobs = await scheduler.list_jobs(status=JobStatus.COMPLETED) + assert len(completed_jobs) == 3 + + finally: + await scheduler.shutdown() + + +async def test_inline_scheduler_cancel_job(scheduler_config): + """Test cancelling a job.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register executor + scheduler.register_job_executor("long_running_job", default_test_executor) + + # Schedule a job + job_id = await scheduler.schedule_job( + job_type="long_running_job", + job_data={"duration": 10}, + ) + + # Try to cancel immediately + await scheduler.cancel_job(job_id) + + # Wait a bit + await asyncio.sleep(0.1) + + # Check job status + job_info = await scheduler.get_job_info(job_id) + # Job might be CANCELLED or COMPLETED depending on timing + assert job_info["status"] in [JobStatus.CANCELLED.value, JobStatus.COMPLETED.value] + + finally: + await 
scheduler.shutdown() + + +async def test_inline_scheduler_delete_job(scheduler_config): + """Test deleting a completed job.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register executor + scheduler.register_job_executor("test_job", default_test_executor) + + # Schedule a job + job_id = await scheduler.schedule_job( + job_type="test_job", + job_data={"test": "data"}, + ) + + # Wait for completion + await asyncio.sleep(0.2) + + # Verify job exists + job_info = await scheduler.get_job_info(job_id) + assert job_info["status"] == JobStatus.COMPLETED.value + + # Delete the job + success = await scheduler.delete_job(job_id) + assert success is True + + # Verify job is deleted + with pytest.raises(ValueError, match="not found"): + await scheduler.get_job_info(job_id) + + # Deleting again should return False + success = await scheduler.delete_job(job_id) + assert success is False + + finally: + await scheduler.shutdown() + + +async def test_inline_scheduler_concurrent_jobs(scheduler_config): + """Test running multiple jobs concurrently.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register executor + scheduler.register_job_executor("test_job", default_test_executor) + + # Schedule multiple jobs + job_ids = [] + for i in range(5): + job_id = await scheduler.schedule_job( + job_type="test_job", + job_data={"index": i}, + ) + job_ids.append(job_id) + + # Wait for all jobs to complete + await asyncio.sleep(0.5) + + # Verify all jobs completed + for job_id in job_ids: + job_info = await scheduler.get_job_info(job_id) + assert job_info["status"] == JobStatus.COMPLETED.value + + finally: + await scheduler.shutdown() + + +async def test_inline_scheduler_pagination(scheduler_config): + """Test job listing with pagination.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Register executor + scheduler.register_job_executor("test_job", default_test_executor) + + # Schedule 10 jobs + for i in range(10): + await 
scheduler.schedule_job( + job_type="test_job", + job_data={"index": i}, + ) + + # Wait for completion + await asyncio.sleep(0.5) + + # Test pagination + page1 = await scheduler.list_jobs(limit=5, offset=0) + assert len(page1) == 5 + + page2 = await scheduler.list_jobs(limit=5, offset=5) + assert len(page2) == 5 + + page3 = await scheduler.list_jobs(limit=5, offset=10) + assert len(page3) == 0 + + finally: + await scheduler.shutdown() + + +async def test_inline_scheduler_register_job_executor(scheduler_config): + """Test registering a job executor.""" + scheduler = await scheduler_impl(scheduler_config) + + try: + # Define a custom job executor + async def custom_executor(job_data: dict) -> dict: + return {"custom": "result", "input": job_data} + + # Register the executor + scheduler.register_job_executor("custom_job_type", custom_executor) + + # Schedule a job with the custom type + job_id = await scheduler.schedule_job( + job_type="custom_job_type", + job_data={"test": "data"}, + ) + + # Wait for job to complete + await asyncio.sleep(0.2) + + # Verify the custom executor was called + job_info = await scheduler.get_job_info(job_id) + assert job_info["status"] == JobStatus.COMPLETED.value + assert job_info["result"]["custom"] == "result" + assert job_info["result"]["input"]["test"] == "data" + + finally: + await scheduler.shutdown() From 2daecd34f1afc1e10508fcc3a431a85c13a065e8 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Wed, 29 Oct 2025 11:58:31 -0700 Subject: [PATCH 02/10] rebase --- .../providers/utils/job_scheduler/__init__.py | 47 +++ .../providers/utils/job_scheduler/api.py | 135 ++++++++ .../utils/job_scheduler/celery/__init__.py | 9 + .../utils/job_scheduler/celery/scheduler.py | 81 +++++ .../providers/utils/job_scheduler/config.py | 58 ++++ .../utils/job_scheduler/inline/__init__.py | 9 + .../utils/job_scheduler/inline/scheduler.py | 303 ++++++++++++++++++ 7 files changed, 642 insertions(+) create mode 100644 
src/llama_stack/providers/utils/job_scheduler/__init__.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/api.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/celery/__init__.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/config.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/inline/__init__.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py diff --git a/src/llama_stack/providers/utils/job_scheduler/__init__.py b/src/llama_stack/providers/utils/job_scheduler/__init__.py new file mode 100644 index 000000000..e09d9f45d --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/__init__.py @@ -0,0 +1,47 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .api import JobStatus, Scheduler +from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig + + +async def scheduler_impl(config: SchedulerConfig) -> Scheduler: + """ + Factory function to instantiate scheduler implementations. 
+ + Args: + config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) + + Returns: + Scheduler: An initialized scheduler instance + + Raises: + ValueError: If the config type is unknown + """ + impl: Scheduler + if isinstance(config, InlineSchedulerConfig): + from .inline import InlineSchedulerImpl + + impl = InlineSchedulerImpl(config) + elif isinstance(config, CelerySchedulerConfig): + from .celery import CelerySchedulerImpl + + impl = CelerySchedulerImpl(config) + else: + raise ValueError(f"Unknown scheduler config type: {type(config)}") + + await impl.initialize() + return impl + + +__all__ = [ + "JobStatus", + "Scheduler", + "SchedulerConfig", + "InlineSchedulerConfig", + "CelerySchedulerConfig", + "scheduler_impl", +] diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py new file mode 100644 index 000000000..e46da8fcb --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/api.py @@ -0,0 +1,135 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from collections.abc import Awaitable, Callable +from enum import StrEnum +from typing import Protocol + + +class JobStatus(StrEnum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class Scheduler(Protocol): + """ + Abstract scheduler protocol for managing async jobs. + + This provides a pluggable backend for job scheduling and execution. + """ + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """ + Schedule a new job for execution. 
+ + Args: + job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection") + job_data: Job-specific data and parameters + metadata: Additional metadata for tracking + + Returns: + job_id: UUID for tracking the job + """ + ... + + async def get_job_info(self, job_id: str) -> dict: + """ + Get complete information about a job. + + Returns: + { + "job_id": str, + "job_type": str, + "status": JobStatus, + "created_at": datetime, + "started_at": datetime | None, + "completed_at": datetime | None, + "progress": float, # 0.0 to 1.0 + "metadata": dict, + "error": str | None, # Error message if status == FAILED + "result": dict | None, # Job result if status == COMPLETED + } + """ + ... + + async def cancel_job(self, job_id: str) -> bool: + """ + Cancel a pending or running job. + + Args: + job_id: Job to cancel + + Returns: + True if job was cancelled, False if not found or already completed + """ + ... + + async def delete_job(self, job_id: str) -> bool: + """ + Delete a completed or cancelled job. + + Args: + job_id: Job to delete + + Returns: + True if job was deleted, False if not found + """ + ... + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """ + List jobs with optional filtering. + + Args: + job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection") + status: Filter by status + limit: Maximum number of jobs to return + offset: Offset for pagination + + Returns: + List of job info dictionaries + """ + ... + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """ + Register a job executor function for a specific job type. + + This allows components to register custom job execution logic with the scheduler. + When a job of the registered type is executed, the scheduler will call the + registered executor function. 
+ + Args: + job_type: The type of job (e.g., "vector_store_file_batch") + executor: Async function that takes job_data dict and returns result dict + """ + ... + + async def initialize(self) -> None: + """Initialize the scheduler (connect to backend, etc.)""" + ... + + async def shutdown(self) -> None: + """Gracefully shutdown the scheduler""" + ... diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py new file mode 100644 index 000000000..27d9d01cb --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/celery/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .scheduler import CelerySchedulerImpl + +__all__ = ["CelerySchedulerImpl"] diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py new file mode 100644 index 000000000..0a5b6e220 --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py @@ -0,0 +1,81 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from collections.abc import Awaitable, Callable + +from ..api import JobStatus, Scheduler +from ..config import CelerySchedulerConfig + + +class CelerySchedulerImpl(Scheduler): + """ + Celery-based scheduler implementation for distributed multi-worker deployments. + + This scheduler uses Celery to distribute jobs across multiple worker processes + or machines. 
It provides: + - Persistent job queue (via Redis/RabbitMQ broker) + - Multi-worker coordination + - Crash recovery (jobs survive server restarts) + - Distributed task execution + + This is suitable for: + - Production deployments + - Multi-worker scenarios + - High availability requirements + """ + + def __init__(self, config: CelerySchedulerConfig): + self.config = config + self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} + # TODO: Initialize Celery app with broker and result backend + raise NotImplementedError("Celery scheduler not yet implemented") + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """Register a job executor function for a specific job type.""" + self._job_executors[job_type] = executor + + async def initialize(self) -> None: + """Initialize the Celery scheduler.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def shutdown(self) -> None: + """Gracefully shutdown the Celery scheduler.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """Schedule a new job for execution via Celery.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def get_job_info(self, job_id: str) -> dict: + """Get complete information about a job from Celery result backend.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def cancel_job(self, job_id: str) -> bool: + """Cancel a pending or running Celery job.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def delete_job(self, job_id: str) -> bool: + """Delete a completed or cancelled job from Celery result backend.""" + raise NotImplementedError("Celery scheduler not yet implemented") + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = 
None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """List jobs from Celery result backend with optional filtering.""" + raise NotImplementedError("Celery scheduler not yet implemented") diff --git a/src/llama_stack/providers/utils/job_scheduler/config.py b/src/llama_stack/providers/utils/job_scheduler/config.py new file mode 100644 index 000000000..d8eba89fb --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/config.py @@ -0,0 +1,58 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from typing import Annotated, Literal + +from pydantic import BaseModel, Field + +from llama_stack.core.storage.datatypes import KVStoreReference + + +class SchedulerConfig(BaseModel): + """Base class for scheduler configurations.""" + + type: str + + +class InlineSchedulerConfig(SchedulerConfig): + """ + Configuration for inline (asyncio-based) scheduler. + + This scheduler runs jobs in the same process using asyncio tasks. + Suitable for development and single-worker deployments. + """ + + type: Literal["inline"] = "inline" + kvstore: KVStoreReference = Field( + description="KVStore reference for persisting job state", + ) + max_concurrent_jobs: int = Field( + default=10, + description="Maximum number of jobs that can run concurrently", + ) + + +class CelerySchedulerConfig(SchedulerConfig): + """ + Configuration for Celery-based distributed scheduler. + + This scheduler distributes jobs across multiple worker processes/machines. + Suitable for production and multi-worker deployments. 
+ """ + + type: Literal["celery"] = "celery" + broker_url: str = Field( + description="Celery broker URL (e.g., 'redis://localhost:6379/0')", + ) + result_backend: str = Field( + description="Celery result backend URL (e.g., 'redis://localhost:6379/1')", + ) + + +SchedulerConfigUnion = Annotated[ + InlineSchedulerConfig | CelerySchedulerConfig, + Field(discriminator="type"), +] diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py new file mode 100644 index 000000000..5be56af5e --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/inline/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +from .scheduler import InlineSchedulerImpl + +__all__ = ["InlineSchedulerImpl"] diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py new file mode 100644 index 000000000..401bb0d27 --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -0,0 +1,303 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. 
+ +import asyncio +import json +import traceback +import uuid +from collections.abc import Awaitable, Callable +from datetime import UTC, datetime +from typing import Any + +from llama_stack.core.utils.serialize import EnumEncoder +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.api import KVStore + +from ..api import JobStatus, Scheduler +from ..config import InlineSchedulerConfig + +JOB_PREFIX = "job_scheduler:jobs:" + + +class InlineSchedulerImpl(Scheduler): + """ + Inline scheduler implementation using asyncio for single-worker deployments. + + This scheduler runs jobs in the same process using asyncio tasks. Jobs and their + state are persisted to KVStore for crash recovery. + + This is suitable for: + - Development and testing + - Single-worker deployments + - Scenarios where external dependencies (Redis, RabbitMQ) are not available + """ + + def __init__(self, config: InlineSchedulerConfig): + self.config = config + self._jobs: dict[str, dict[str, Any]] = {} + self._tasks: dict[str, asyncio.Task] = {} + self._semaphore: asyncio.Semaphore + self._shutdown_event = asyncio.Event() + self._kvstore: KVStore + self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} + + async def initialize(self) -> None: + """Initialize the scheduler and load persisted jobs.""" + self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) + + # Initialize KVStore + self._kvstore = await kvstore_impl(self.config.kvstore) + + # Load persisted jobs from KVStore + await self._load_jobs_from_storage() + + # Resume incomplete jobs + await self._resume_incomplete_jobs() + + def register_job_executor( + self, + job_type: str, + executor: Callable[[dict], Awaitable[dict]], + ) -> None: + """Register a job executor function for a specific job type.""" + self._job_executors[job_type] = executor + + async def _load_jobs_from_storage(self) -> None: + """Load all jobs from KVStore into memory.""" + start_key = 
JOB_PREFIX + end_key = f"{JOB_PREFIX}\xff" + + stored_values = await self._kvstore.values_in_range(start_key, end_key) + + for value in stored_values: + job = json.loads(value) + # Deserialize datetime strings back to datetime objects + job["created_at"] = datetime.fromisoformat(job["created_at"]) + if job.get("started_at"): + job["started_at"] = datetime.fromisoformat(job["started_at"]) + if job.get("completed_at"): + job["completed_at"] = datetime.fromisoformat(job["completed_at"]) + job["status"] = JobStatus(job["status"]) + + self._jobs[job["job_id"]] = job + + async def _resume_incomplete_jobs(self) -> None: + """Resume jobs that were running when server crashed.""" + for job_id, job in self._jobs.items(): + if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: + # Reset running jobs to pending + if job["status"] == JobStatus.RUNNING: + job["status"] = JobStatus.PENDING + job["started_at"] = None + await self._save_job_to_storage(job) + + # Restart the job + task = asyncio.create_task(self._run_job(job_id)) + self._tasks[job_id] = task + + async def _save_job_to_storage(self, job: dict[str, Any]) -> None: + """Persist job to KVStore.""" + key = f"{JOB_PREFIX}{job['job_id']}" + await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) + + async def _delete_job_from_storage(self, job_id: str) -> None: + """Delete job from KVStore.""" + key = f"{JOB_PREFIX}{job_id}" + await self._kvstore.delete(key) + + async def shutdown(self) -> None: + """Gracefully shutdown the scheduler.""" + self._shutdown_event.set() + + # Cancel all running tasks + for task in self._tasks.values(): + if not task.done(): + task.cancel() + + # Wait for all tasks to complete + if self._tasks: + await asyncio.gather(*self._tasks.values(), return_exceptions=True) + + self._tasks.clear() + + async def schedule_job( + self, + job_type: str, + job_data: dict, + metadata: dict | None = None, + ) -> str: + """Schedule a new job for execution.""" + job_id = str(uuid.uuid4()) + + job_info 
= { + "job_id": job_id, + "job_type": job_type, + "status": JobStatus.PENDING, + "created_at": datetime.now(UTC), + "started_at": None, + "completed_at": None, + "progress": 0.0, + "metadata": metadata or {}, + "job_data": job_data, + "error": None, + "result": None, + } + + self._jobs[job_id] = job_info + + # Persist to KVStore + await self._save_job_to_storage(job_info) + + # Create and schedule the task + task = asyncio.create_task(self._run_job(job_id)) + self._tasks[job_id] = task + + return job_id + + async def _run_job(self, job_id: str) -> None: + """Run a job asynchronously.""" + job = self._jobs[job_id] + + try: + # Acquire semaphore to limit concurrent jobs + async with self._semaphore: + # Update status to RUNNING + job["status"] = JobStatus.RUNNING + job["started_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + # Execute the job based on job_type + result = await self._execute_job(job) + + # Mark as completed + job["status"] = JobStatus.COMPLETED + job["completed_at"] = datetime.now(UTC) + job["progress"] = 1.0 + job["result"] = result + await self._save_job_to_storage(job) + + except asyncio.CancelledError: + # Job was cancelled + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + raise + + except Exception as e: + # Job failed + job["status"] = JobStatus.FAILED + job["completed_at"] = datetime.now(UTC) + job["error"] = str(e) + job["result"] = {"error_details": traceback.format_exc()} + await self._save_job_to_storage(job) + + finally: + # Clean up task reference + if job_id in self._tasks: + del self._tasks[job_id] + + async def _execute_job(self, job: dict) -> dict: + """ + Execute a job based on its type. + + If a custom executor is registered for the job type, it will be called. + Otherwise, raises an error for unknown job types. 
+ """ + job_type = job["job_type"] + job_data = job["job_data"] + + if job_type in self._job_executors: + executor = self._job_executors[job_type] + return await executor(job_data) + + raise ValueError(f"No executor registered for job type: {job_type}") + + async def get_job_info(self, job_id: str) -> dict: + """Get complete information about a job.""" + if job_id not in self._jobs: + raise ValueError(f"Job {job_id} not found") + + job = self._jobs[job_id].copy() + + # Remove internal job_data field from response + job.pop("job_data", None) + + return job + + async def cancel_job(self, job_id: str) -> bool: + """Cancel a pending or running job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only cancel pending or running jobs + if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: + return False + + # Cancel the task if it exists + if job_id in self._tasks: + task = self._tasks[job_id] + if not task.done(): + task.cancel() + + # Update job status + job["status"] = JobStatus.CANCELLED + job["completed_at"] = datetime.now(UTC) + await self._save_job_to_storage(job) + + return True + + async def delete_job(self, job_id: str) -> bool: + """Delete a completed or cancelled job.""" + if job_id not in self._jobs: + return False + + job = self._jobs[job_id] + + # Can only delete completed, failed, or cancelled jobs + if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: + return False + + # Remove from memory and storage + del self._jobs[job_id] + await self._delete_job_from_storage(job_id) + + return True + + async def list_jobs( + self, + job_type: str | None = None, + status: JobStatus | None = None, + limit: int = 100, + offset: int = 0, + ) -> list[dict]: + """List jobs with optional filtering.""" + jobs = list(self._jobs.values()) + + # Filter by job_type + if job_type is not None: + jobs = [j for j in jobs if j["job_type"] == job_type] + + # Filter by status + if status is not None: + jobs = 
[j for j in jobs if j["status"] == status] + + # Sort by created_at (newest first) + jobs.sort(key=lambda j: j["created_at"], reverse=True) + + # Apply pagination + jobs = jobs[offset : offset + limit] + + # Convert to return format (remove internal fields) + result = [] + for job in jobs: + job_copy = job.copy() + # Remove internal job_data field + job_copy.pop("job_data", None) + result.append(job_copy) + + return result From 0a98415e25ef4672a1698a5b4cfc1af7070b9d08 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Wed, 29 Oct 2025 12:04:21 -0700 Subject: [PATCH 03/10] clean --- llama_stack/providers/utils/job_scheduler/config.py | 1 - .../providers/utils/job_scheduler/inline/scheduler.py | 6 +----- .../providers/utils/job_scheduler/inline/scheduler.py | 1 - 3 files changed, 1 insertion(+), 7 deletions(-) diff --git a/llama_stack/providers/utils/job_scheduler/config.py b/llama_stack/providers/utils/job_scheduler/config.py index 325387cfd..d8eba89fb 100644 --- a/llama_stack/providers/utils/job_scheduler/config.py +++ b/llama_stack/providers/utils/job_scheduler/config.py @@ -52,7 +52,6 @@ class CelerySchedulerConfig(SchedulerConfig): ) -# Union type for all scheduler configs with discriminator SchedulerConfigUnion = Annotated[ InlineSchedulerConfig | CelerySchedulerConfig, Field(discriminator="type"), diff --git a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py index b53621128..b253b8a59 100644 --- a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -80,7 +80,6 @@ class InlineSchedulerImpl(Scheduler): job["started_at"] = datetime.fromisoformat(job["started_at"]) if job.get("completed_at"): job["completed_at"] = datetime.fromisoformat(job["completed_at"]) - # Deserialize JobStatus enum job["status"] = JobStatus(job["status"]) self._jobs[job["job_id"]] = job @@ -171,8 +170,6 @@ class 
InlineSchedulerImpl(Scheduler): await self._save_job_to_storage(job) # Execute the job based on job_type - # This is where job-specific logic would be called - # For now, we'll simulate a simple job execution result = await self._execute_job(job) # Mark as completed @@ -215,9 +212,8 @@ class InlineSchedulerImpl(Scheduler): # Check if a custom executor is registered for this job type if job_type in self._job_executors: executor = self._job_executors[job_type] - return await executor(job_data) # Call the registered executor + return await executor(job_data) - # No executor registered for this job type raise ValueError(f"No executor registered for job type: {job_type}") async def get_job_info(self, job_id: str) -> dict: diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py index 401bb0d27..68d201604 100644 --- a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -74,7 +74,6 @@ class InlineSchedulerImpl(Scheduler): for value in stored_values: job = json.loads(value) - # Deserialize datetime strings back to datetime objects job["created_at"] = datetime.fromisoformat(job["created_at"]) if job.get("started_at"): job["started_at"] = datetime.fromisoformat(job["started_at"]) From 9cf349a263bf7f3475cd048695eff3137e9f2a7a Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Tue, 4 Nov 2025 10:42:00 -0800 Subject: [PATCH 04/10] add readme and keep only stubs --- .../providers/utils/job_scheduler/__init__.py | 47 --- .../providers/utils/job_scheduler/api.py | 135 -------- .../utils/job_scheduler/celery/__init__.py | 9 - .../utils/job_scheduler/celery/scheduler.py | 81 ----- .../providers/utils/job_scheduler/config.py | 58 ---- .../utils/job_scheduler/inline/__init__.py | 9 - .../utils/job_scheduler/inline/scheduler.py | 304 ------------------ .../providers/utils/job_scheduler/README.md | 237 
++++++++++++++ .../providers/utils/job_scheduler/api.py | 15 +- .../utils/job_scheduler/celery/scheduler.py | 22 +- .../utils/job_scheduler/inline/scheduler.py | 246 +------------- .../providers/utils/test_job_scheduler.py | 274 ++++------------ 12 files changed, 328 insertions(+), 1109 deletions(-) delete mode 100644 llama_stack/providers/utils/job_scheduler/__init__.py delete mode 100644 llama_stack/providers/utils/job_scheduler/api.py delete mode 100644 llama_stack/providers/utils/job_scheduler/celery/__init__.py delete mode 100644 llama_stack/providers/utils/job_scheduler/celery/scheduler.py delete mode 100644 llama_stack/providers/utils/job_scheduler/config.py delete mode 100644 llama_stack/providers/utils/job_scheduler/inline/__init__.py delete mode 100644 llama_stack/providers/utils/job_scheduler/inline/scheduler.py create mode 100644 src/llama_stack/providers/utils/job_scheduler/README.md diff --git a/llama_stack/providers/utils/job_scheduler/__init__.py b/llama_stack/providers/utils/job_scheduler/__init__.py deleted file mode 100644 index e09d9f45d..000000000 --- a/llama_stack/providers/utils/job_scheduler/__init__.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .api import JobStatus, Scheduler -from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig - - -async def scheduler_impl(config: SchedulerConfig) -> Scheduler: - """ - Factory function to instantiate scheduler implementations. 
- - Args: - config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) - - Returns: - Scheduler: An initialized scheduler instance - - Raises: - ValueError: If the config type is unknown - """ - impl: Scheduler - if isinstance(config, InlineSchedulerConfig): - from .inline import InlineSchedulerImpl - - impl = InlineSchedulerImpl(config) - elif isinstance(config, CelerySchedulerConfig): - from .celery import CelerySchedulerImpl - - impl = CelerySchedulerImpl(config) - else: - raise ValueError(f"Unknown scheduler config type: {type(config)}") - - await impl.initialize() - return impl - - -__all__ = [ - "JobStatus", - "Scheduler", - "SchedulerConfig", - "InlineSchedulerConfig", - "CelerySchedulerConfig", - "scheduler_impl", -] diff --git a/llama_stack/providers/utils/job_scheduler/api.py b/llama_stack/providers/utils/job_scheduler/api.py deleted file mode 100644 index e46da8fcb..000000000 --- a/llama_stack/providers/utils/job_scheduler/api.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from collections.abc import Awaitable, Callable -from enum import StrEnum -from typing import Protocol - - -class JobStatus(StrEnum): - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" - FAILED = "failed" - CANCELLED = "cancelled" - - -class Scheduler(Protocol): - """ - Abstract scheduler protocol for managing async jobs. - - This provides a pluggable backend for job scheduling and execution. - """ - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """ - Schedule a new job for execution. 
- - Args: - job_type: Type of job (e.g., "batch_processing", "log_aggregation", "metrics_collection") - job_data: Job-specific data and parameters - metadata: Additional metadata for tracking - - Returns: - job_id: UUID for tracking the job - """ - ... - - async def get_job_info(self, job_id: str) -> dict: - """ - Get complete information about a job. - - Returns: - { - "job_id": str, - "job_type": str, - "status": JobStatus, - "created_at": datetime, - "started_at": datetime | None, - "completed_at": datetime | None, - "progress": float, # 0.0 to 1.0 - "metadata": dict, - "error": str | None, # Error message if status == FAILED - "result": dict | None, # Job result if status == COMPLETED - } - """ - ... - - async def cancel_job(self, job_id: str) -> bool: - """ - Cancel a pending or running job. - - Args: - job_id: Job to cancel - - Returns: - True if job was cancelled, False if not found or already completed - """ - ... - - async def delete_job(self, job_id: str) -> bool: - """ - Delete a completed or cancelled job. - - Args: - job_id: Job to delete - - Returns: - True if job was deleted, False if not found - """ - ... - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """ - List jobs with optional filtering. - - Args: - job_type: Filter by job type (e.g., "batch_processing", "log_aggregation", "metrics_collection") - status: Filter by status - limit: Maximum number of jobs to return - offset: Offset for pagination - - Returns: - List of job info dictionaries - """ - ... - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """ - Register a job executor function for a specific job type. - - This allows components to register custom job execution logic with the scheduler. - When a job of the registered type is executed, the scheduler will call the - registered executor function. 
- - Args: - job_type: The type of job (e.g., "vector_store_file_batch") - executor: Async function that takes job_data dict and returns result dict - """ - ... - - async def initialize(self) -> None: - """Initialize the scheduler (connect to backend, etc.)""" - ... - - async def shutdown(self) -> None: - """Gracefully shutdown the scheduler""" - ... diff --git a/llama_stack/providers/utils/job_scheduler/celery/__init__.py b/llama_stack/providers/utils/job_scheduler/celery/__init__.py deleted file mode 100644 index 27d9d01cb..000000000 --- a/llama_stack/providers/utils/job_scheduler/celery/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .scheduler import CelerySchedulerImpl - -__all__ = ["CelerySchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/llama_stack/providers/utils/job_scheduler/celery/scheduler.py deleted file mode 100644 index 0a5b6e220..000000000 --- a/llama_stack/providers/utils/job_scheduler/celery/scheduler.py +++ /dev/null @@ -1,81 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from collections.abc import Awaitable, Callable - -from ..api import JobStatus, Scheduler -from ..config import CelerySchedulerConfig - - -class CelerySchedulerImpl(Scheduler): - """ - Celery-based scheduler implementation for distributed multi-worker deployments. - - This scheduler uses Celery to distribute jobs across multiple worker processes - or machines. 
It provides: - - Persistent job queue (via Redis/RabbitMQ broker) - - Multi-worker coordination - - Crash recovery (jobs survive server restarts) - - Distributed task execution - - This is suitable for: - - Production deployments - - Multi-worker scenarios - - High availability requirements - """ - - def __init__(self, config: CelerySchedulerConfig): - self.config = config - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - # TODO: Initialize Celery app with broker and result backend - raise NotImplementedError("Celery scheduler not yet implemented") - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def initialize(self) -> None: - """Initialize the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def shutdown(self) -> None: - """Gracefully shutdown the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """Schedule a new job for execution via Celery.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def get_job_info(self, job_id: str) -> dict: - """Get complete information about a job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def cancel_job(self, job_id: str) -> bool: - """Cancel a pending or running Celery job.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def delete_job(self, job_id: str) -> bool: - """Delete a completed or cancelled job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = 
None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """List jobs from Celery result backend with optional filtering.""" - raise NotImplementedError("Celery scheduler not yet implemented") diff --git a/llama_stack/providers/utils/job_scheduler/config.py b/llama_stack/providers/utils/job_scheduler/config.py deleted file mode 100644 index d8eba89fb..000000000 --- a/llama_stack/providers/utils/job_scheduler/config.py +++ /dev/null @@ -1,58 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from typing import Annotated, Literal - -from pydantic import BaseModel, Field - -from llama_stack.core.storage.datatypes import KVStoreReference - - -class SchedulerConfig(BaseModel): - """Base class for scheduler configurations.""" - - type: str - - -class InlineSchedulerConfig(SchedulerConfig): - """ - Configuration for inline (asyncio-based) scheduler. - - This scheduler runs jobs in the same process using asyncio tasks. - Suitable for development and single-worker deployments. - """ - - type: Literal["inline"] = "inline" - kvstore: KVStoreReference = Field( - description="KVStore reference for persisting job state", - ) - max_concurrent_jobs: int = Field( - default=10, - description="Maximum number of jobs that can run concurrently", - ) - - -class CelerySchedulerConfig(SchedulerConfig): - """ - Configuration for Celery-based distributed scheduler. - - This scheduler distributes jobs across multiple worker processes/machines. - Suitable for production and multi-worker deployments. 
- """ - - type: Literal["celery"] = "celery" - broker_url: str = Field( - description="Celery broker URL (e.g., 'redis://localhost:6379/0')", - ) - result_backend: str = Field( - description="Celery result backend URL (e.g., 'redis://localhost:6379/1')", - ) - - -SchedulerConfigUnion = Annotated[ - InlineSchedulerConfig | CelerySchedulerConfig, - Field(discriminator="type"), -] diff --git a/llama_stack/providers/utils/job_scheduler/inline/__init__.py b/llama_stack/providers/utils/job_scheduler/inline/__init__.py deleted file mode 100644 index 5be56af5e..000000000 --- a/llama_stack/providers/utils/job_scheduler/inline/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. - -from .scheduler import InlineSchedulerImpl - -__all__ = ["InlineSchedulerImpl"] diff --git a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/llama_stack/providers/utils/job_scheduler/inline/scheduler.py deleted file mode 100644 index b253b8a59..000000000 --- a/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ /dev/null @@ -1,304 +0,0 @@ -# Copyright (c) Meta Platforms, Inc. and affiliates. -# All rights reserved. -# -# This source code is licensed under the terms described in the LICENSE file in -# the root directory of this source tree. 
- -import asyncio -import json -import traceback -import uuid -from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from llama_stack.core.utils.serialize import EnumEncoder -from llama_stack.providers.utils.kvstore import kvstore_impl -from llama_stack.providers.utils.kvstore.api import KVStore - -from ..api import JobStatus, Scheduler -from ..config import InlineSchedulerConfig - -JOB_PREFIX = "job_scheduler:jobs:" - - -class InlineSchedulerImpl(Scheduler): - """ - Inline scheduler implementation using asyncio for single-worker deployments. - - This scheduler runs jobs in the same process using asyncio tasks. Jobs and their - state are persisted to KVStore for crash recovery. - - This is suitable for: - - Development and testing - - Single-worker deployments - - Scenarios where external dependencies (Redis, RabbitMQ) are not available - """ - - def __init__(self, config: InlineSchedulerConfig): - self.config = config - self._jobs: dict[str, dict[str, Any]] = {} - self._tasks: dict[str, asyncio.Task] = {} - self._semaphore: asyncio.Semaphore - self._shutdown_event = asyncio.Event() - self._kvstore: KVStore - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - - async def initialize(self) -> None: - """Initialize the scheduler and load persisted jobs.""" - self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) - - # Initialize KVStore - self._kvstore = await kvstore_impl(self.config.kvstore) - - # Load persisted jobs from KVStore - await self._load_jobs_from_storage() - - # Resume incomplete jobs - await self._resume_incomplete_jobs() - - def register_job_executor( - self, - job_type: str, - executor: Callable[[dict], Awaitable[dict]], - ) -> None: - """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def _load_jobs_from_storage(self) -> None: - """Load all jobs from KVStore into memory.""" - start_key = 
JOB_PREFIX - end_key = f"{JOB_PREFIX}\xff" - - stored_values = await self._kvstore.values_in_range(start_key, end_key) - - for value in stored_values: - job = json.loads(value) - # Deserialize datetime strings back to datetime objects - job["created_at"] = datetime.fromisoformat(job["created_at"]) - if job.get("started_at"): - job["started_at"] = datetime.fromisoformat(job["started_at"]) - if job.get("completed_at"): - job["completed_at"] = datetime.fromisoformat(job["completed_at"]) - job["status"] = JobStatus(job["status"]) - - self._jobs[job["job_id"]] = job - - async def _resume_incomplete_jobs(self) -> None: - """Resume jobs that were running when server crashed.""" - for job_id, job in self._jobs.items(): - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - # Reset running jobs to pending - if job["status"] == JobStatus.RUNNING: - job["status"] = JobStatus.PENDING - job["started_at"] = None - await self._save_job_to_storage(job) - - # Restart the job - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - async def _save_job_to_storage(self, job: dict[str, Any]) -> None: - """Persist job to KVStore.""" - key = f"{JOB_PREFIX}{job['job_id']}" - await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) - - async def _delete_job_from_storage(self, job_id: str) -> None: - """Delete job from KVStore.""" - key = f"{JOB_PREFIX}{job_id}" - await self._kvstore.delete(key) - - async def shutdown(self) -> None: - """Gracefully shutdown the scheduler.""" - self._shutdown_event.set() - - # Cancel all running tasks - for task in self._tasks.values(): - if not task.done(): - task.cancel() - - # Wait for all tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - self._tasks.clear() - - async def schedule_job( - self, - job_type: str, - job_data: dict, - metadata: dict | None = None, - ) -> str: - """Schedule a new job for execution.""" - job_id = str(uuid.uuid4()) - - job_info 
= { - "job_id": job_id, - "job_type": job_type, - "status": JobStatus.PENDING, - "created_at": datetime.now(UTC), - "started_at": None, - "completed_at": None, - "progress": 0.0, - "metadata": metadata or {}, - "job_data": job_data, - "error": None, - "result": None, - } - - self._jobs[job_id] = job_info - - # Persist to KVStore - await self._save_job_to_storage(job_info) - - # Create and schedule the task - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - return job_id - - async def _run_job(self, job_id: str) -> None: - """Run a job asynchronously.""" - job = self._jobs[job_id] - - try: - # Acquire semaphore to limit concurrent jobs - async with self._semaphore: - # Update status to RUNNING - job["status"] = JobStatus.RUNNING - job["started_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - # Execute the job based on job_type - result = await self._execute_job(job) - - # Mark as completed - job["status"] = JobStatus.COMPLETED - job["completed_at"] = datetime.now(UTC) - job["progress"] = 1.0 - job["result"] = result - await self._save_job_to_storage(job) - - except asyncio.CancelledError: - # Job was cancelled - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - raise - - except Exception as e: - # Job failed - job["status"] = JobStatus.FAILED - job["completed_at"] = datetime.now(UTC) - job["error"] = str(e) - job["result"] = {"error_details": traceback.format_exc()} - await self._save_job_to_storage(job) - - finally: - # Clean up task reference - if job_id in self._tasks: - del self._tasks[job_id] - - async def _execute_job(self, job: dict) -> dict: - """ - Execute a job based on its type. - - If a custom executor is registered for the job type, it will be called. - Otherwise, raises an error for unknown job types. 
- """ - job_type = job["job_type"] - job_data = job["job_data"] - - # Check if a custom executor is registered for this job type - if job_type in self._job_executors: - executor = self._job_executors[job_type] - return await executor(job_data) - - raise ValueError(f"No executor registered for job type: {job_type}") - - async def get_job_info(self, job_id: str) -> dict: - """Get complete information about a job.""" - if job_id not in self._jobs: - raise ValueError(f"Job {job_id} not found") - - job = self._jobs[job_id].copy() - - # Remove internal job_data field from response - job.pop("job_data", None) - - return job - - async def cancel_job(self, job_id: str) -> bool: - """Cancel a pending or running job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only cancel pending or running jobs - if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: - return False - - # Cancel the task if it exists - if job_id in self._tasks: - task = self._tasks[job_id] - if not task.done(): - task.cancel() - - # Update job status - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - return True - - async def delete_job(self, job_id: str) -> bool: - """Delete a completed or cancelled job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only delete completed, failed, or cancelled jobs - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - return False - - # Remove from memory and storage - del self._jobs[job_id] - await self._delete_job_from_storage(job_id) - - return True - - async def list_jobs( - self, - job_type: str | None = None, - status: JobStatus | None = None, - limit: int = 100, - offset: int = 0, - ) -> list[dict]: - """List jobs with optional filtering.""" - jobs = list(self._jobs.values()) - - # Filter by job_type - if job_type is not None: - jobs = [j for j in jobs if j["job_type"] == 
job_type] - - # Filter by status - if status is not None: - jobs = [j for j in jobs if j["status"] == status] - - # Sort by created_at (newest first) - jobs.sort(key=lambda j: j["created_at"], reverse=True) - - # Apply pagination - jobs = jobs[offset : offset + limit] - - # Convert to return format (remove internal fields) - result = [] - for job in jobs: - job_copy = job.copy() - # Remove internal job_data field - job_copy.pop("job_data", None) - result.append(job_copy) - - return result diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md new file mode 100644 index 000000000..a32248ecc --- /dev/null +++ b/src/llama_stack/providers/utils/job_scheduler/README.md @@ -0,0 +1,237 @@ +# Job Scheduler + +## Run Config → Adapter Init Flow + +``` +┌─────────────────────────────────────────────────────────┐ +│ 1. Run Config YAML │ +├─────────────────────────────────────────────────────────┤ +│ providers: │ +│ job_scheduler: │ +│ - provider_type: inline::scheduler │ +│ config: { kvstore: {...} } │ +│ vector_io: │ +│ - provider_type: inline::faiss │ +│ config: { persistence: {...} } │ +└─────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────┐ +│ 2. Stack.initialize() │ +│ → resolve_impls(run_config, ...) │ +│ → instantiate_providers(sorted_by_deps) │ +└─────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────┐ +│ 3. For each provider (in dependency order): │ +│ │ +│ A. Job Scheduler (no dependencies) │ +│ ├─ get_provider_impl(config, deps={}) │ +│ ├─ scheduler = InlineSchedulerImpl(config) │ +│ └─ await scheduler.initialize() │ +│ │ +│ B. 
Vector IO (depends on inference, job_scheduler) │ +│ ├─ deps = { │ +│ │ Api.inference: <inference_impl>, │ +│ │ Api.job_scheduler: <scheduler_impl> │ +│ │ } │ +│ ├─ get_provider_impl(config, deps) │ +│ │ ├─ adapter = FaissVectorIOAdapter( │ +│ │ │ config, │ +│ │ │ deps[Api.inference], │ +│ │ │ deps.get(Api.files), │ +│ │ │ deps.get(Api.job_scheduler) ← Here! │ +│ │ │ ) │ +│ │ └─ await adapter.initialize() │ +│ └─ return adapter │ +└─────────────────────────────────────────────────────────┘ + ↓ +┌─────────────────────────────────────────────────────────┐ +│ 4. Adapter.__init__(scheduler) │ +│ ├─ self.scheduler = scheduler │ +│ └─ scheduler.register_job_executor(...) │ +└─────────────────────────────────────────────────────────┘ +``` + +**Key Points:** +- **Dependency resolution**: Providers instantiated in topological order +- **Automatic injection**: Framework passes `deps[Api.job_scheduler]` +- **Registry declares it**: `optional_api_dependencies=[Api.job_scheduler]` +- **User configures it**: Run YAML specifies inline vs celery + +--- + +## Server Startup Flow + +## Two-Phase Initialization + +Separate scheduler initialization into two phases: +- **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them +- **Phase 2 (`start`)**: Resume jobs after all executors are registered + +``` +┌─────────────────────────────────────────────────────────┐ +│ Stack.initialize() │ +├─────────────────────────────────────────────────────────┤ +│ → resolve_impls() │ +│ ├─ Job Scheduler │ +│ │ └─ scheduler.initialize() ← Phase 1 │ +│ │ • Connect to KVStore │ +│ │ • Load jobs into memory │ +│ │ • DON'T resume jobs yet ❌ │ +│ │ │ +│ ├─ Vector IO Provider │ +│ │ ├─ __init__(): │ +│ │ │ └─ scheduler.register_job_executor(...)
│ +│ │ └─ provider.initialize() │ +│ │ │ +│ └─ [All providers initialized, executors ✓] │ +│ │ +│ → Start schedulers │ +│ └─ scheduler.start() ← Phase 2 │ +│ └─ Resume incomplete jobs │ +│ • Executors are registered ✓ │ +│ • Jobs execute successfully ✓ │ +└─────────────────────────────────────────────────────────┘ +``` + +## Implementation + +### 1. Scheduler Protocol (`api.py`) + +```python +async def initialize(self) -> None: + """ + Initialize the scheduler (connect to backend, load persisted jobs). + + Note: Does NOT start processing jobs. Call start() after all + executors are registered. + """ + ... + + +async def start(self) -> None: + """ + Start processing jobs after all executors are registered. + + This resumes any incomplete jobs from storage. Must be called + after initialize() and after all job executors have been + registered via register_job_executor(). + """ + ... +``` + +### 2. Scheduler Implementation + +```python +class InlineSchedulerImpl(Scheduler): + async def initialize(self) -> None: + self._kvstore = await kvstore_impl(self.config.kvstore) + await self._load_jobs_from_storage() + # DON'T call _resume_incomplete_jobs() here! + + async def start(self) -> None: + """Call AFTER all executors are registered.""" + await self._resume_incomplete_jobs() +``` + +### 3. Stack Integration (`stack.py`) + +```python +async def initialize(self): + # ... existing code ... + impls = await resolve_impls(...) # All providers initialized + + # NEW: Start schedulers after all executors registered + if Api.job_scheduler in impls: + await impls[Api.job_scheduler].start() + + self.impls = impls +``` + +### 4. 
Provider Integration + +```python +class VectorIOAdapter: + def __init__(self, config, inference_api, files_api, scheduler): + # Register executor during __init__ (before scheduler.start()) + scheduler.register_job_executor( + "vector_store_file_batch", self._process_file_batch + ) +``` + +## Behavior + +### Case 1: Clean Start (No Jobs) +```python +await scheduler.start() +# _resume_incomplete_jobs() finds no jobs +# Loop completes, nothing happens ✓ +``` + +### Case 2: After Crash (Jobs Exist) +```python +await scheduler.start() +# _resume_incomplete_jobs() finds PENDING/RUNNING jobs +# Executors are registered ✓ +# Creates asyncio tasks successfully ✓ +# Jobs run in background ✓ +``` + + +## Usage: OpenAI Vector Store Mixin + +### Current Pattern (Direct asyncio tasks) +```python +class OpenAIVectorStoreMixin: + def __init__(self, files_api, kvstore): + self._file_batch_tasks: dict[str, asyncio.Task] = {} + + async def openai_create_vector_store_file_batch(self, params): + # Create background task directly + task = asyncio.create_task(self._process_file_batch_async(params)) + self._file_batch_tasks[batch_id] = task +``` + +### New Pattern (Job Scheduler) +```python +class OpenAIVectorStoreMixin: + def __init__(self, files_api, kvstore, scheduler): + self.scheduler = scheduler + # Register executor in __init__ (before scheduler.start()) + self.scheduler.register_job_executor( + "vector_store_file_batch", self._execute_file_batch + ) + + async def openai_create_vector_store_file_batch(self, params): + # Schedule job through scheduler + job_id = await self.scheduler.schedule_job( + job_type="vector_store_file_batch", + job_data={ + "batch_id": batch_id, + "vector_store_id": vector_store_id, + "file_ids": params.file_ids, + "attributes": params.attributes, + "chunking_strategy": chunking_strategy.model_dump(), + }, + metadata={"batch_id": batch_id}, + ) + return batch_object + + async def _execute_file_batch(self, job_data: dict) -> dict: + """Executor called by 
scheduler.""" + batch_id = job_data["batch_id"] + vector_store_id = job_data["vector_store_id"] + file_ids = job_data["file_ids"] + + # Process files (existing logic from _process_file_batch_async) + # await self._process_files_with_concurrency(...) + + return {"status": "completed", "files_processed": len(file_ids)} +``` + +### Benefits +- ✅ **Crash recovery**: Jobs survive server restarts +- ✅ **Persistence**: Job state stored in KVStore +- ✅ **Monitoring**: Query job status via `get_job_info(job_id)` +- ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)` +- ✅ **Clean separation**: Job scheduling decoupled from execution diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py index e46da8fcb..023e77430 100644 --- a/src/llama_stack/providers/utils/job_scheduler/api.py +++ b/src/llama_stack/providers/utils/job_scheduler/api.py @@ -127,7 +127,20 @@ class Scheduler(Protocol): ... async def initialize(self) -> None: - """Initialize the scheduler (connect to backend, etc.)""" + """ + Initialize the scheduler (connect to backend, load persisted jobs). + + Note: Does NOT start processing jobs. Call start() after all executors are registered. + """ + ... + + async def start(self) -> None: + """ + Start processing jobs after all executors are registered. + + This resumes any incomplete jobs from storage. Must be called after initialize() + and after all job executors have been registered via register_job_executor(). + """ ... 
async def shutdown(self) -> None: diff --git a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py index 0a5b6e220..6dcaa9e51 100644 --- a/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py +++ b/src/llama_stack/providers/utils/job_scheduler/celery/scheduler.py @@ -30,8 +30,6 @@ class CelerySchedulerImpl(Scheduler): def __init__(self, config: CelerySchedulerConfig): self.config = config self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} - # TODO: Initialize Celery app with broker and result backend - raise NotImplementedError("Celery scheduler not yet implemented") def register_job_executor( self, @@ -39,15 +37,19 @@ class CelerySchedulerImpl(Scheduler): executor: Callable[[dict], Awaitable[dict]], ) -> None: """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor + raise NotImplementedError("Celery scheduler implementation is not yet available") async def initialize(self) -> None: """Initialize the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") + + async def start(self) -> None: + """Start processing jobs after all executors are registered.""" + raise NotImplementedError("Celery scheduler implementation is not yet available") async def shutdown(self) -> None: """Gracefully shutdown the Celery scheduler.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def schedule_job( self, @@ -56,19 +58,19 @@ class CelerySchedulerImpl(Scheduler): metadata: dict | None = None, ) -> str: """Schedule a new job for execution via Celery.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet 
available") async def get_job_info(self, job_id: str) -> dict: """Get complete information about a job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def cancel_job(self, job_id: str) -> bool: """Cancel a pending or running Celery job.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def delete_job(self, job_id: str) -> bool: """Delete a completed or cancelled job from Celery result backend.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") async def list_jobs( self, @@ -78,4 +80,4 @@ class CelerySchedulerImpl(Scheduler): offset: int = 0, ) -> list[dict]: """List jobs from Celery result backend with optional filtering.""" - raise NotImplementedError("Celery scheduler not yet implemented") + raise NotImplementedError("Celery scheduler implementation is not yet available") diff --git a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py index 68d201604..3d8c5e744 100644 --- a/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py +++ b/src/llama_stack/providers/utils/job_scheduler/inline/scheduler.py @@ -4,23 +4,11 @@ # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. 
-import asyncio -import json -import traceback -import uuid from collections.abc import Awaitable, Callable -from datetime import UTC, datetime -from typing import Any - -from llama_stack.core.utils.serialize import EnumEncoder -from llama_stack.providers.utils.kvstore import kvstore_impl -from llama_stack.providers.utils.kvstore.api import KVStore from ..api import JobStatus, Scheduler from ..config import InlineSchedulerConfig -JOB_PREFIX = "job_scheduler:jobs:" - class InlineSchedulerImpl(Scheduler): """ @@ -37,25 +25,14 @@ class InlineSchedulerImpl(Scheduler): def __init__(self, config: InlineSchedulerConfig): self.config = config - self._jobs: dict[str, dict[str, Any]] = {} - self._tasks: dict[str, asyncio.Task] = {} - self._semaphore: asyncio.Semaphore - self._shutdown_event = asyncio.Event() - self._kvstore: KVStore - self._job_executors: dict[str, Callable[[dict], Awaitable[dict]]] = {} async def initialize(self) -> None: """Initialize the scheduler and load persisted jobs.""" - self._semaphore = asyncio.Semaphore(self.config.max_concurrent_jobs) + raise NotImplementedError("Inline scheduler implementation is not yet available") - # Initialize KVStore - self._kvstore = await kvstore_impl(self.config.kvstore) - - # Load persisted jobs from KVStore - await self._load_jobs_from_storage() - - # Resume incomplete jobs - await self._resume_incomplete_jobs() + async def start(self) -> None: + """Start processing jobs after all executors are registered.""" + raise NotImplementedError("Inline scheduler implementation is not yet available") def register_job_executor( self, @@ -63,64 +40,11 @@ class InlineSchedulerImpl(Scheduler): executor: Callable[[dict], Awaitable[dict]], ) -> None: """Register a job executor function for a specific job type.""" - self._job_executors[job_type] = executor - - async def _load_jobs_from_storage(self) -> None: - """Load all jobs from KVStore into memory.""" - start_key = JOB_PREFIX - end_key = f"{JOB_PREFIX}\xff" - - stored_values = 
await self._kvstore.values_in_range(start_key, end_key) - - for value in stored_values: - job = json.loads(value) - job["created_at"] = datetime.fromisoformat(job["created_at"]) - if job.get("started_at"): - job["started_at"] = datetime.fromisoformat(job["started_at"]) - if job.get("completed_at"): - job["completed_at"] = datetime.fromisoformat(job["completed_at"]) - job["status"] = JobStatus(job["status"]) - - self._jobs[job["job_id"]] = job - - async def _resume_incomplete_jobs(self) -> None: - """Resume jobs that were running when server crashed.""" - for job_id, job in self._jobs.items(): - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - # Reset running jobs to pending - if job["status"] == JobStatus.RUNNING: - job["status"] = JobStatus.PENDING - job["started_at"] = None - await self._save_job_to_storage(job) - - # Restart the job - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - async def _save_job_to_storage(self, job: dict[str, Any]) -> None: - """Persist job to KVStore.""" - key = f"{JOB_PREFIX}{job['job_id']}" - await self._kvstore.set(key, json.dumps(job, cls=EnumEncoder)) - - async def _delete_job_from_storage(self, job_id: str) -> None: - """Delete job from KVStore.""" - key = f"{JOB_PREFIX}{job_id}" - await self._kvstore.delete(key) + raise NotImplementedError("Inline scheduler implementation is not yet available") async def shutdown(self) -> None: """Gracefully shutdown the scheduler.""" - self._shutdown_event.set() - - # Cancel all running tasks - for task in self._tasks.values(): - if not task.done(): - task.cancel() - - # Wait for all tasks to complete - if self._tasks: - await asyncio.gather(*self._tasks.values(), return_exceptions=True) - - self._tasks.clear() + raise NotImplementedError("Inline scheduler implementation is not yet available") async def schedule_job( self, @@ -129,143 +53,19 @@ class InlineSchedulerImpl(Scheduler): metadata: dict | None = None, ) -> str: """Schedule a new job for 
execution.""" - job_id = str(uuid.uuid4()) - - job_info = { - "job_id": job_id, - "job_type": job_type, - "status": JobStatus.PENDING, - "created_at": datetime.now(UTC), - "started_at": None, - "completed_at": None, - "progress": 0.0, - "metadata": metadata or {}, - "job_data": job_data, - "error": None, - "result": None, - } - - self._jobs[job_id] = job_info - - # Persist to KVStore - await self._save_job_to_storage(job_info) - - # Create and schedule the task - task = asyncio.create_task(self._run_job(job_id)) - self._tasks[job_id] = task - - return job_id - - async def _run_job(self, job_id: str) -> None: - """Run a job asynchronously.""" - job = self._jobs[job_id] - - try: - # Acquire semaphore to limit concurrent jobs - async with self._semaphore: - # Update status to RUNNING - job["status"] = JobStatus.RUNNING - job["started_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - # Execute the job based on job_type - result = await self._execute_job(job) - - # Mark as completed - job["status"] = JobStatus.COMPLETED - job["completed_at"] = datetime.now(UTC) - job["progress"] = 1.0 - job["result"] = result - await self._save_job_to_storage(job) - - except asyncio.CancelledError: - # Job was cancelled - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - raise - - except Exception as e: - # Job failed - job["status"] = JobStatus.FAILED - job["completed_at"] = datetime.now(UTC) - job["error"] = str(e) - job["result"] = {"error_details": traceback.format_exc()} - await self._save_job_to_storage(job) - - finally: - # Clean up task reference - if job_id in self._tasks: - del self._tasks[job_id] - - async def _execute_job(self, job: dict) -> dict: - """ - Execute a job based on its type. - - If a custom executor is registered for the job type, it will be called. - Otherwise, raises an error for unknown job types. 
- """ - job_type = job["job_type"] - job_data = job["job_data"] - - if job_type in self._job_executors: - executor = self._job_executors[job_type] - return await executor(job_data) - - raise ValueError(f"No executor registered for job type: {job_type}") + raise NotImplementedError("Inline scheduler implementation is not yet available") async def get_job_info(self, job_id: str) -> dict: """Get complete information about a job.""" - if job_id not in self._jobs: - raise ValueError(f"Job {job_id} not found") - - job = self._jobs[job_id].copy() - - # Remove internal job_data field from response - job.pop("job_data", None) - - return job + raise NotImplementedError("Inline scheduler implementation is not yet available") async def cancel_job(self, job_id: str) -> bool: """Cancel a pending or running job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only cancel pending or running jobs - if job["status"] in [JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED]: - return False - - # Cancel the task if it exists - if job_id in self._tasks: - task = self._tasks[job_id] - if not task.done(): - task.cancel() - - # Update job status - job["status"] = JobStatus.CANCELLED - job["completed_at"] = datetime.now(UTC) - await self._save_job_to_storage(job) - - return True + raise NotImplementedError("Inline scheduler implementation is not yet available") async def delete_job(self, job_id: str) -> bool: """Delete a completed or cancelled job.""" - if job_id not in self._jobs: - return False - - job = self._jobs[job_id] - - # Can only delete completed, failed, or cancelled jobs - if job["status"] in [JobStatus.PENDING, JobStatus.RUNNING]: - return False - - # Remove from memory and storage - del self._jobs[job_id] - await self._delete_job_from_storage(job_id) - - return True + raise NotImplementedError("Inline scheduler implementation is not yet available") async def list_jobs( self, @@ -275,28 +75,4 @@ class 
InlineSchedulerImpl(Scheduler): offset: int = 0, ) -> list[dict]: """List jobs with optional filtering.""" - jobs = list(self._jobs.values()) - - # Filter by job_type - if job_type is not None: - jobs = [j for j in jobs if j["job_type"] == job_type] - - # Filter by status - if status is not None: - jobs = [j for j in jobs if j["status"] == status] - - # Sort by created_at (newest first) - jobs.sort(key=lambda j: j["created_at"], reverse=True) - - # Apply pagination - jobs = jobs[offset : offset + limit] - - # Convert to return format (remove internal fields) - result = [] - for job in jobs: - job_copy = job.copy() - # Remove internal job_data field - job_copy.pop("job_data", None) - result.append(job_copy) - - return result + raise NotImplementedError("Inline scheduler implementation is not yet available") diff --git a/tests/unit/providers/utils/test_job_scheduler.py b/tests/unit/providers/utils/test_job_scheduler.py index 3283d8514..a6833f8df 100644 --- a/tests/unit/providers/utils/test_job_scheduler.py +++ b/tests/unit/providers/utils/test_job_scheduler.py @@ -7,13 +7,13 @@ import asyncio import tempfile from pathlib import Path +from unittest.mock import AsyncMock, MagicMock import pytest from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig from llama_stack.providers.utils.job_scheduler import ( InlineSchedulerConfig, - JobStatus, scheduler_impl, ) from llama_stack.providers.utils.kvstore import register_kvstore_backends @@ -43,238 +43,72 @@ def scheduler_config(): ) -async def test_inline_scheduler_basic(scheduler_config): - """Test basic scheduler functionality.""" +async def test_scheduler_api_exists(scheduler_config): + """Test that scheduler API is properly defined.""" scheduler = await scheduler_impl(scheduler_config) - try: - # Register default executor + # Verify all required methods exist + assert hasattr(scheduler, "initialize") + assert hasattr(scheduler, "start") + assert hasattr(scheduler, "shutdown") + assert 
hasattr(scheduler, "register_job_executor") + assert hasattr(scheduler, "schedule_job") + assert hasattr(scheduler, "get_job_info") + assert hasattr(scheduler, "cancel_job") + assert hasattr(scheduler, "delete_job") + assert hasattr(scheduler, "list_jobs") + + +async def test_scheduler_not_implemented(scheduler_config): + """Test that scheduler methods raise NotImplementedError.""" + scheduler = await scheduler_impl(scheduler_config) + + # Test that all methods raise NotImplementedError + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.initialize() + + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.start() + + with pytest.raises(NotImplementedError, match="not yet available"): scheduler.register_job_executor("test_job", default_test_executor) - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="test_job", - job_data={"test": "data"}, - metadata={"user": "test_user"}, - ) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.schedule_job("test_job", {}) - assert job_id is not None - assert isinstance(job_id, str) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.get_job_info("job_id") - # Wait a bit for the job to complete - await asyncio.sleep(0.2) + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.cancel_job("job_id") - # Get job info - job_info = await scheduler.get_job_info(job_id) - assert job_info["job_id"] == job_id - assert job_info["job_type"] == "test_job" - assert job_info["status"] == JobStatus.COMPLETED.value - assert job_info["metadata"]["user"] == "test_user" - assert job_info["progress"] == 1.0 - assert job_info["result"] is not None + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.delete_job("job_id") - finally: + with pytest.raises(NotImplementedError, match="not yet available"): + await scheduler.list_jobs() 
+ + with pytest.raises(NotImplementedError, match="not yet available"): await scheduler.shutdown() -async def test_inline_scheduler_list_jobs(scheduler_config): - """Test listing jobs with filters.""" +async def test_two_phase_initialization_pattern(scheduler_config): + """Test that the two-phase initialization pattern is supported.""" scheduler = await scheduler_impl(scheduler_config) - try: - # Register executors for different job types - scheduler.register_job_executor("batch_processing", default_test_executor) - scheduler.register_job_executor("log_aggregation", default_test_executor) + # Mock the methods to test the pattern + scheduler.initialize = AsyncMock() + scheduler.start = AsyncMock() + scheduler.register_job_executor = MagicMock() - # Schedule multiple jobs - await scheduler.schedule_job( - job_type="batch_processing", - job_data={"batch": 1}, - ) - await scheduler.schedule_job( - job_type="log_aggregation", - job_data={"logs": []}, - ) - await scheduler.schedule_job( - job_type="batch_processing", - job_data={"batch": 2}, - ) + # Phase 1: Initialize (loads jobs, doesn't start them) + await scheduler.initialize() + scheduler.initialize.assert_called_once() - # Wait for jobs to complete - await asyncio.sleep(0.3) + # Register executors after initialization + scheduler.register_job_executor("test_job", default_test_executor) + scheduler.register_job_executor.assert_called_once() - # List all jobs - all_jobs = await scheduler.list_jobs() - assert len(all_jobs) == 3 - - # List jobs by type - batch_jobs = await scheduler.list_jobs(job_type="batch_processing") - assert len(batch_jobs) == 2 - - log_jobs = await scheduler.list_jobs(job_type="log_aggregation") - assert len(log_jobs) == 1 - - # List jobs by status - completed_jobs = await scheduler.list_jobs(status=JobStatus.COMPLETED) - assert len(completed_jobs) == 3 - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_cancel_job(scheduler_config): - """Test cancelling a job.""" - 
scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("long_running_job", default_test_executor) - - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="long_running_job", - job_data={"duration": 10}, - ) - - # Try to cancel immediately - await scheduler.cancel_job(job_id) - - # Wait a bit - await asyncio.sleep(0.1) - - # Check job status - job_info = await scheduler.get_job_info(job_id) - # Job might be CANCELLED or COMPLETED depending on timing - assert job_info["status"] in [JobStatus.CANCELLED.value, JobStatus.COMPLETED.value] - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_delete_job(scheduler_config): - """Test deleting a completed job.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule a job - job_id = await scheduler.schedule_job( - job_type="test_job", - job_data={"test": "data"}, - ) - - # Wait for completion - await asyncio.sleep(0.2) - - # Verify job exists - job_info = await scheduler.get_job_info(job_id) - assert job_info["status"] == JobStatus.COMPLETED.value - - # Delete the job - success = await scheduler.delete_job(job_id) - assert success is True - - # Verify job is deleted - with pytest.raises(ValueError, match="not found"): - await scheduler.get_job_info(job_id) - - # Deleting again should return False - success = await scheduler.delete_job(job_id) - assert success is False - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_concurrent_jobs(scheduler_config): - """Test running multiple jobs concurrently.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule multiple jobs - job_ids = [] - for i in range(5): - job_id = await scheduler.schedule_job( - job_type="test_job", - 
job_data={"index": i}, - ) - job_ids.append(job_id) - - # Wait for all jobs to complete - await asyncio.sleep(0.5) - - # Verify all jobs completed - for job_id in job_ids: - job_info = await scheduler.get_job_info(job_id) - assert job_info["status"] == JobStatus.COMPLETED.value - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_pagination(scheduler_config): - """Test job listing with pagination.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Register executor - scheduler.register_job_executor("test_job", default_test_executor) - - # Schedule 10 jobs - for i in range(10): - await scheduler.schedule_job( - job_type="test_job", - job_data={"index": i}, - ) - - # Wait for completion - await asyncio.sleep(0.5) - - # Test pagination - page1 = await scheduler.list_jobs(limit=5, offset=0) - assert len(page1) == 5 - - page2 = await scheduler.list_jobs(limit=5, offset=5) - assert len(page2) == 5 - - page3 = await scheduler.list_jobs(limit=5, offset=10) - assert len(page3) == 0 - - finally: - await scheduler.shutdown() - - -async def test_inline_scheduler_register_job_executor(scheduler_config): - """Test registering a job executor.""" - scheduler = await scheduler_impl(scheduler_config) - - try: - # Define a custom job executor - async def custom_executor(job_data: dict) -> dict: - return {"custom": "result", "input": job_data} - - # Register the executor - scheduler.register_job_executor("custom_job_type", custom_executor) - - # Schedule a job with the custom type - job_id = await scheduler.schedule_job( - job_type="custom_job_type", - job_data={"test": "data"}, - ) - - # Wait for job to complete - await asyncio.sleep(0.2) - - # Verify the custom executor was called - job_info = await scheduler.get_job_info(job_id) - assert job_info["status"] == JobStatus.COMPLETED.value - assert job_info["result"]["custom"] == "result" - assert job_info["result"]["input"]["test"] == "data" - - finally: - await scheduler.shutdown() + # Phase 
2: Start (resumes jobs after executors registered) + await scheduler.start() + scheduler.start.assert_called_once() From e6c61cf4519d5c8f48329868f7e423026e9a81c7 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Tue, 4 Nov 2025 10:52:43 -0800 Subject: [PATCH 05/10] add more to readme.md --- .../providers/utils/job_scheduler/README.md | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md index a32248ecc..9afbec338 100644 --- a/src/llama_stack/providers/utils/job_scheduler/README.md +++ b/src/llama_stack/providers/utils/job_scheduler/README.md @@ -235,3 +235,36 @@ class OpenAIVectorStoreMixin: - ✅ **Monitoring**: Query job status via `get_job_info(job_id)` - ✅ **Cancellation**: Cancel jobs via `cancel_job(job_id)` - ✅ **Clean separation**: Job scheduling decoupled from execution + +## Single Worker vs Multi Worker + + ✅ Single Worker (Inline Scheduler - Not Implemented Yet) +``` + providers: + job_scheduler: + - provider_type: inline::scheduler + config: + kvstore: { ... 
} + max_concurrent_jobs: 10 + + Works because: + - Jobs run in the same process via asyncio.create_task() + - In-memory _jobs dict is shared within the process + - Crash recovery works (jobs persist to KVStore) +``` + + --- + ✅ Multi Worker (Celery Scheduler - Not Implemented Yet) +``` + providers: + job_scheduler: + - provider_type: celery::scheduler + config: + broker_url: redis://localhost:6379/0 + result_backend: redis://localhost:6379/1 +``` + Works because: + - Shared message broker (Redis/RabbitMQ) + - Celery handles distributed task queue + - Workers coordinate via broker + - Any worker can execute any job From d0796388e705bde911069fabd7f24f7c7dc0e8f7 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Tue, 4 Nov 2025 10:59:03 -0800 Subject: [PATCH 06/10] clean --- docs/docs/providers/eval/index.mdx | 4 ++++ .../providers/utils/job_scheduler/README.md | 13 ++++++------- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/docs/docs/providers/eval/index.mdx b/docs/docs/providers/eval/index.mdx index 3543db246..e413a4394 100644 --- a/docs/docs/providers/eval/index.mdx +++ b/docs/docs/providers/eval/index.mdx @@ -2,7 +2,11 @@ description: | Evaluations +<<<<<<< HEAD Llama Stack Evaluation API for running evaluations on model and agent candidates. +======= + Llama Stack Evaluation API for running evaluations on model and agent candidates." 
+>>>>>>> eb10a349 (clean) sidebar_label: Eval title: Eval --- diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md index 9afbec338..068ad1023 100644 --- a/src/llama_stack/providers/utils/job_scheduler/README.md +++ b/src/llama_stack/providers/utils/job_scheduler/README.md @@ -62,7 +62,7 @@ ## Server Startup Flow -## Two-Phase Initialization +### Two-Phase Initialization Separate scheduler initialization into two phases: - **Phase 1 (`initialize`)**: Load jobs from storage, but don't resume them @@ -159,7 +159,7 @@ class VectorIOAdapter: ) ``` -## Behavior +### Behavior ### Case 1: Clean Start (No Jobs) ```python @@ -246,12 +246,11 @@ class OpenAIVectorStoreMixin: config: kvstore: { ... } max_concurrent_jobs: 10 - - Works because: - - Jobs run in the same process via asyncio.create_task() - - In-memory _jobs dict is shared within the process - - Crash recovery works (jobs persist to KVStore) ``` +Works because: +- Jobs run in the same process via asyncio.create_task() +- In-memory _jobs dict is shared within the process +- Crash recovery works (jobs persist to KVStore) --- ✅ Multi Worker (Celery Scheduler - Not Implemented Yet) From 97ff9b5ef856daeaa86862c79789768242be9ca5 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Tue, 4 Nov 2025 11:14:20 -0800 Subject: [PATCH 07/10] fix uts --- docs/docs/providers/eval/index.mdx | 6 +++- .../providers/utils/job_scheduler/README.md | 2 +- .../providers/utils/job_scheduler/__init__.py | 31 ------------------- .../providers/utils/test_job_scheduler.py | 17 +++++----- 4 files changed, 16 insertions(+), 40 deletions(-) diff --git a/docs/docs/providers/eval/index.mdx b/docs/docs/providers/eval/index.mdx index e413a4394..0e58b8064 100644 --- a/docs/docs/providers/eval/index.mdx +++ b/docs/docs/providers/eval/index.mdx @@ -2,11 +2,15 @@ description: | Evaluations +<<<<<<< HEAD <<<<<<< HEAD Llama Stack Evaluation API for running evaluations on model and 
agent candidates. ======= Llama Stack Evaluation API for running evaluations on model and agent candidates." >>>>>>> eb10a349 (clean) +======= +Llama Stack Evaluation API for running evaluations on model and agent candidates." +>>>>>>> 94479abf (fix uts) sidebar_label: Eval title: Eval --- @@ -17,6 +21,6 @@ title: Eval Evaluations - Llama Stack Evaluation API for running evaluations on model and agent candidates. +Llama Stack Evaluation API for running evaluations on model and agent candidates. This section contains documentation for all available providers for the **eval** API. diff --git a/src/llama_stack/providers/utils/job_scheduler/README.md b/src/llama_stack/providers/utils/job_scheduler/README.md index 068ad1023..7b80a2c47 100644 --- a/src/llama_stack/providers/utils/job_scheduler/README.md +++ b/src/llama_stack/providers/utils/job_scheduler/README.md @@ -32,7 +32,7 @@ │ B. Vector IO (depends on inference, job_scheduler) │ │ ├─ deps = { │ │ │ Api.inference: , │ -│ │ Api.job_scheduler: │ +│ │ Api.job_scheduler: │ │ │ } │ │ ├─ get_provider_impl(config, deps) │ │ │ ├─ adapter = FaissVectorIOAdapter( │ diff --git a/src/llama_stack/providers/utils/job_scheduler/__init__.py b/src/llama_stack/providers/utils/job_scheduler/__init__.py index e09d9f45d..f86019f48 100644 --- a/src/llama_stack/providers/utils/job_scheduler/__init__.py +++ b/src/llama_stack/providers/utils/job_scheduler/__init__.py @@ -7,41 +7,10 @@ from .api import JobStatus, Scheduler from .config import CelerySchedulerConfig, InlineSchedulerConfig, SchedulerConfig - -async def scheduler_impl(config: SchedulerConfig) -> Scheduler: - """ - Factory function to instantiate scheduler implementations. 
- - Args: - config: Scheduler configuration (InlineSchedulerConfig or CelerySchedulerConfig) - - Returns: - Scheduler: An initialized scheduler instance - - Raises: - ValueError: If the config type is unknown - """ - impl: Scheduler - if isinstance(config, InlineSchedulerConfig): - from .inline import InlineSchedulerImpl - - impl = InlineSchedulerImpl(config) - elif isinstance(config, CelerySchedulerConfig): - from .celery import CelerySchedulerImpl - - impl = CelerySchedulerImpl(config) - else: - raise ValueError(f"Unknown scheduler config type: {type(config)}") - - await impl.initialize() - return impl - - __all__ = [ "JobStatus", "Scheduler", "SchedulerConfig", "InlineSchedulerConfig", "CelerySchedulerConfig", - "scheduler_impl", ] diff --git a/tests/unit/providers/utils/test_job_scheduler.py b/tests/unit/providers/utils/test_job_scheduler.py index a6833f8df..9b63de5d6 100644 --- a/tests/unit/providers/utils/test_job_scheduler.py +++ b/tests/unit/providers/utils/test_job_scheduler.py @@ -12,10 +12,7 @@ from unittest.mock import AsyncMock, MagicMock import pytest from llama_stack.core.storage.datatypes import KVStoreReference, SqliteKVStoreConfig -from llama_stack.providers.utils.job_scheduler import ( - InlineSchedulerConfig, - scheduler_impl, -) +from llama_stack.providers.utils.job_scheduler import InlineSchedulerConfig from llama_stack.providers.utils.kvstore import register_kvstore_backends @@ -45,7 +42,9 @@ def scheduler_config(): async def test_scheduler_api_exists(scheduler_config): """Test that scheduler API is properly defined.""" - scheduler = await scheduler_impl(scheduler_config) + from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl + + scheduler = InlineSchedulerImpl(scheduler_config) # Verify all required methods exist assert hasattr(scheduler, "initialize") @@ -61,7 +60,9 @@ async def test_scheduler_api_exists(scheduler_config): async def test_scheduler_not_implemented(scheduler_config): """Test that scheduler methods 
raise NotImplementedError.""" - scheduler = await scheduler_impl(scheduler_config) + from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl + + scheduler = InlineSchedulerImpl(scheduler_config) # Test that all methods raise NotImplementedError with pytest.raises(NotImplementedError, match="not yet available"): @@ -94,7 +95,9 @@ async def test_scheduler_not_implemented(scheduler_config): async def test_two_phase_initialization_pattern(scheduler_config): """Test that the two-phase initialization pattern is supported.""" - scheduler = await scheduler_impl(scheduler_config) + from llama_stack.providers.utils.job_scheduler.inline import InlineSchedulerImpl + + scheduler = InlineSchedulerImpl(scheduler_config) # Mock the methods to test the pattern scheduler.initialize = AsyncMock() From 55a01e14794590d706a49a583994717defdcd64e Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Tue, 4 Nov 2025 11:15:30 -0800 Subject: [PATCH 08/10] clean --- docs/docs/providers/eval/index.mdx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/docs/docs/providers/eval/index.mdx b/docs/docs/providers/eval/index.mdx index 0e58b8064..544a621e4 100644 --- a/docs/docs/providers/eval/index.mdx +++ b/docs/docs/providers/eval/index.mdx @@ -2,6 +2,7 @@ description: | Evaluations +<<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD Llama Stack Evaluation API for running evaluations on model and agent candidates. @@ -11,6 +12,9 @@ description: | ======= Llama Stack Evaluation API for running evaluations on model and agent candidates." >>>>>>> 94479abf (fix uts) +======= + Llama Stack Evaluation API for running evaluations on model and agent candidates." +>>>>>>> e0c3381a (clean) sidebar_label: Eval title: Eval --- @@ -21,6 +25,6 @@ title: Eval Evaluations -Llama Stack Evaluation API for running evaluations on model and agent candidates. + Llama Stack Evaluation API for running evaluations on model and agent candidates. 
This section contains documentation for all available providers for the **eval** API. From c26510ea9a5f17ae76cdc1733d1317f4157c7eab Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Mon, 17 Nov 2025 15:51:36 -0800 Subject: [PATCH 09/10] clean --- docs/docs/providers/eval/index.mdx | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/docs/docs/providers/eval/index.mdx b/docs/docs/providers/eval/index.mdx index 544a621e4..3543db246 100644 --- a/docs/docs/providers/eval/index.mdx +++ b/docs/docs/providers/eval/index.mdx @@ -2,19 +2,7 @@ description: | Evaluations -<<<<<<< HEAD -<<<<<<< HEAD -<<<<<<< HEAD Llama Stack Evaluation API for running evaluations on model and agent candidates. -======= - Llama Stack Evaluation API for running evaluations on model and agent candidates." ->>>>>>> eb10a349 (clean) -======= -Llama Stack Evaluation API for running evaluations on model and agent candidates." ->>>>>>> 94479abf (fix uts) -======= - Llama Stack Evaluation API for running evaluations on model and agent candidates." 
->>>>>>> e0c3381a (clean) sidebar_label: Eval title: Eval --- From 773b2c1c297c7293c1036508f7e0a032d0391928 Mon Sep 17 00:00:00 2001 From: Swapna Lekkala Date: Mon, 17 Nov 2025 16:01:40 -0800 Subject: [PATCH 10/10] update job status --- src/llama_stack/providers/utils/job_scheduler/api.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/llama_stack/providers/utils/job_scheduler/api.py b/src/llama_stack/providers/utils/job_scheduler/api.py index 023e77430..5b3e289e1 100644 --- a/src/llama_stack/providers/utils/job_scheduler/api.py +++ b/src/llama_stack/providers/utils/job_scheduler/api.py @@ -10,10 +10,13 @@ from typing import Protocol class JobStatus(StrEnum): - PENDING = "pending" - RUNNING = "running" - COMPLETED = "completed" + VALIDATING = "validating" FAILED = "failed" + IN_PROGRESS = "in_progress" + FINALIZING = "finalizing" + COMPLETED = "completed" + EXPIRED = "expired" + CANCELLING = "cancelling" CANCELLED = "cancelled"