feat: Add optional idempotency support to batches API

Implements optional idempotency for batch creation using `idem_tok` parameter:

* **Core idempotency**: Same token + parameters returns existing batch
* **Conflict detection**: Same token + different parameters raises HTTP 409 ConflictError
* **Metadata order independence**: Different key ordering doesn't affect idempotency

**API changes:**
- Add optional `idem_tok` parameter to `create_batch()` method
- Enhanced API documentation with idempotency extensions

**Implementation:**
- Reference provider supports idempotent batch creation
- ConflictError for proper HTTP 409 status code mapping
- Comprehensive parameter validation

**Testing:**
- Unit tests: focused tests covering core scenarios with parametrized conflict detection
- Integration tests: tests validating real OpenAI client behavior

This enables client-side retry safety and prevents duplicate batch creation
when using the same idempotency token, following REST API
This commit is contained in:
Matthew Farrellee 2025-08-08 08:08:08 -04:00
parent 5e7c2250be
commit 68877f331e
7 changed files with 339 additions and 64 deletions

View file

@ -2,12 +2,15 @@
## Overview
Protocol for batch processing API operations.
The Batches API enables efficient processing of multiple requests in a single operation,
The Batches API enables efficient processing of multiple requests in a single operation,
particularly useful for processing large datasets, batch evaluation workflows, and
cost-effective inference at scale.
The API is designed to allow use of openai client libraries for seamless integration.
This API provides the following extensions:
- idempotent batch creation
Note: This API is currently under active development and may undergo changes.
This section contains documentation for all available providers for the **batches** API.

View file

