feat: add batches API with OpenAI compatibility

Add complete batches API implementation with protocol, providers, and tests:

Core Infrastructure:
- Add batches API protocol using OpenAI Batch types directly
- Add Api.batches enum value and protocol mapping in resolver
- Add OpenAI "batch" file purpose support
- Include proper error handling (ConflictError, ResourceNotFoundError)

Reference Provider:
- Add ReferenceBatchesImpl with full CRUD operations (create, retrieve, cancel, list)
- Implement background batch processing with configurable concurrency
- Add SQLite KVStore backend for persistence
- Support /v1/chat/completions endpoint with request validation

Comprehensive Test Suite:
- Add unit tests for provider implementation with validation
- Add integration tests for end-to-end batch processing workflows
- Add error handling tests for validation, malformed inputs, and edge cases

Configuration:
- Add max_concurrent_batches and max_concurrent_requests_per_batch options
- Add provider documentation with sample configurations

Test with -

```
$ uv run llama stack build --image-type venv --providers inference=YOU_PICK,files=inline::localfs,batches=inline::reference --run &
$ LLAMA_STACK_CONFIG=http://localhost:8321 uv run pytest tests/unit/providers/batches tests/integration/batches --text-model YOU_PICK
```
This commit is contained in:
Matthew Farrellee 2025-08-08 08:08:08 -04:00
parent 1677d6bffd
commit 8e678912ec
21 changed files with 2664 additions and 2 deletions

View file

@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -0,0 +1,36 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Any
from llama_stack.apis.files import Files
from llama_stack.apis.inference import Inference
from llama_stack.apis.models import Models
from llama_stack.core.datatypes import AccessRule, Api
from llama_stack.providers.utils.kvstore import kvstore_impl
from .batches import ReferenceBatchesImpl
from .config import ReferenceBatchesImplConfig
__all__ = ["ReferenceBatchesImpl", "ReferenceBatchesImplConfig"]
async def get_provider_impl(config: ReferenceBatchesImplConfig, deps: dict[Api, Any], policy: list[AccessRule]):
kvstore = await kvstore_impl(config.kvstore)
inference_api: Inference | None = deps.get(Api.inference)
files_api: Files | None = deps.get(Api.files)
models_api: Models | None = deps.get(Api.models)
if inference_api is None:
raise ValueError("Inference API is required but not provided in dependencies")
if files_api is None:
raise ValueError("Files API is required but not provided in dependencies")
if models_api is None:
raise ValueError("Models API is required but not provided in dependencies")
impl = ReferenceBatchesImpl(config, inference_api, files_api, models_api, kvstore)
await impl.initialize()
return impl

View file

