Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-08-21 09:23:13 +00:00)
Add complete batches API implementation with protocol, providers, and tests

Core Infrastructure:
- Add batches API protocol using OpenAI Batch types directly
- Add Api.batches enum value and protocol mapping in resolver
- Add OpenAI "batch" file purpose support
- Include proper error handling (ConflictError, ResourceNotFoundError)

Reference Provider:
- Add ReferenceBatchesImpl with full CRUD operations (create, retrieve, cancel, list)
- Implement background batch processing with configurable concurrency
- Add SQLite KVStore backend for persistence
- Support /v1/chat/completions endpoint with request validation

Comprehensive Test Suite:
- Add unit tests for provider implementation with validation
- Add integration tests for end-to-end batch processing workflows
- Add error handling tests for validation, malformed inputs, and edge cases

Configuration:
- Add max_concurrent_batches and max_concurrent_requests_per_batch options
- Add provider documentation with sample configurations

Test with:

```
$ uv run llama stack build --image-type venv --providers inference=YOU_PICK,files=inline::localfs,batches=inline::reference --run &
$ LLAMA_STACK_CONFIG=http://localhost:8321 uv run pytest tests/unit/providers/batches tests/integration/batches --text-model YOU_PICK
```

Addresses #3066

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Ashwin Bharambe <ashwin.bharambe@gmail.com>
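For orientation, here is a minimal sketch of the end-to-end flow this change enables, driven through the OpenAI client against a running stack. The base URL (including its exact path prefix) and the model name are assumptions, not taken from the PR; match them to the server started by the command above.

```python
import json

from openai import OpenAI

# Assumed base URL for a local Llama Stack server; the exact prefix may
# differ by server version. "api_key" is unused by a local stack.
client = OpenAI(base_url="http://localhost:8321/v1/openai/v1", api_key="none")

# Each JSONL line is one request in the OpenAI batch input format.
with open("batch_input.jsonl", "w") as f:
    for i in range(3):
        request = {
            "custom_id": f"req-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "YOUR_MODEL",  # placeholder; use your inference provider's model
                "messages": [{"role": "user", "content": f"Say hello #{i}"}],
            },
        }
        f.write(json.dumps(request) + "\n")

# Upload the file with the new "batch" purpose, then create the batch.
batch_file = client.files.create(file=open("batch_input.jsonl", "rb"), purpose="batch")
batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(batch.id, batch.status)
```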
89 lines
3.2 KiB
Python
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

from typing import Literal, Protocol, runtime_checkable

from pydantic import BaseModel, Field

from llama_stack.schema_utils import json_schema_type, webmethod

try:
    from openai.types import Batch as BatchObject
except ImportError as e:
    raise ImportError("OpenAI package is required for batches API. Please install it with: pip install openai") from e


@json_schema_type
class ListBatchesResponse(BaseModel):
    """Response containing a list of batch objects."""

    object: Literal["list"] = "list"
    data: list[BatchObject] = Field(..., description="List of batch objects")
    first_id: str | None = Field(default=None, description="ID of the first batch in the list")
    last_id: str | None = Field(default=None, description="ID of the last batch in the list")
    has_more: bool = Field(default=False, description="Whether there are more batches available")


@runtime_checkable
class Batches(Protocol):
    """Protocol for batch processing API operations.

    The Batches API enables efficient processing of multiple requests in a single operation,
    particularly useful for processing large datasets, batch evaluation workflows, and
    cost-effective inference at scale.

    Note: This API is currently under active development and may undergo changes.
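
    Example (an illustrative sketch; ``impl`` stands for any concrete
    implementation of this protocol):

        batch = await impl.create_batch(
            input_file_id="file-abc123",
            endpoint="/v1/chat/completions",
            completion_window="24h",
        )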
"""
|
|
|
|
@webmethod(route="/openai/v1/batches", method="POST")
|
|
async def create_batch(
|
|
self,
|
|
input_file_id: str,
|
|
endpoint: str,
|
|
completion_window: Literal["24h"],
|
|
metadata: dict[str, str] | None = None,
|
|
) -> BatchObject:
        """Create a new batch for processing multiple API requests.

        :param input_file_id: The ID of an uploaded file containing requests for the batch.
        :param endpoint: The endpoint to be used for all requests in the batch.
        :param completion_window: The time window within which the batch should be processed.
        :param metadata: Optional metadata for the batch.
        :returns: The created batch object.
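
        Example input file line (illustrative, with placeholder IDs and model
        name, following the OpenAI batch input format):

            {"custom_id": "req-1", "method": "POST", "url": "/v1/chat/completions",
             "body": {"model": "my-model", "messages": [{"role": "user", "content": "Hello"}]}}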
        """
        ...

    @webmethod(route="/openai/v1/batches/{batch_id}", method="GET")
    async def retrieve_batch(self, batch_id: str) -> BatchObject:
        """Retrieve information about a specific batch.

        :param batch_id: The ID of the batch to retrieve.
        :returns: The batch object.
        """
        ...

    @webmethod(route="/openai/v1/batches/{batch_id}/cancel", method="POST")
    async def cancel_batch(self, batch_id: str) -> BatchObject:
        """Cancel a batch that is in progress.

        :param batch_id: The ID of the batch to cancel.
        :returns: The updated batch object.
        """
        ...

    @webmethod(route="/openai/v1/batches", method="GET")
    async def list_batches(
        self,
        after: str | None = None,
        limit: int = 20,
    ) -> ListBatchesResponse:
        """List all batches for the current user.

        :param after: A cursor for pagination; returns batches after this batch ID.
        :param limit: Number of batches to return (default 20, max 100).
        :returns: A list of batch objects.
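
        Pagination sketch (illustrative; ``impl`` is any implementation of
        this protocol):

            page = await impl.list_batches(limit=100)
            while page.has_more:
                page = await impl.list_batches(after=page.last_id, limit=100)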
        """
        ...