feat: Add optional idempotency support to batches API (#3171)

Implements optional idempotency for batch creation via an `idempotency_key`
parameter:

* **Core idempotency**: Same token + parameters returns existing batch
* **Conflict detection**: Same token + different parameters raises HTTP
409 ConflictError
* **Metadata order independence**: Different key ordering doesn't affect
idempotency

**API changes:**
- Add optional `idempotency_key` parameter to the `create_batch()` method
- Extend the API documentation to describe the idempotency extension

**Implementation:**
- Reference provider supports idempotent batch creation
- ConflictError for proper HTTP 409 status code mapping
- Comprehensive parameter validation

**Testing:**
- Unit tests: focused tests covering core scenarios with parametrized
conflict detection
- Integration tests: tests validating real OpenAI client behavior

This enables client-side retry safety and prevents duplicate batch
creation when the same idempotency token is reused, following common REST
API idempotency conventions.

closes #3144
This commit is contained in:
Matthew Farrellee 2025-08-22 17:50:40 -05:00 committed by GitHub
parent 7519b73fcc
commit cffc4edf47
7 changed files with 351 additions and 64 deletions


```diff
@@ -2,12 +2,15 @@
 ## Overview
-Protocol for batch processing API operations.
 The Batches API enables efficient processing of multiple requests in a single operation,
 particularly useful for processing large datasets, batch evaluation workflows, and
 cost-effective inference at scale.
+The API is designed to allow use of openai client libraries for seamless integration.
+
+This API provides the following extensions:
+ - idempotent batch creation
 
 Note: This API is currently under active development and may undergo changes.
 
 This section contains documentation for all available providers for the **batches** API.
```


```diff
@@ -29,12 +29,16 @@ class ListBatchesResponse(BaseModel):
 @runtime_checkable
 class Batches(Protocol):
-    """Protocol for batch processing API operations.
+    """
     The Batches API enables efficient processing of multiple requests in a single operation,
     particularly useful for processing large datasets, batch evaluation workflows, and
     cost-effective inference at scale.
+    The API is designed to allow use of openai client libraries for seamless integration.
+
+    This API provides the following extensions:
+     - idempotent batch creation
 
     Note: This API is currently under active development and may undergo changes.
     """
@@ -45,6 +49,7 @@ class Batches(Protocol):
         endpoint: str,
         completion_window: Literal["24h"],
         metadata: dict[str, str] | None = None,
+        idempotency_key: str | None = None,
     ) -> BatchObject:
         """Create a new batch for processing multiple API requests.
@@ -52,6 +57,7 @@ class Batches(Protocol):
         :param endpoint: The endpoint to be used for all requests in the batch.
         :param completion_window: The time window within which the batch should be processed.
         :param metadata: Optional metadata for the batch.
+        :param idempotency_key: Optional idempotency key. When provided, enables idempotent behavior.
         :returns: The created batch object.
         """
         ...
```


```diff
@@ -5,6 +5,7 @@
 # the root directory of this source tree.
 
 import asyncio
+import hashlib
 import itertools
 import json
 import time
@@ -136,28 +137,45 @@ class ReferenceBatchesImpl(Batches):
         endpoint: str,
         completion_window: Literal["24h"],
         metadata: dict[str, str] | None = None,
+        idempotency_key: str | None = None,
     ) -> BatchObject:
         """
         Create a new batch for processing multiple API requests.
 
-        Error handling by levels -
-         0. Input param handling, results in 40x errors before processing, e.g.
-            - Wrong completion_window
-            - Invalid metadata types
-            - Unknown endpoint
-            -> no batch created
-         1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g.
-            - input_file_id missing
-            - invalid json in file
-            - missing custom_id, method, url, body
-            - invalid model
-            - streaming
-            -> batch created, validation sends to failed status
-         2. Processing errors, result in error_file_id entries, e.g.
-            - Any error returned from inference endpoint
-            -> batch created, goes to completed status
+        This implementation provides optional idempotency: when an idempotency key
+        (idempotency_key) is provided, a deterministic ID is generated based on the input
+        parameters. If a batch with the same parameters already exists, it will be
+        returned instead of creating a duplicate. Without an idempotency key,
+        each request creates a new batch with a unique ID.
+
+        Args:
+            input_file_id: The ID of an uploaded file containing requests for the batch.
+            endpoint: The endpoint to be used for all requests in the batch.
+            completion_window: The time window within which the batch should be processed.
+            metadata: Optional metadata for the batch.
+            idempotency_key: Optional idempotency key for enabling idempotent behavior.
+
+        Returns:
+            The created or existing batch object.
         """
+        # Error handling by levels -
+        #  0. Input param handling, results in 40x errors before processing, e.g.
+        #     - Wrong completion_window
+        #     - Invalid metadata types
+        #     - Unknown endpoint
+        #     -> no batch created
+        #  1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g.
+        #     - input_file_id missing
+        #     - invalid json in file
+        #     - missing custom_id, method, url, body
+        #     - invalid model
+        #     - streaming
+        #     -> batch created, validation sends to failed status
+        #  2. Processing errors, result in error_file_id entries, e.g.
+        #     - Any error returned from inference endpoint
+        #     -> batch created, goes to completed status
 
         # TODO: set expiration time for garbage collection
 
         if endpoint not in ["/v1/chat/completions"]:
@@ -171,6 +189,35 @@ class ReferenceBatchesImpl(Batches):
             )
 
         batch_id = f"batch_{uuid.uuid4().hex[:16]}"
+
+        # For idempotent requests, use the idempotency key for the batch ID
+        # This ensures the same key always maps to the same batch ID,
+        # allowing us to detect parameter conflicts
+        if idempotency_key is not None:
+            hash_input = idempotency_key.encode("utf-8")
+            hash_digest = hashlib.sha256(hash_input).hexdigest()[:24]
+            batch_id = f"batch_{hash_digest}"
+
+            try:
+                existing_batch = await self.retrieve_batch(batch_id)
+
+                if (
+                    existing_batch.input_file_id != input_file_id
+                    or existing_batch.endpoint != endpoint
+                    or existing_batch.completion_window != completion_window
+                    or existing_batch.metadata != metadata
+                ):
+                    raise ConflictError(
+                        f"Idempotency key '{idempotency_key}' was previously used with different parameters. "
+                        "Either use a new idempotency key or ensure all parameters match the original request."
+                    )
+
+                logger.info(f"Returning existing batch with ID: {batch_id}")
+                return existing_batch
+            except ResourceNotFoundError:
+                # Batch doesn't exist, continue with creation
+                pass
+
         current_time = int(time.time())
 
         batch = BatchObject(
@@ -185,6 +232,7 @@ class ReferenceBatchesImpl(Batches):
         )
 
         await self.kvstore.set(f"batch:{batch_id}", batch.to_json())
+        logger.info(f"Created new batch with ID: {batch_id}")
 
         if self.process_batches:
             task = asyncio.create_task(self._process_batch(batch_id))
```
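The deterministic batch-ID derivation used by the provider above can be exercised standalone (`derive_batch_id` is an illustrative name, not part of the provider's API):

```python
import hashlib


def derive_batch_id(idempotency_key: str) -> str:
    """Mirror the provider's scheme: SHA-256 of the key, truncated to 24 hex chars."""
    digest = hashlib.sha256(idempotency_key.encode("utf-8")).hexdigest()[:24]
    return f"batch_{digest}"


# The same key always yields the same ID, so a client retry maps to one batch...
id1 = derive_batch_id("test-idempotency-token-1")
id2 = derive_batch_id("test-idempotency-token-1")
assert id1 == id2

# ...while different keys yield different IDs (and thus independent batches).
assert derive_batch_id("token-A") != derive_batch_id("token-B")
```

Because the ID depends solely on the key, reusing a key with different parameters lands on the stored batch, where the parameter comparison detects the conflict.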


@ -0,0 +1,91 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Integration tests for batch idempotency functionality using the OpenAI client library.
This module tests the idempotency feature in the batches API using the OpenAI-compatible
client interface. These tests verify that the idempotency key (idempotency_key) works correctly
in a real client-server environment.
Test Categories:
1. Successful Idempotency: Same key returns same batch with identical parameters
- test_idempotent_batch_creation_successful: Verifies that requests with the same
idempotency key return identical batches, even with different metadata order
2. Conflict Detection: Same key with conflicting parameters raises HTTP 409 errors
- test_idempotency_conflict_with_different_params: Verifies that reusing an idempotency key
with truly conflicting parameters (both file ID and metadata values) raises ConflictError
"""
import time
import pytest
from openai import ConflictError
class TestBatchesIdempotencyIntegration:
"""Integration tests for batch idempotency using OpenAI client."""
def test_idempotent_batch_creation_successful(self, openai_client):
"""Test that identical requests with same idempotency key return the same batch."""
batch1 = openai_client.batches.create(
input_file_id="bogus-id",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={
"test_type": "idempotency_success",
"purpose": "integration_test",
},
extra_body={"idempotency_key": "test-idempotency-token-1"},
)
# sleep to ensure different timestamps
time.sleep(1)
batch2 = openai_client.batches.create(
input_file_id="bogus-id",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={
"purpose": "integration_test",
"test_type": "idempotency_success",
}, # Different order
extra_body={"idempotency_key": "test-idempotency-token-1"},
)
assert batch1.id == batch2.id
assert batch1.input_file_id == batch2.input_file_id
assert batch1.endpoint == batch2.endpoint
assert batch1.completion_window == batch2.completion_window
assert batch1.metadata == batch2.metadata
assert batch1.created_at == batch2.created_at
def test_idempotency_conflict_with_different_params(self, openai_client):
"""Test that using same idempotency key with different params raises conflict error."""
batch1 = openai_client.batches.create(
input_file_id="bogus-id-1",
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"test_type": "conflict_test_1"},
extra_body={"idempotency_key": "conflict-token"},
)
with pytest.raises(ConflictError) as exc_info:
openai_client.batches.create(
input_file_id="bogus-id-2", # Different file ID
endpoint="/v1/chat/completions",
completion_window="24h",
metadata={"test_type": "conflict_test_2"}, # Different metadata
extra_body={"idempotency_key": "conflict-token"}, # Same token
)
assert exc_info.value.status_code == 409
assert "conflict" in str(exc_info.value).lower()
retrieved_batch = openai_client.batches.retrieve(batch1.id)
assert retrieved_batch.id == batch1.id
assert retrieved_batch.input_file_id == "bogus-id-1"


@ -0,0 +1,54 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""Shared fixtures for batches provider unit tests."""
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock
import pytest
from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl
from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
@pytest.fixture
async def provider():
"""Create a test provider instance with temporary database."""
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "test_batches.db"
kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
config = ReferenceBatchesImplConfig(kvstore=kvstore_config)
# Create kvstore and mock APIs
kvstore = await kvstore_impl(config.kvstore)
mock_inference = AsyncMock()
mock_files = AsyncMock()
mock_models = AsyncMock()
provider = ReferenceBatchesImpl(config, mock_inference, mock_files, mock_models, kvstore)
await provider.initialize()
# unit tests should not require background processing
provider.process_batches = False
yield provider
await provider.shutdown()
@pytest.fixture
def sample_batch_data():
"""Sample batch data for testing."""
return {
"input_file_id": "file_abc123",
"endpoint": "/v1/chat/completions",
"completion_window": "24h",
"metadata": {"test": "true", "priority": "high"},
}


```diff
@@ -54,60 +54,17 @@ dependencies like inference, files, and models APIs.
 """
 
 import json
-import tempfile
-from pathlib import Path
 from unittest.mock import AsyncMock, MagicMock
 
 import pytest
 
 from llama_stack.apis.batches import BatchObject
 from llama_stack.apis.common.errors import ConflictError, ResourceNotFoundError
-from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl
-from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig
-from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
 
 
 class TestReferenceBatchesImpl:
     """Test the reference implementation of the Batches API."""
 
-    @pytest.fixture
-    async def provider(self):
-        """Create a test provider instance with temporary database."""
-        with tempfile.TemporaryDirectory() as tmpdir:
-            db_path = Path(tmpdir) / "test_batches.db"
-            kvstore_config = SqliteKVStoreConfig(db_path=str(db_path))
-            config = ReferenceBatchesImplConfig(kvstore=kvstore_config)
-
-            # Create kvstore and mock APIs
-            from unittest.mock import AsyncMock
-
-            from llama_stack.providers.utils.kvstore import kvstore_impl
-
-            kvstore = await kvstore_impl(config.kvstore)
-            mock_inference = AsyncMock()
-            mock_files = AsyncMock()
-            mock_models = AsyncMock()
-
-            provider = ReferenceBatchesImpl(config, mock_inference, mock_files, mock_models, kvstore)
-            await provider.initialize()
-
-            # unit tests should not require background processing
-            provider.process_batches = False
-
-            yield provider
-
-            await provider.shutdown()
-
-    @pytest.fixture
-    def sample_batch_data(self):
-        """Sample batch data for testing."""
-        return {
-            "input_file_id": "file_abc123",
-            "endpoint": "/v1/chat/completions",
-            "completion_window": "24h",
-            "metadata": {"test": "true", "priority": "high"},
-        }
-
     def _validate_batch_type(self, batch, expected_metadata=None):
         """
         Helper function to validate batch object structure and field types.
```


@ -0,0 +1,128 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Tests for idempotency functionality in the reference batches provider.
This module tests the optional idempotency feature that allows clients to provide
an idempotency key (idempotency_key) to ensure that repeated requests with the same key
and parameters return the same batch, while requests with the same key but different
parameters result in a conflict error.
Test Categories:
1. Core Idempotency: Same parameters with same key return same batch
2. Parameter Independence: Different parameters without keys create different batches
3. Conflict Detection: Same key with different parameters raises ConflictError
Tests by Category:
1. Core Idempotency:
- test_idempotent_batch_creation_same_params
- test_idempotent_batch_creation_metadata_order_independence
2. Parameter Independence:
- test_non_idempotent_behavior_without_key
- test_different_idempotency_keys_create_different_batches
3. Conflict Detection:
- test_same_idempotency_key_different_params_conflict (parametrized: input_file_id, metadata values, metadata None vs {})
Key Behaviors Tested:
- Idempotent batch creation when idempotency_key provided with identical parameters
- Metadata order independence for consistent batch ID generation
- Non-idempotent behavior when no idempotency_key provided (random UUIDs)
- Conflict detection for parameter mismatches with same idempotency key
- Deterministic ID generation based solely on idempotency key
- Proper error handling with detailed conflict messages including key and error codes
- Protection against idempotency key reuse with different request parameters
"""
import asyncio
import pytest
from llama_stack.apis.common.errors import ConflictError
class TestReferenceBatchesIdempotency:
"""Test suite for idempotency functionality in the reference implementation."""
async def test_idempotent_batch_creation_same_params(self, provider, sample_batch_data):
"""Test that creating batches with identical parameters returns the same batch when idempotency_key is provided."""
del sample_batch_data["metadata"]
batch1 = await provider.create_batch(
**sample_batch_data,
metadata={"test": "value1", "other": "value2"},
idempotency_key="unique-token-1",
)
# sleep for 1 second to allow created_at timestamps to be different
await asyncio.sleep(1)
batch2 = await provider.create_batch(
**sample_batch_data,
metadata={"other": "value2", "test": "value1"}, # Different order
idempotency_key="unique-token-1",
)
assert batch1.id == batch2.id
assert batch1.input_file_id == batch2.input_file_id
assert batch1.metadata == batch2.metadata
assert batch1.created_at == batch2.created_at
async def test_different_idempotency_keys_create_different_batches(self, provider, sample_batch_data):
"""Test that different idempotency keys create different batches even with same params."""
batch1 = await provider.create_batch(
**sample_batch_data,
idempotency_key="token-A",
)
batch2 = await provider.create_batch(
**sample_batch_data,
idempotency_key="token-B",
)
assert batch1.id != batch2.id
async def test_non_idempotent_behavior_without_key(self, provider, sample_batch_data):
"""Test that batches without idempotency key create unique batches even with identical parameters."""
batch1 = await provider.create_batch(**sample_batch_data)
batch2 = await provider.create_batch(**sample_batch_data)
assert batch1.id != batch2.id
assert batch1.input_file_id == batch2.input_file_id
assert batch1.endpoint == batch2.endpoint
assert batch1.completion_window == batch2.completion_window
assert batch1.metadata == batch2.metadata
@pytest.mark.parametrize(
"param_name,first_value,second_value",
[
("input_file_id", "file_001", "file_002"),
("metadata", {"test": "value1"}, {"test": "value2"}),
("metadata", None, {}),
],
)
async def test_same_idempotency_key_different_params_conflict(
self, provider, sample_batch_data, param_name, first_value, second_value
):
"""Test that same idempotency_key with different parameters raises conflict error."""
sample_batch_data["idempotency_key"] = "same-token"
sample_batch_data[param_name] = first_value
batch1 = await provider.create_batch(**sample_batch_data)
with pytest.raises(ConflictError, match="Idempotency key.*was previously used with different parameters"):
sample_batch_data[param_name] = second_value
await provider.create_batch(**sample_batch_data)
retrieved_batch = await provider.retrieve_batch(batch1.id)
assert retrieved_batch.id == batch1.id
assert getattr(retrieved_batch, param_name) == first_value