@ -0,0 +1,553 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
import itertools
import json
import time
import uuid
from io import BytesIO
from typing import Any
from openai.types.batch import BatchError, Errors
from pydantic import BaseModel
from llama_stack.apis.batches import Batches, BatchObject, ListBatchesResponse
from llama_stack.apis.common.errors import ConflictError, ResourceNotFoundError
from llama_stack.apis.files import Files, OpenAIFilePurpose
from llama_stack.apis.inference import Inference
from llama_stack.apis.models import Models
from llama_stack.log import get_logger
from llama_stack.providers.utils.kvstore import KVStore
from .config import ReferenceBatchesImplConfig
BATCH_PREFIX = "batch:"
logger = get_logger(__name__)
class AsyncBytesIO:
"""
Async-compatible BytesIO wrapper to allow async file-like operations.
We use this when uploading files to the Files API, as it expects an
async file-like object.
"""
def __init__(self, data: bytes):
self._buffer = BytesIO(data)
async def read(self, n=-1):
return self._buffer.read(n)
async def seek(self, pos, whence=0):
return self._buffer.seek(pos, whence)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._buffer.close()
def __getattr__(self, name):
return getattr(self._buffer, name)
class BatchRequest(BaseModel):
line_num: int
custom_id: str
method: str
url: str
body: dict[str, Any]
class ReferenceBatchesImpl(Batches):
"""Reference implementation of the Batches API.
This implementation processes batch files by making individual requests
to the inference API and generates output files with results.
"""
def __init__(
self,
config: ReferenceBatchesImplConfig,
inference_api: Inference,
files_api: Files,
models_api: Models,
kvstore: KVStore,
) -> None:
self.config = config
self.kvstore = kvstore
self.inference_api = inference_api
self.files_api = files_api
self.models_api = models_api
self._processing_tasks: dict[str, asyncio.Task] = {}
self._batch_semaphore = asyncio.Semaphore(config.max_concurrent_batches)
self._update_batch_lock = asyncio.Lock()
# this is to allow tests to disable background processing
self.process_batches = True
async def initialize(self) -> None:
# TODO: start background processing of existing tasks
pass
async def shutdown(self) -> None:
"""Shutdown the batches provider."""
if self._processing_tasks:
# don't cancel tasks - just let them stop naturally on shutdown
# cancelling would mark batches as "cancelled" in the database
logger.info(f"Shutdown initiated with {len(self._processing_tasks)} active batch processing tasks")
# TODO (SECURITY): this currently works w/ configured api keys, not with x-llamastack-provider-data or with user policy restrictions
async def create_batch(
self,
input_file_id: str,
endpoint: str,
completion_window: str,
metadata: dict[str, str] | None = None,
) -> BatchObject:
"""
Create a new batch for processing multiple API requests.
Error handling by levels -
0. Input param handling, results in 40x errors before processing, e.g.
- Wrong completion_window
- Invalid metadata types
- Unknown endpoint
-> no batch created
1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g.
- input_file_id missing
- invalid json in file
- missing custom_id, method, url, body
- invalid model
- streaming
-> batch created, validation sends to failed status
2. Processing errors, result in error_file_id entries, e.g.
- Any error returned from inference endpoint
-> batch created, goes to completed status
"""
# TODO: set expiration time for garbage collection
if endpoint not in ["/v1/chat/completions"]:
raise ValueError(
f"Invalid endpoint: {endpoint}. Supported values: /v1/chat/completions. Code: invalid_value. Param: endpoint",
)
if completion_window != "24h":
raise ValueError(
f"Invalid completion_window: {completion_window}. Supported values are: 24h. Code: invalid_value. Param: completion_window",
)
batch_id = f"batch_{uuid.uuid4().hex[:16]}"
current_time = int(time.time())
batch = BatchObject(
id=batch_id,
object="batch",
endpoint=endpoint,
input_file_id=input_file_id,
completion_window=completion_window,
status="validating",
created_at=current_time,
metadata=metadata,
)
await self.kvstore.set(f"batch:{batch_id}", batch.to_json())
if self.process_batches:
task = asyncio.create_task(self._process_batch(batch_id))
self._processing_tasks[batch_id] = task
return batch
async def cancel_batch(self, batch_id: str) -> BatchObject:
"""Cancel a batch that is in progress."""
batch = await self.retrieve_batch(batch_id)
if batch.status in ["cancelled", "cancelling"]:
return batch
if batch.status in ["completed", "failed", "expired"]:
raise ConflictError(f"Cannot cancel batch '{batch_id}' with status '{batch.status}'")
await self._update_batch(batch_id, status="cancelling", cancelling_at=int(time.time()))
if batch_id in self._processing_tasks:
self._processing_tasks[batch_id].cancel()
# note: task removal and status="cancelled" handled in finally block of _process_batch
return await self.retrieve_batch(batch_id)
async def list_batches(
self,
after: str | None = None,
limit: int = 20,
) -> ListBatchesResponse:
"""
List all batches, eventually only for the current user.
With no notion of user, we return all batches.
"""
batch_values = await self.kvstore.values_in_range("batch:", "batch:\xff")
batches = []
for batch_data in batch_values:
if batch_data:
batches.append(BatchObject.model_validate_json(batch_data))
batches.sort(key=lambda b: b.created_at, reverse=True)
start_idx = 0
if after:
for i, batch in enumerate(batches):
if batch.id == after:
start_idx = i + 1
break
page_batches = batches[start_idx : start_idx + limit]
has_more = (start_idx + limit) < len(batches)
first_id = page_batches[0].id if page_batches else None
last_id = page_batches[-1].id if page_batches else None
return ListBatchesResponse(
data=page_batches,
first_id=first_id,
last_id=last_id,
has_more=has_more,
)
async def retrieve_batch(self, batch_id: str) -> BatchObject:
"""Retrieve information about a specific batch."""
batch_data = await self.kvstore.get(f"batch:{batch_id}")
if not batch_data:
raise ResourceNotFoundError(batch_id, "Batch", "batches.list()")
return BatchObject.model_validate_json(batch_data)
async def _update_batch(self, batch_id: str, **updates) -> None:
"""Update batch fields in kvstore."""
async with self._update_batch_lock:
try:
batch = await self.retrieve_batch(batch_id)
# batch processing is async. once cancelling, only allow "cancelled" status updates
if batch.status == "cancelling" and updates.get("status") != "cancelled":
logger.info(
f"Skipping status update for cancelled batch {batch_id}: attempted {updates.get('status')}"
)
return
if "errors" in updates:
updates["errors"] = updates["errors"].model_dump()
batch_dict = batch.model_dump()
batch_dict.update(updates)
await self.kvstore.set(f"batch:{batch_id}", json.dumps(batch_dict))
except Exception as e:
logger.error(f"Failed to update batch {batch_id}: {e}")
async def _validate_input(self, batch: BatchObject) -> tuple[list[BatchError], list[BatchRequest]]:
"""
Read & validate input, return errors and valid input.
Validation of
- input_file_id existance
- valid json
- custom_id, method, url, body presence and valid
- no streaming
"""
requests: list[BatchRequest] = []
errors: list[BatchError] = []
try:
await self.files_api.openai_retrieve_file(batch.input_file_id)
except Exception:
errors.append(
BatchError(
code="invalid_request",
line=None,
message=f"Cannot find file {batch.input_file_id}.",
param="input_file_id",
)
)
return errors, requests
# TODO(SECURITY): do something about large files
file_content_response = await self.files_api.openai_retrieve_file_content(batch.input_file_id)
file_content = file_content_response.body.decode("utf-8")
for line_num, line in enumerate(file_content.strip().split("\n"), 1):
if line.strip(): # skip empty lines
try:
request = json.loads(line)
if not isinstance(request, dict):
errors.append(
BatchError(
code="invalid_request",
line=line_num,
message="Each line must be a JSON dictionary object",
)
)
continue
valid = True
for param, expected_type, type_string in [
("custom_id", str, "string"),
("method", str, "string"),
("url", str, "string"),
("body", dict, "JSON dictionary object"),
]:
if param not in request:
errors.append(
BatchError(
code="missing_required_parameter",
line=line_num,
message=f"Missing required parameter: {param}",
param=param,
)
)
valid = False
elif not isinstance(request[param], expected_type):
param_name = "URL" if param == "url" else param.capitalize()
errors.append(
BatchError(
code="invalid_request",
line=line_num,
message=f"{param_name} must be a {type_string}",
param=param,
)
)
valid = False
if (url := request.get("url")) and isinstance(url, str) and url != batch.endpoint:
errors.append(
BatchError(
code="invalid_url",
line=line_num,
message="URL provided for this request does not match the batch endpoint",
param="url",
)
)
valid = False
if (body := request.get("body")) and isinstance(body, dict):
if body.get("stream", False):
errors.append(
BatchError(
code="streaming_unsupported",
line=line_num,
message="Streaming is not supported in batch processing",
param="body.stream",
)
)
valid = False
for param, expected_type, type_string in [
("model", str, "a string"),
# messages is specific to /v1/chat/completions
# we could skip validating messages here and let inference fail. however,
# that would be a very expensive way to find out messages is wrong.
("messages", list, "an array"), # TODO: allow messages to be a string?
]:
if param not in body:
errors.append(
BatchError(
code="invalid_request",
line=line_num,
message=f"{param.capitalize()} parameter is required",
param=f"body.{param}",
)
)
valid = False
elif not isinstance(body[param], expected_type):
errors.append(
BatchError(
code="invalid_request",
line=line_num,
message=f"{param.capitalize()} must be {type_string}",
param=f"body.{param}",
)
)
valid = False
if "model" in body and isinstance(body["model"], str):
try:
await self.models_api.get_model(body["model"])
except Exception:
errors.append(
BatchError(
code="model_not_found",
line=line_num,
message=f"Model '{body['model']}' does not exist or is not supported",
param="body.model",
)
)
valid = False
if valid:
assert isinstance(url, str), "URL must be a string" # for mypy
assert isinstance(body, dict), "Body must be a dictionary" # for mypy
requests.append(
BatchRequest(
line_num=line_num,
url=url,
method=request["method"],
custom_id=request["custom_id"],
body=body,
),
)
except json.JSONDecodeError:
errors.append(
BatchError(
code="invalid_json_line",
line=line_num,
message="This line is not parseable as valid JSON.",
)
)
return errors, requests
async def _process_batch(self, batch_id: str) -> None:
"""Background task to process a batch of requests."""
try:
logger.info(f"Starting batch processing for {batch_id}")
async with self._batch_semaphore: # semaphore to limit concurrency
logger.info(f"Acquired semaphore for batch {batch_id}")
await self._process_batch_impl(batch_id)
except asyncio.CancelledError:
logger.info(f"Batch processing cancelled for {batch_id}")
await self._update_batch(batch_id, status="cancelled", cancelled_at=int(time.time()))
except Exception as e:
logger.error(f"Batch processing failed for {batch_id}: {e}")
await self._update_batch(
batch_id,
status="failed",
failed_at=int(time.time()),
errors=Errors(data=[BatchError(code="internal_error", message=str(e))]),
)
finally:
self._processing_tasks.pop(batch_id, None)
async def _process_batch_impl(self, batch_id: str) -> None:
"""Implementation of batch processing logic."""
errors: list[BatchError] = []
batch = await self.retrieve_batch(batch_id)
errors, requests = await self._validate_input(batch)
if errors:
await self._update_batch(batch_id, status="failed", failed_at=int(time.time()), errors=Errors(data=errors))
logger.info(f"Batch validation failed for {batch_id} with {len(errors)} errors")
return
logger.info(f"Processing {len(requests)} requests for batch {batch_id}")
total_requests = len(requests)
await self._update_batch(
batch_id,
status="in_progress",
request_counts={"total": total_requests, "completed": 0, "failed": 0},
)
error_results = []
success_results = []
completed_count = 0
failed_count = 0
for chunk in itertools.batched(requests, self.config.max_concurrent_requests_per_batch):
# we use a TaskGroup to ensure all process-single-request tasks are canceled when process-batch is cancelled
async with asyncio.TaskGroup() as tg:
chunk_tasks = [tg.create_task(self._process_single_request(batch_id, request)) for request in chunk]
chunk_results = await asyncio.gather(*chunk_tasks, return_exceptions=True)
for result in chunk_results:
if isinstance(result, dict) and result.get("error") is not None: # error response from inference
failed_count += 1
error_results.append(result)
elif isinstance(result, dict) and result.get("response") is not None: # successful inference
completed_count += 1
success_results.append(result)
else: # unexpected result
failed_count += 1
errors.append(BatchError(code="internal_error", message=f"Unexpected result: {result}"))
await self._update_batch(
batch_id,
request_counts={"total": total_requests, "completed": completed_count, "failed": failed_count},
)
if errors:
await self._update_batch(
batch_id, status="failed", failed_at=int(time.time()), errors=Errors(data=errors)
)
return
try:
output_file_id = await self._create_output_file(batch_id, success_results, "success")
await self._update_batch(batch_id, output_file_id=output_file_id)
error_file_id = await self._create_output_file(batch_id, error_results, "error")
await self._update_batch(batch_id, error_file_id=error_file_id)
await self._update_batch(batch_id, status="completed", completed_at=int(time.time()))
logger.info(
f"Batch processing completed for {batch_id}: {completed_count} completed, {failed_count} failed"
)
except Exception as e:
# note: errors is empty at this point, so we don't lose anything by ignoring it
await self._update_batch(
batch_id,
status="failed",
failed_at=int(time.time()),
errors=Errors(data=[BatchError(code="output_failed", message=str(e))]),
)
async def _process_single_request(self, batch_id: str, request: BatchRequest) -> dict:
"""Process a single request from the batch."""
request_id = f"batch_req_{batch_id}_{request.line_num}"
try:
# TODO(SECURITY): review body for security issues
chat_response = await self.inference_api.openai_chat_completion(**request.body)
# this is for mypy, we don't allow streaming so we'll get the right type
assert hasattr(chat_response, "model_dump_json"), "Chat response must have model_dump_json method"
return {
"id": request_id,
"custom_id": request.custom_id,
"response": {
"status_code": 200,
"request_id": request_id, # TODO: should this be different?
"body": chat_response.model_dump_json(),
},
}
except Exception as e:
logger.info(f"Error processing request {request.custom_id} in batch {batch_id}: {e}")
return {
"id": request_id,
"custom_id": request.custom_id,
"error": {"type": "request_failed", "message": str(e)},
}
async def _create_output_file(self, batch_id: str, results: list[dict], file_type: str) -> str:
"""
Create an output file with batch results.
This function filters results based on the specified file_type
and uploads the file to the Files API.
"""
output_lines = [json.dumps(result) for result in results]
with AsyncBytesIO("\n".join(output_lines).encode("utf-8")) as file_buffer:
file_buffer.filename = f"{batch_id}_{file_type}.jsonl"
uploaded_file = await self.files_api.openai_upload_file(file=file_buffer, purpose=OpenAIFilePurpose.BATCH)
return uploaded_file.id