@ -29,12 +29,16 @@ class ListBatchesResponse(BaseModel):
@runtime_checkable
class Batches(Protocol):
"""Protocol for batch processing API operations.
"""
The Batches API enables efficient processing of multiple requests in a single operation,
particularly useful for processing large datasets, batch evaluation workflows, and
cost-effective inference at scale.
The API is designed to allow use of openai client libraries for seamless integration.
This API provides the following extensions:
- idempotent batch creation
Note: This API is currently under active development and may undergo changes.
"""
@ -45,6 +49,7 @@ class Batches(Protocol):
endpoint: str,
completion_window: Literal["24h"],
metadata: dict[str, str] | None = None,
idem_tok: str | None = None, # intentionally bad name
) -> BatchObject:
"""Create a new batch for processing multiple API requests.
@ -52,6 +57,7 @@ class Batches(Protocol):
:param endpoint: The endpoint to be used for all requests in the batch.
:param completion_window: The time window within which the batch should be processed.
:param metadata: Optional metadata for the batch.
:param idem_tok: Optional idempotency token. When provided, enables idempotent behavior.
:returns: The created batch object.
"""
...

View file

@ -5,6 +5,7 @@
# the root directory of this source tree.
import asyncio
import hashlib
import itertools
import json
import time
@ -136,28 +137,45 @@ class ReferenceBatchesImpl(Batches):
endpoint: str,
completion_window: Literal["24h"],
metadata: dict[str, str] | None = None,
idem_tok: str | None = None,
) -> BatchObject:
"""
Create a new batch for processing multiple API requests.
Error handling by levels -
0. Input param handling, results in 40x errors before processing, e.g.
- Wrong completion_window
- Invalid metadata types
- Unknown endpoint
-> no batch created
1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g.
- input_file_id missing
- invalid json in file
- missing custom_id, method, url, body
- invalid model
- streaming
-> batch created, validation sends to failed status
2. Processing errors, result in error_file_id entries, e.g.
- Any error returned from inference endpoint
-> batch created, goes to completed status
This implementation provides optional idempotency: when an idempotency token
(idem_tok) is provided, a deterministic ID is generated based on the input
parameters. If a batch with the same parameters already exists, it will be
returned instead of creating a duplicate. Without an idempotency token,
each request creates a new batch with a unique ID.
Args:
input_file_id: The ID of an uploaded file containing requests for the batch.
endpoint: The endpoint to be used for all requests in the batch.
completion_window: The time window within which the batch should be processed.
metadata: Optional metadata for the batch.
idem_tok: Optional idempotency token for enabling idempotent behavior.
Returns:
The created or existing batch object.
"""
# Error handling by levels -
# 0. Input param handling, results in 40x errors before processing, e.g.
# - Wrong completion_window
# - Invalid metadata types
# - Unknown endpoint
# -> no batch created
# 1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g.
# - input_file_id missing
# - invalid json in file
# - missing custom_id, method, url, body
# - invalid model
# - streaming
# -> batch created, validation sends to failed status
# 2. Processing errors, result in error_file_id entries, e.g.
# - Any error returned from inference endpoint
# -> batch created, goes to completed status
# TODO: set expiration time for garbage collection
if endpoint not in ["/v1/chat/completions"]:
@ -171,6 +189,35 @@ class ReferenceBatchesImpl(Batches):
)
batch_id = f"batch_{uuid.uuid4().hex[:16]}"
# For idempotent requests, use the idempotency token for the batch ID
# This ensures the same token always maps to the same batch ID,
# allowing us to detect parameter conflicts
if idem_tok is not None:
hash_input = idem_tok.encode("utf-8")
hash_digest = hashlib.sha256(hash_input).hexdigest()[:24]
batch_id = f"batch_{hash_digest}"
try:
existing_batch = await self.retrieve_batch(batch_id)
if (
existing_batch.input_file_id != input_file_id
or existing_batch.endpoint != endpoint
or existing_batch.completion_window != completion_window
or existing_batch.metadata != metadata
):
raise ConflictError(
f"Idempotency token '{idem_tok}' was previously used with different parameters. "
"Either use a new idempotency token or ensure all parameters match the original request."
)
logger.info(f"Returning existing batch with ID: {batch_id}")
return existing_batch
except ResourceNotFoundError:
# Batch doesn't exist, continue with creation
pass
current_time = int(time.time())
batch = BatchObject(
@ -185,6 +232,7 @@ class ReferenceBatchesImpl(Batches):
)
await self.kvstore.set(f"batch:{batch_id}", batch.to_json())
logger.info(f"Created new batch with ID: {batch_id}")
if self.process_batches:
task = asyncio.create_task(self._process_batch(batch_id))

View file

@ -0,0 +1,91 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Integration tests for batch idempotency functionality using the OpenAI client library.
This module tests the idempotency feature in the batches API using the OpenAI-compatible
client interface. These tests verify that the idempotency token (idem_tok) works correctly
in a real client-server environment.
Test Categories:
1. Successful Idempotency: Same token returns same batch with identical parameters
- test_idempotent_batch_creation_successful: Verifies that requests with the same
idempotency token return identical batches, even with different metadata order
2. Conflict Detection: Same token with conflicting parameters raises HTTP 409 errors
- test_idempotency_conflict_with_different_params: Verifies that reusing an idempotency token
with truly conflicting parameters (both file ID and metadata values) raises ConflictError
"""
import time
import pytest
from openai import ConflictError
class TestBatchesIdempotencyIntegration:
"""Integration tests for batch idempotency using OpenAI client."""
def test_idempotent_batch_creation_successful(self, openai_client):
"""Test that identical requests with same idempotency token return the same batch."""
batch1 = openai_client.batches.create(
input_file_id="bogus-id",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={
"test_type": "idempotency_success",
"purpose": "integration_test",
},
extra_body={"idem_tok": "test-idempotency-token-1"},
)
# sleep to ensure different timestamps
time.sleep(1)
batch2 = openai_client.batches.create(
input_file_id="bogus-id",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={
"purpose": "integration_test",
"test_type": "idempotency_success",
}, # Different order
extra_body={"idem_tok": "test-idempotency-token-1"},
)
assert batch1.id == batch2.id
assert batch1.input_file_id == batch2.input_file_id
assert batch1.endpoint == batch2.endpoint
assert batch1.completion_window == batch2.completion_window
assert batch1.metadata == batch2.metadata
assert batch1.created_at == batch2.created_at
def test_idempotency_conflict_with_different_params(self, openai_client):
"""Test that using same idempotency token with different params raises conflict error."""
batch1 = openai_client.batches.create(
input_file_id="bogus-id-1",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"test_type": "conflict_test_1"},
extra_body={"idem_tok": "conflict-token"},
)
with pytest.raises(ConflictError) as exc_info:
openai_client.batches.create(
input_file_id="bogus-id-2", # Different file ID
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"test_type": "conflict_test_2"}, # Different metadata
extra_body={"idem_tok": "conflict-token"}, # Same token
)
assert exc_info.value.status_code == 409
assert "conflict" in str(exc_info.value).lower()
retrieved_batch = openai_client.batches.retrieve(batch1.id)
assert retrieved_batch.id == batch1.id
assert retrieved_batch.input_file_id == "bogus-id-1"

View file

@ -0,0 +1,54 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Shared fixtures for batches provider unit tests."""
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock
import pytest
from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl
from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
@pytest.fixture
async def provider():
"""Create a test provider instance with temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_batches.db"
kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
config = ReferenceBatchesImplConfig(kvstore=kvstore_config)
# Create kvstore and mock APIs
kvstore = await kvstore_impl(config.kvstore)
mock_inference = AsyncMock()
mock_files = AsyncMock()
mock_models = AsyncMock()
provider = ReferenceBatchesImpl(config, mock_inference, mock_files, mock_models, kvstore)
await provider.initialize()
# unit tests should not require background processing
provider.process_batches = False
yield provider
await provider.shutdown()
@pytest.fixture
def sample_batch_data():
"""Sample batch data for testing."""
return {
"input_file_id": "file_abc123",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {"test": "true", "priority": "high"},
}

View file

