From cffc4edf47ab205cd1045144aa78187f7534068b Mon Sep 17 00:00:00 2001 From: Matthew Farrellee Date: Fri, 22 Aug 2025 17:50:40 -0500 Subject: [PATCH] feat: Add optional idempotency support to batches API (#3171) Implements optional idempotency for batch creation using the `idempotency_key` parameter: * **Core idempotency**: Same key + parameters returns existing batch * **Conflict detection**: Same key + different parameters raises HTTP 409 ConflictError * **Metadata order independence**: Different key ordering doesn't affect idempotency **API changes:** - Add optional `idempotency_key` parameter to `create_batch()` method - Enhanced API documentation with idempotency extensions **Implementation:** - Reference provider supports idempotent batch creation - ConflictError for proper HTTP 409 status code mapping - Comprehensive parameter validation **Testing:** - Unit tests: focused tests covering core scenarios with parametrized conflict detection - Integration tests: tests validating real OpenAI client behavior This enables client-side retry safety and prevents duplicate batch creation when reusing the same idempotency key, following standard REST API idempotency conventions. Closes #3144 --- docs/source/providers/batches/index.md | 9 +- llama_stack/apis/batches/batches.py | 10 +- .../inline/batches/reference/batches.py | 80 ++++++++--- .../batches/test_batches_idempotency.py | 91 +++++++++++++ tests/unit/providers/batches/conftest.py | 54 ++++++++ .../unit/providers/batches/test_reference.py | 43 ------ .../batches/test_reference_idempotency.py | 128 ++++++++++++++++++ 7 files changed, 351 insertions(+), 64 deletions(-) create mode 100644 tests/integration/batches/test_batches_idempotency.py create mode 100644 tests/unit/providers/batches/conftest.py create mode 100644 tests/unit/providers/batches/test_reference_idempotency.py diff --git a/docs/source/providers/batches/index.md b/docs/source/providers/batches/index.md index 2a39a626c..d6d2fa9a3 100644 --- a/docs/source/providers/batches/index.md +++ 
b/docs/source/providers/batches/index.md @@ -2,12 +2,15 @@ ## Overview -Protocol for batch processing API operations. - - The Batches API enables efficient processing of multiple requests in a single operation, +The Batches API enables efficient processing of multiple requests in a single operation, particularly useful for processing large datasets, batch evaluation workflows, and cost-effective inference at scale. + The API is designed to allow use of openai client libraries for seamless integration. + + This API provides the following extensions: + - idempotent batch creation + Note: This API is currently under active development and may undergo changes. This section contains documentation for all available providers for the **batches** API. diff --git a/llama_stack/apis/batches/batches.py b/llama_stack/apis/batches/batches.py index 9297d8597..c6bbd92eb 100644 --- a/llama_stack/apis/batches/batches.py +++ b/llama_stack/apis/batches/batches.py @@ -29,12 +29,16 @@ class ListBatchesResponse(BaseModel): @runtime_checkable class Batches(Protocol): - """Protocol for batch processing API operations. - + """ The Batches API enables efficient processing of multiple requests in a single operation, particularly useful for processing large datasets, batch evaluation workflows, and cost-effective inference at scale. + The API is designed to allow use of openai client libraries for seamless integration. + + This API provides the following extensions: + - idempotent batch creation + Note: This API is currently under active development and may undergo changes. """ @@ -45,6 +49,7 @@ class Batches(Protocol): endpoint: str, completion_window: Literal["24h"], metadata: dict[str, str] | None = None, + idempotency_key: str | None = None, ) -> BatchObject: """Create a new batch for processing multiple API requests. @@ -52,6 +57,7 @@ class Batches(Protocol): :param endpoint: The endpoint to be used for all requests in the batch. 
:param completion_window: The time window within which the batch should be processed. :param metadata: Optional metadata for the batch. + :param idempotency_key: Optional idempotency key. When provided, enables idempotent behavior. :returns: The created batch object. """ ... diff --git a/llama_stack/providers/inline/batches/reference/batches.py b/llama_stack/providers/inline/batches/reference/batches.py index 1ff554e70..26f0ad15a 100644 --- a/llama_stack/providers/inline/batches/reference/batches.py +++ b/llama_stack/providers/inline/batches/reference/batches.py @@ -5,6 +5,7 @@ # the root directory of this source tree. import asyncio +import hashlib import itertools import json import time @@ -136,28 +137,45 @@ class ReferenceBatchesImpl(Batches): endpoint: str, completion_window: Literal["24h"], metadata: dict[str, str] | None = None, + idempotency_key: str | None = None, ) -> BatchObject: """ Create a new batch for processing multiple API requests. - Error handling by levels - - 0. Input param handling, results in 40x errors before processing, e.g. - - Wrong completion_window - - Invalid metadata types - - Unknown endpoint - -> no batch created - 1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g. - - input_file_id missing - - invalid json in file - - missing custom_id, method, url, body - - invalid model - - streaming - -> batch created, validation sends to failed status - 2. Processing errors, result in error_file_id entries, e.g. - - Any error returned from inference endpoint - -> batch created, goes to completed status + This implementation provides optional idempotency: when an idempotency key + (idempotency_key) is provided, a deterministic ID is generated based on the input + parameters. If a batch with the same parameters already exists, it will be + returned instead of creating a duplicate. Without an idempotency key, + each request creates a new batch with a unique ID. 
+ + Args: + input_file_id: The ID of an uploaded file containing requests for the batch. + endpoint: The endpoint to be used for all requests in the batch. + completion_window: The time window within which the batch should be processed. + metadata: Optional metadata for the batch. + idempotency_key: Optional idempotency key for enabling idempotent behavior. + + Returns: + The created or existing batch object. """ + # Error handling by levels - + # 0. Input param handling, results in 40x errors before processing, e.g. + # - Wrong completion_window + # - Invalid metadata types + # - Unknown endpoint + # -> no batch created + # 1. Errors preventing processing, result in BatchErrors aggregated in process_batch, e.g. + # - input_file_id missing + # - invalid json in file + # - missing custom_id, method, url, body + # - invalid model + # - streaming + # -> batch created, validation sends to failed status + # 2. Processing errors, result in error_file_id entries, e.g. + # - Any error returned from inference endpoint + # -> batch created, goes to completed status + # TODO: set expiration time for garbage collection if endpoint not in ["/v1/chat/completions"]: @@ -171,6 +189,35 @@ class ReferenceBatchesImpl(Batches): ) batch_id = f"batch_{uuid.uuid4().hex[:16]}" + + # For idempotent requests, use the idempotency key for the batch ID + # This ensures the same key always maps to the same batch ID, + # allowing us to detect parameter conflicts + if idempotency_key is not None: + hash_input = idempotency_key.encode("utf-8") + hash_digest = hashlib.sha256(hash_input).hexdigest()[:24] + batch_id = f"batch_{hash_digest}" + + try: + existing_batch = await self.retrieve_batch(batch_id) + + if ( + existing_batch.input_file_id != input_file_id + or existing_batch.endpoint != endpoint + or existing_batch.completion_window != completion_window + or existing_batch.metadata != metadata + ): + raise ConflictError( + f"Idempotency key '{idempotency_key}' was previously used with different 
parameters. " + "Either use a new idempotency key or ensure all parameters match the original request." + ) + + logger.info(f"Returning existing batch with ID: {batch_id}") + return existing_batch + except ResourceNotFoundError: + # Batch doesn't exist, continue with creation + pass + current_time = int(time.time()) batch = BatchObject( @@ -185,6 +232,7 @@ class ReferenceBatchesImpl(Batches): ) await self.kvstore.set(f"batch:{batch_id}", batch.to_json()) + logger.info(f"Created new batch with ID: {batch_id}") if self.process_batches: task = asyncio.create_task(self._process_batch(batch_id)) diff --git a/tests/integration/batches/test_batches_idempotency.py b/tests/integration/batches/test_batches_idempotency.py new file mode 100644 index 000000000..b101bb3dc --- /dev/null +++ b/tests/integration/batches/test_batches_idempotency.py @@ -0,0 +1,91 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Integration tests for batch idempotency functionality using the OpenAI client library. + +This module tests the idempotency feature in the batches API using the OpenAI-compatible +client interface. These tests verify that the idempotency key (idempotency_key) works correctly +in a real client-server environment. + +Test Categories: +1. Successful Idempotency: Same key returns same batch with identical parameters + - test_idempotent_batch_creation_successful: Verifies that requests with the same + idempotency key return identical batches, even with different metadata order + +2. 
Conflict Detection: Same key with conflicting parameters raises HTTP 409 errors + - test_idempotency_conflict_with_different_params: Verifies that reusing an idempotency key + with truly conflicting parameters (both file ID and metadata values) raises ConflictError +""" + +import time + +import pytest +from openai import ConflictError + + +class TestBatchesIdempotencyIntegration: + """Integration tests for batch idempotency using OpenAI client.""" + + def test_idempotent_batch_creation_successful(self, openai_client): + """Test that identical requests with same idempotency key return the same batch.""" + batch1 = openai_client.batches.create( + input_file_id="bogus-id", + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={ + "test_type": "idempotency_success", + "purpose": "integration_test", + }, + extra_body={"idempotency_key": "test-idempotency-token-1"}, + ) + + # sleep to ensure different timestamps + time.sleep(1) + + batch2 = openai_client.batches.create( + input_file_id="bogus-id", + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={ + "purpose": "integration_test", + "test_type": "idempotency_success", + }, # Different order + extra_body={"idempotency_key": "test-idempotency-token-1"}, + ) + + assert batch1.id == batch2.id + assert batch1.input_file_id == batch2.input_file_id + assert batch1.endpoint == batch2.endpoint + assert batch1.completion_window == batch2.completion_window + assert batch1.metadata == batch2.metadata + assert batch1.created_at == batch2.created_at + + def test_idempotency_conflict_with_different_params(self, openai_client): + """Test that using same idempotency key with different params raises conflict error.""" + batch1 = openai_client.batches.create( + input_file_id="bogus-id-1", + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={"test_type": "conflict_test_1"}, + extra_body={"idempotency_key": "conflict-token"}, + ) + + with pytest.raises(ConflictError) as exc_info: 
+ openai_client.batches.create( + input_file_id="bogus-id-2", # Different file ID + endpoint="/v1/chat/completions", + completion_window="24h", + metadata={"test_type": "conflict_test_2"}, # Different metadata + extra_body={"idempotency_key": "conflict-token"}, # Same token + ) + + assert exc_info.value.status_code == 409 + assert "conflict" in str(exc_info.value).lower() + + retrieved_batch = openai_client.batches.retrieve(batch1.id) + assert retrieved_batch.id == batch1.id + assert retrieved_batch.input_file_id == "bogus-id-1" diff --git a/tests/unit/providers/batches/conftest.py b/tests/unit/providers/batches/conftest.py new file mode 100644 index 000000000..df37141b5 --- /dev/null +++ b/tests/unit/providers/batches/conftest.py @@ -0,0 +1,54 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +"""Shared fixtures for batches provider unit tests.""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock + +import pytest + +from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl +from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig +from llama_stack.providers.utils.kvstore import kvstore_impl +from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig + + +@pytest.fixture +async def provider(): + """Create a test provider instance with temporary database.""" + with tempfile.TemporaryDirectory() as tmpdir: + db_path = Path(tmpdir) / "test_batches.db" + kvstore_config = SqliteKVStoreConfig(db_path=str(db_path)) + config = ReferenceBatchesImplConfig(kvstore=kvstore_config) + + # Create kvstore and mock APIs + kvstore = await kvstore_impl(config.kvstore) + mock_inference = AsyncMock() + mock_files = AsyncMock() + mock_models = AsyncMock() + + provider = ReferenceBatchesImpl(config, mock_inference, 
mock_files, mock_models, kvstore) + await provider.initialize() + + # unit tests should not require background processing + provider.process_batches = False + + yield provider + + await provider.shutdown() + + +@pytest.fixture +def sample_batch_data(): + """Sample batch data for testing.""" + return { + "input_file_id": "file_abc123", + "endpoint": "/v1/chat/completions", + "completion_window": "24h", + "metadata": {"test": "true", "priority": "high"}, + } diff --git a/tests/unit/providers/batches/test_reference.py b/tests/unit/providers/batches/test_reference.py index 9fe0cc710..0ca866f7b 100644 --- a/tests/unit/providers/batches/test_reference.py +++ b/tests/unit/providers/batches/test_reference.py @@ -54,60 +54,17 @@ dependencies like inference, files, and models APIs. """ import json -import tempfile -from pathlib import Path from unittest.mock import AsyncMock, MagicMock import pytest from llama_stack.apis.batches import BatchObject from llama_stack.apis.common.errors import ConflictError, ResourceNotFoundError -from llama_stack.providers.inline.batches.reference.batches import ReferenceBatchesImpl -from llama_stack.providers.inline.batches.reference.config import ReferenceBatchesImplConfig -from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig class TestReferenceBatchesImpl: """Test the reference implementation of the Batches API.""" - @pytest.fixture - async def provider(self): - """Create a test provider instance with temporary database.""" - with tempfile.TemporaryDirectory() as tmpdir: - db_path = Path(tmpdir) / "test_batches.db" - kvstore_config = SqliteKVStoreConfig(db_path=str(db_path)) - config = ReferenceBatchesImplConfig(kvstore=kvstore_config) - - # Create kvstore and mock APIs - from unittest.mock import AsyncMock - - from llama_stack.providers.utils.kvstore import kvstore_impl - - kvstore = await kvstore_impl(config.kvstore) - mock_inference = AsyncMock() - mock_files = AsyncMock() - mock_models = AsyncMock() - - provider = 
ReferenceBatchesImpl(config, mock_inference, mock_files, mock_models, kvstore) - await provider.initialize() - - # unit tests should not require background processing - provider.process_batches = False - - yield provider - - await provider.shutdown() - - @pytest.fixture - def sample_batch_data(self): - """Sample batch data for testing.""" - return { - "input_file_id": "file_abc123", - "endpoint": "/v1/chat/completions", - "completion_window": "24h", - "metadata": {"test": "true", "priority": "high"}, - } - def _validate_batch_type(self, batch, expected_metadata=None): """ Helper function to validate batch object structure and field types. diff --git a/tests/unit/providers/batches/test_reference_idempotency.py b/tests/unit/providers/batches/test_reference_idempotency.py new file mode 100644 index 000000000..e6cb29b9b --- /dev/null +++ b/tests/unit/providers/batches/test_reference_idempotency.py @@ -0,0 +1,128 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the terms described in the LICENSE file in +# the root directory of this source tree. + +""" +Tests for idempotency functionality in the reference batches provider. + +This module tests the optional idempotency feature that allows clients to provide +an idempotency key (idempotency_key) to ensure that repeated requests with the same key +and parameters return the same batch, while requests with the same key but different +parameters result in a conflict error. + +Test Categories: +1. Core Idempotency: Same parameters with same key return same batch +2. Parameter Independence: Different parameters without keys create different batches +3. Conflict Detection: Same key with different parameters raises ConflictError + +Tests by Category: + +1. Core Idempotency: + - test_idempotent_batch_creation_same_params + - test_idempotent_batch_creation_metadata_order_independence + +2. 
Parameter Independence: + - test_non_idempotent_behavior_without_key + - test_different_idempotency_keys_create_different_batches + +3. Conflict Detection: + - test_same_idempotency_key_different_params_conflict (parametrized: input_file_id, metadata values, metadata None vs {}) + +Key Behaviors Tested: +- Idempotent batch creation when idempotency_key provided with identical parameters +- Metadata order independence for consistent batch ID generation +- Non-idempotent behavior when no idempotency_key provided (random UUIDs) +- Conflict detection for parameter mismatches with same idempotency key +- Deterministic ID generation based solely on idempotency key +- Proper error handling with detailed conflict messages including key and error codes +- Protection against idempotency key reuse with different request parameters +""" + +import asyncio + +import pytest + +from llama_stack.apis.common.errors import ConflictError + + +class TestReferenceBatchesIdempotency: + """Test suite for idempotency functionality in the reference implementation.""" + + async def test_idempotent_batch_creation_same_params(self, provider, sample_batch_data): + """Test that creating batches with identical parameters returns the same batch when idempotency_key is provided.""" + + del sample_batch_data["metadata"] + + batch1 = await provider.create_batch( + **sample_batch_data, + metadata={"test": "value1", "other": "value2"}, + idempotency_key="unique-token-1", + ) + + # sleep for 1 second to allow created_at timestamps to be different + await asyncio.sleep(1) + + batch2 = await provider.create_batch( + **sample_batch_data, + metadata={"other": "value2", "test": "value1"}, # Different order + idempotency_key="unique-token-1", + ) + + assert batch1.id == batch2.id + assert batch1.input_file_id == batch2.input_file_id + assert batch1.metadata == batch2.metadata + assert batch1.created_at == batch2.created_at + + async def test_different_idempotency_keys_create_different_batches(self, provider, 
sample_batch_data): + """Test that different idempotency keys create different batches even with same params.""" + batch1 = await provider.create_batch( + **sample_batch_data, + idempotency_key="token-A", + ) + + batch2 = await provider.create_batch( + **sample_batch_data, + idempotency_key="token-B", + ) + + assert batch1.id != batch2.id + + async def test_non_idempotent_behavior_without_key(self, provider, sample_batch_data): + """Test that batches without idempotency key create unique batches even with identical parameters.""" + batch1 = await provider.create_batch(**sample_batch_data) + + batch2 = await provider.create_batch(**sample_batch_data) + + assert batch1.id != batch2.id + assert batch1.input_file_id == batch2.input_file_id + assert batch1.endpoint == batch2.endpoint + assert batch1.completion_window == batch2.completion_window + assert batch1.metadata == batch2.metadata + + @pytest.mark.parametrize( + "param_name,first_value,second_value", + [ + ("input_file_id", "file_001", "file_002"), + ("metadata", {"test": "value1"}, {"test": "value2"}), + ("metadata", None, {}), + ], + ) + async def test_same_idempotency_key_different_params_conflict( + self, provider, sample_batch_data, param_name, first_value, second_value + ): + """Test that same idempotency_key with different parameters raises conflict error.""" + sample_batch_data["idempotency_key"] = "same-token" + + sample_batch_data[param_name] = first_value + + batch1 = await provider.create_batch(**sample_batch_data) + + with pytest.raises(ConflictError, match="Idempotency key.*was previously used with different parameters"): + sample_batch_data[param_name] = second_value + await provider.create_batch(**sample_batch_data) + + retrieved_batch = await provider.retrieve_batch(batch1.id) + assert retrieved_batch.id == batch1.id + assert getattr(retrieved_batch, param_name) == first_value