View file

@ -0,0 +1,40 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from pydantic import BaseModel, Field
from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
class ReferenceBatchesImplConfig(BaseModel):
"""Configuration for the Reference Batches implementation."""
kvstore: KVStoreConfig = Field(
description="Configuration for the key-value store backend.",
)
max_concurrent_batches: int = Field(
default=1,
description="Maximum number of concurrent batches to process simultaneously.",
ge=1,
)
max_concurrent_requests_per_batch: int = Field(
default=10,
description="Maximum number of concurrent requests to process per batch.",
ge=1,
)
# TODO: add a max requests per second rate limiter
@classmethod
def sample_run_config(cls, __distro_dir__: str) -> dict:
return {
"kvstore": SqliteKVStoreConfig.sample_run_config(
__distro_dir__=__distro_dir__,
db_name="batches.db",
),
}

View file

@ -0,0 +1,26 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.datatypes import Api, InlineProviderSpec, ProviderSpec
def available_providers() -> list[ProviderSpec]:
return [
InlineProviderSpec(
api=Api.batches,
provider_type="inline::reference",
pip_packages=["openai"],
module="llama_stack.providers.inline.batches.reference",
config_class="llama_stack.providers.inline.batches.reference.config.ReferenceBatchesImplConfig",
api_dependencies=[
Api.inference,
Api.files,
Api.models,
],
description="Reference implementation of batches API with KVStore persistence.",
),
]