@ -54,60 +54,17 @@ dependencies like inference, files, and models APIs.
"""
import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from llama_stack.apis.batches import BatchObject
from llama_stack.apis.common.errors import ConflictError, ResourceNotFoundError
from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl
from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
class TestReferenceBatchesImpl:
"""Test the reference implementation of the Batches API."""
@pytest.fixture
async def provider(self):
"""Create a test provider instance with temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_batches.db"
kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
config = ReferenceBatchesImplConfig(kvstore=kvstore_config)
# Create kvstore and mock APIs
from unittest.mock import AsyncMock
from llama_stack.providers.utils.kvstore import kvstore_impl
kvstore = await kvstore_impl(config.kvstore)
mock_inference = AsyncMock()
mock_files = AsyncMock()
mock_models = AsyncMock()
provider = ReferenceBatchesImpl(config, mock_inference, mock_files, mock_models, kvstore)
await provider.initialize()
# unit tests should not require background processing
provider.process_batches = False
yield provider
await provider.shutdown()
@pytest.fixture
def sample_batch_data(self):
"""Sample batch data for testing."""
return {
"input_file_id": "file_abc123",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {"test": "true", "priority": "high"},
}
def _validate_batch_type(self, batch, expected_metadata=None):
"""
Helper function to validate batch object structure and field types.

View file

@ -0,0 +1,116 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Tests for idempotency functionality in the reference batches provider.
This module tests the optional idempotency feature that allows clients to provide
an idempotency token (idem_tok) to ensure that repeated requests with the same token
and parameters return the same batch, while requests with the same token but different
parameters result in a conflict error.
Test Categories:
1. Core Idempotency: Same parameters with same token return same batch
2. Parameter Independence: Different parameters without tokens create different batches
3. Conflict Detection: Same token with different parameters raises ConflictError
Tests by Category:
1. Core Idempotency:
- test_idempotent_batch_creation_same_params
- test_idempotent_batch_creation_metadata_order_independence
2. Parameter Independence:
- test_non_idempotent_behavior_without_token
- test_different_idempotency_tokens_create_different_batches
3. Conflict Detection:
- test_same_idem_tok_different_params_conflict (parametrized: input_file_id, metadata values, metadata None vs {})
Key Behaviors Tested:
- Idempotent batch creation when idem_tok provided with identical parameters
- Metadata order independence for consistent batch ID generation
- Non-idempotent behavior when no idem_tok provided (random UUIDs)
- Conflict detection for parameter mismatches with same idempotency token
- Deterministic ID generation based solely on idempotency token
- Proper error handling with detailed conflict messages including token and error codes
- Protection against idempotency token reuse with different request parameters
"""
import asyncio
import pytest
from llama_stack.apis.common.errors import ConflictError
class TestReferenceBatchesIdempotency:
"""Test suite for idempotency functionality in the reference implementation."""
async def test_idempotent_batch_creation_same_params(self, provider, sample_batch_data):
"""Test that creating batches with identical parameters returns the same batch when idem_tok is provided."""
del sample_batch_data["metadata"]
batch1 = await provider.create_batch(
**sample_batch_data,
metadata={"test": "value1", "other": "value2"},
idem_tok="unique-token-1",
)
# sleep for 1 second to allow created_at timestamps to be different
await asyncio.sleep(1)
batch2 = await provider.create_batch(
**sample_batch_data,
metadata={"other": "value2", "test": "value1"}, # Different order
idem_tok="unique-token-1",
)
assert batch1.id == batch2.id
assert batch1.input_file_id == batch2.input_file_id
assert batch1.metadata == batch2.metadata
assert batch1.created_at == batch2.created_at
async def test_different_idempotency_tokens_create_different_batches(self, provider, sample_batch_data):
"""Test that different idempotency tokens create different batches even with same params."""
batch1 = await provider.create_batch(
**sample_batch_data,
idem_tok="token-A",
)
batch2 = await provider.create_batch(
**sample_batch_data,
idem_tok="token-B",
)
assert batch1.id != batch2.id
@pytest.mark.parametrize(
"param_name,first_value,second_value",
[
("input_file_id", "file_001", "file_002"),
("metadata", {"test": "value1"}, {"test": "value2"}),
("metadata", None, {}),
],
)
async def test_same_idem_tok_different_params_conflict(
self, provider, sample_batch_data, param_name, first_value, second_value
):
"""Test that same idem_tok with different parameters raises conflict error."""
sample_batch_data["idem_tok"] = "same-token"
sample_batch_data[param_name] = first_value
batch1 = await provider.create_batch(**sample_batch_data)
with pytest.raises(ConflictError, match="Idempotency token.*was previously used with different parameters"):
sample_batch_data[param_name] = second_value
await provider.create_batch(**sample_batch_data)
retrieved_batch = await provider.retrieve_batch(batch1.id)
assert retrieved_batch.id == batch1.id
assert getattr(retrieved_batch, param_name) == first_value