Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-10-07 12:47:37 +00:00)
feat(api): Add vector store file batches api (#3642)
Some checks failed
SqlStore Integration Tests / test-postgres (3.12) (push) Failing after 0s
Integration Auth Tests / test-matrix (oauth2_token) (push) Failing after 1s
Test External Providers Installed via Module / test-external-providers-from-module (venv) (push) Has been skipped
Integration Tests (Replay) / Integration Tests (, , , client=, ) (push) Failing after 2s
Python Package Build Test / build (3.13) (push) Failing after 0s
Python Package Build Test / build (3.12) (push) Failing after 2s
SqlStore Integration Tests / test-postgres (3.13) (push) Failing after 5s
Vector IO Integration Tests / test-matrix (push) Failing after 4s
API Conformance Tests / check-schema-compatibility (push) Successful in 9s
Unit Tests / unit-tests (3.12) (push) Failing after 3s
Test External API and Providers / test-external (venv) (push) Failing after 5s
Unit Tests / unit-tests (3.13) (push) Failing after 3s
UI Tests / ui-tests (22) (push) Successful in 40s
Pre-commit / pre-commit (push) Successful in 1m28s
# What does this PR do?

Add an OpenAI-compatible vector store file batches API. This functionality is needed to attach many files to a vector store as a single batch. See https://github.com/llamastack/llama-stack/issues/3533.

The API stubs were merged in https://github.com/llamastack/llama-stack/pull/3615. This PR adds persistence for file batches as discussed in https://github.com/llamastack/llama-stack/pull/3544. (Used Claude Code for generation; reviewed by me.)

## Test Plan

1. Unit tests pass.
2. Verified that the cc-vec integration with LlamaStackClient works with the file batches API: https://github.com/raghotham/cc-vec
3. Integration tests pass.
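For context, here is a minimal client-side sketch of how the new API is meant to be exercised (not part of this PR). It assumes an OpenAI-compatible client pointed at a locally running Llama Stack server; the base URL, vector store ID, and file IDs are placeholders, and the method paths follow the OpenAI Python SDK's `vector_stores.file_batches` namespace (adjust if your client version still nests them under `beta`).

```python
# Hypothetical usage sketch: attach several already-uploaded files to a vector
# store as one batch via the OpenAI-compatible API. base_url, api_key, and IDs
# are placeholders, not values from this PR.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8321/v1", api_key="none")  # assumed local Llama Stack endpoint

batch = client.vector_stores.file_batches.create(
    vector_store_id="vs_123",                 # assumed existing vector store
    file_ids=["file-1", "file-2", "file-3"],  # assumed previously uploaded files
)
print(batch.status, batch.file_counts)        # starts as "in_progress"

# Poll the batch, then list the files it processed.
batch = client.vector_stores.file_batches.retrieve(batch.id, vector_store_id="vs_123")
files = client.vector_stores.file_batches.list_files(batch.id, vector_store_id="vs_123")
for f in files.data:
    print(f.id, f.status)
```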
This commit is contained in:
parent
597d405e13
commit
bba9957edd
37 changed files with 10322 additions and 53 deletions
@@ -12,6 +12,8 @@ import uuid
from abc import ABC, abstractmethod
from typing import Any

from pydantic import TypeAdapter

from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
@@ -50,12 +52,16 @@ logger = get_logger(name=__name__, category="providers::utils")

# Constants for OpenAI vector stores
CHUNK_MULTIPLIER = 5
FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60  # 1 day in seconds
MAX_CONCURRENT_FILES_PER_BATCH = 3  # Maximum concurrent file processing within a batch
FILE_BATCH_CHUNK_SIZE = 10  # Process files in chunks of this size

VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:{VERSION}::"
OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:{VERSION}::"
OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX = f"openai_vector_stores_file_batches:{VERSION}::"


class OpenAIVectorStoreMixin(ABC):
@@ -65,11 +71,15 @@ class OpenAIVectorStoreMixin(ABC):
    an openai_vector_stores in-memory cache.
    """

    # These should be provided by the implementing class
    openai_vector_stores: dict[str, dict[str, Any]]
    files_api: Files | None
    # KV store for persisting OpenAI vector store metadata
    kvstore: KVStore | None
    # Implementing classes should call super().__init__() in their __init__ method
    # to properly initialize the mixin attributes.
    def __init__(self, files_api: Files | None = None, kvstore: KVStore | None = None):
        self.openai_vector_stores: dict[str, dict[str, Any]] = {}
        self.openai_file_batches: dict[str, dict[str, Any]] = {}
        self.files_api = files_api
        self.kvstore = kvstore
        self._last_file_batch_cleanup_time = 0
        self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}

    async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
        """Save vector store metadata to persistent storage."""
@@ -159,9 +169,129 @@ class OpenAIVectorStoreMixin(ABC):
        for idx in range(len(raw_items)):
            await self.kvstore.delete(f"{contents_prefix}{idx}")

    async def _save_openai_vector_store_file_batch(self, batch_id: str, batch_info: dict[str, Any]) -> None:
        """Save file batch metadata to persistent storage."""
        assert self.kvstore
        key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
        await self.kvstore.set(key=key, value=json.dumps(batch_info))
        # update in-memory cache
        self.openai_file_batches[batch_id] = batch_info

    async def _load_openai_vector_store_file_batches(self) -> dict[str, dict[str, Any]]:
        """Load all file batch metadata from persistent storage."""
        assert self.kvstore
        start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
        end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
        stored_data = await self.kvstore.values_in_range(start_key, end_key)

        batches: dict[str, dict[str, Any]] = {}
        for item in stored_data:
            info = json.loads(item)
            batches[info["id"]] = info
        return batches

    async def _delete_openai_vector_store_file_batch(self, batch_id: str) -> None:
        """Delete file batch metadata from persistent storage and in-memory cache."""
        assert self.kvstore
        key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{batch_id}"
        await self.kvstore.delete(key)
        # remove from in-memory cache
        self.openai_file_batches.pop(batch_id, None)

    async def _cleanup_expired_file_batches(self) -> None:
        """Clean up expired file batches from persistent storage."""
        assert self.kvstore
        start_key = OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX
        end_key = f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}\xff"
        stored_data = await self.kvstore.values_in_range(start_key, end_key)

        current_time = int(time.time())
        expired_count = 0

        for item in stored_data:
            info = json.loads(item)
            expires_at = info.get("expires_at")
            if expires_at and current_time > expires_at:
                logger.info(f"Cleaning up expired file batch: {info['id']}")
                await self.kvstore.delete(f"{OPENAI_VECTOR_STORES_FILE_BATCHES_PREFIX}{info['id']}")
                # Remove from in-memory cache if present
                self.openai_file_batches.pop(info["id"], None)
                expired_count += 1

        if expired_count > 0:
            logger.info(f"Cleaned up {expired_count} expired file batches")

    async def _get_completed_files_in_batch(self, vector_store_id: str, file_ids: list[str]) -> set[str]:
        """Determine which files in a batch are actually completed by checking vector store file_ids."""
        if vector_store_id not in self.openai_vector_stores:
            return set()

        store_info = self.openai_vector_stores[vector_store_id]
        completed_files = set(file_ids) & set(store_info["file_ids"])
        return completed_files

    async def _analyze_batch_completion_on_resume(self, batch_id: str, batch_info: dict[str, Any]) -> list[str]:
        """Analyze batch completion status and return remaining files to process.

        Returns:
            List of file IDs that still need processing. Empty list if batch is complete.
        """
        vector_store_id = batch_info["vector_store_id"]
        all_file_ids = batch_info["file_ids"]

        # Find files that are actually completed
        completed_files = await self._get_completed_files_in_batch(vector_store_id, all_file_ids)
        remaining_files = [file_id for file_id in all_file_ids if file_id not in completed_files]

        completed_count = len(completed_files)
        total_count = len(all_file_ids)
        remaining_count = len(remaining_files)

        # Update file counts to reflect actual state
        batch_info["file_counts"] = {
            "completed": completed_count,
            "failed": 0,  # We don't track failed files during resume - they'll be retried
            "in_progress": remaining_count,
            "cancelled": 0,
            "total": total_count,
        }

        # If all files are already completed, mark batch as completed
        if remaining_count == 0:
            batch_info["status"] = "completed"
            logger.info(f"Batch {batch_id} is already fully completed, updating status")

        # Save updated batch info
        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

        return remaining_files

    async def _resume_incomplete_batches(self) -> None:
        """Resume processing of incomplete file batches after server restart."""
        for batch_id, batch_info in self.openai_file_batches.items():
            if batch_info["status"] == "in_progress":
                logger.info(f"Analyzing incomplete file batch: {batch_id}")

                remaining_files = await self._analyze_batch_completion_on_resume(batch_id, batch_info)

                # Check if batch is now completed after analysis
                if batch_info["status"] == "completed":
                    continue

                if remaining_files:
                    logger.info(f"Resuming batch {batch_id} with {len(remaining_files)} remaining files")
                    # Restart the background processing task with only remaining files
                    task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info, remaining_files))
                    self._file_batch_tasks[batch_id] = task

    async def initialize_openai_vector_stores(self) -> None:
        """Load existing OpenAI vector stores into the in-memory cache."""
        """Load existing OpenAI vector stores and file batches into the in-memory cache."""
        self.openai_vector_stores = await self._load_openai_vector_stores()
        self.openai_file_batches = await self._load_openai_vector_store_file_batches()
        self._file_batch_tasks = {}
        # TODO: Resume only works for single worker deployment. Jobs with multiple workers will need to be handled differently.
        await self._resume_incomplete_batches()
        self._last_file_batch_cleanup_time = 0

    @abstractmethod
    async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
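The resume path above reduces to set arithmetic between a batch's `file_ids` and the vector store's already-attached `file_ids`. A tiny standalone sketch of that bookkeeping, with made-up IDs rather than data from the PR:

```python
# Standalone sketch of the resume bookkeeping in _analyze_batch_completion_on_resume
# (hypothetical data, not from the PR).
batch_file_ids = ["file-a", "file-b", "file-c", "file-d"]
store_file_ids = {"file-a", "file-c"}  # files already attached before the restart

completed = set(batch_file_ids) & store_file_ids
remaining = [f for f in batch_file_ids if f not in completed]

file_counts = {
    "completed": len(completed),    # 2
    "failed": 0,                    # failures are not tracked across restarts; they get retried
    "in_progress": len(remaining),  # 2
    "cancelled": 0,
    "total": len(batch_file_ids),   # 4
}
status = "completed" if not remaining else "in_progress"
print(remaining, status)  # ['file-b', 'file-d'] in_progress
```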
@@ -570,6 +700,14 @@ class OpenAIVectorStoreMixin(ABC):
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        # Check if file is already attached to this vector store
        store_info = self.openai_vector_stores[vector_store_id]
        if file_id in store_info["file_ids"]:
            logger.warning(f"File {file_id} is already attached to vector store {vector_store_id}, skipping")
            # Return existing file object
            file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
            return VectorStoreFileObject(**file_info)

        attributes = attributes or {}
        chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
        created_at = int(time.time())
@@ -615,7 +753,6 @@ class OpenAIVectorStoreMixin(ABC):
                chunk_overlap_tokens,
                attributes,
            )

            if not chunks:
                vector_store_file_object.status = "failed"
                vector_store_file_object.last_error = VectorStoreFileLastError(
@@ -828,7 +965,230 @@ class OpenAIVectorStoreMixin(ABC):
        chunking_strategy: VectorStoreChunkingStrategy | None = None,
    ) -> VectorStoreFileBatchObject:
        """Create a vector store file batch."""
        raise NotImplementedError("openai_create_vector_store_file_batch is not implemented yet")
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()

        created_at = int(time.time())
        batch_id = f"batch_{uuid.uuid4()}"
        # File batches expire after 7 days
        expires_at = created_at + (7 * 24 * 60 * 60)

        # Initialize batch file counts - all files start as in_progress
        file_counts = VectorStoreFileCounts(
            completed=0,
            cancelled=0,
            failed=0,
            in_progress=len(file_ids),
            total=len(file_ids),
        )

        # Create batch object immediately with in_progress status
        batch_object = VectorStoreFileBatchObject(
            id=batch_id,
            created_at=created_at,
            vector_store_id=vector_store_id,
            status="in_progress",
            file_counts=file_counts,
        )

        batch_info = {
            **batch_object.model_dump(),
            "file_ids": file_ids,
            "attributes": attributes,
            "chunking_strategy": chunking_strategy.model_dump(),
            "expires_at": expires_at,
        }
        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

        # Start background processing of files
        task = asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
        self._file_batch_tasks[batch_id] = task

        # Run cleanup if needed (throttled to once every 1 day)
        current_time = int(time.time())
        if current_time - self._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS:
            logger.info("Running throttled cleanup of expired file batches")
            asyncio.create_task(self._cleanup_expired_file_batches())
            self._last_file_batch_cleanup_time = current_time

        return batch_object

    async def _process_files_with_concurrency(
        self,
        file_ids: list[str],
        vector_store_id: str,
        attributes: dict[str, Any],
        chunking_strategy_obj: Any,
        batch_id: str,
        batch_info: dict[str, Any],
    ) -> None:
        """Process files with controlled concurrency and chunking."""
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_FILES_PER_BATCH)

        async def process_single_file(file_id: str) -> tuple[str, bool]:
            """Process a single file with concurrency control."""
            async with semaphore:
                try:
                    vector_store_file_object = await self.openai_attach_file_to_vector_store(
                        vector_store_id=vector_store_id,
                        file_id=file_id,
                        attributes=attributes,
                        chunking_strategy=chunking_strategy_obj,
                    )
                    return file_id, vector_store_file_object.status == "completed"
                except Exception as e:
                    logger.error(f"Failed to process file {file_id} in batch {batch_id}: {e}")
                    return file_id, False

        # Process files in chunks to avoid creating too many tasks at once
        total_files = len(file_ids)
        for chunk_start in range(0, total_files, FILE_BATCH_CHUNK_SIZE):
            chunk_end = min(chunk_start + FILE_BATCH_CHUNK_SIZE, total_files)
            chunk = file_ids[chunk_start:chunk_end]

            chunk_num = chunk_start // FILE_BATCH_CHUNK_SIZE + 1
            total_chunks = (total_files + FILE_BATCH_CHUNK_SIZE - 1) // FILE_BATCH_CHUNK_SIZE
            logger.info(
                f"Processing chunk {chunk_num} of {total_chunks} ({len(chunk)} files, {chunk_start + 1}-{chunk_end} of {total_files} total files)"
            )

            async with asyncio.TaskGroup() as tg:
                chunk_tasks = [tg.create_task(process_single_file(file_id)) for file_id in chunk]

            chunk_results = [task.result() for task in chunk_tasks]

            # Update counts after each chunk for progressive feedback
            for _, success in chunk_results:
                self._update_file_counts(batch_info, success=success)

            # Save progress after each chunk
            await self._save_openai_vector_store_file_batch(batch_id, batch_info)

    def _update_file_counts(self, batch_info: dict[str, Any], success: bool) -> None:
        """Update file counts based on processing result."""
        if success:
            batch_info["file_counts"]["completed"] += 1
        else:
            batch_info["file_counts"]["failed"] += 1
        batch_info["file_counts"]["in_progress"] -= 1

    def _update_batch_status(self, batch_info: dict[str, Any]) -> None:
        """Update final batch status based on file processing results."""
        if batch_info["file_counts"]["failed"] == 0:
            batch_info["status"] = "completed"
        elif batch_info["file_counts"]["completed"] == 0:
            batch_info["status"] = "failed"
        else:
            batch_info["status"] = "completed"  # Partial success counts as completed

    async def _process_file_batch_async(
        self,
        batch_id: str,
        batch_info: dict[str, Any],
        override_file_ids: list[str] | None = None,
    ) -> None:
        """Process files in a batch asynchronously in the background."""
        file_ids = override_file_ids if override_file_ids is not None else batch_info["file_ids"]
        attributes = batch_info["attributes"]
        chunking_strategy = batch_info["chunking_strategy"]
        vector_store_id = batch_info["vector_store_id"]
        chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(VectorStoreChunkingStrategy)
        chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)

        try:
            # Process all files with controlled concurrency
            await self._process_files_with_concurrency(
                file_ids=file_ids,
                vector_store_id=vector_store_id,
                attributes=attributes,
                chunking_strategy_obj=chunking_strategy_obj,
                batch_id=batch_id,
                batch_info=batch_info,
            )

            # Update final batch status
            self._update_batch_status(batch_info)
            await self._save_openai_vector_store_file_batch(batch_id, batch_info)

            logger.info(f"File batch {batch_id} processing completed with status: {batch_info['status']}")

        except asyncio.CancelledError:
            logger.info(f"File batch {batch_id} processing was cancelled")
            # Clean up task reference if it still exists
            self._file_batch_tasks.pop(batch_id, None)
            raise  # Re-raise to ensure proper cancellation propagation
        finally:
            # Always clean up task reference when processing ends
            self._file_batch_tasks.pop(batch_id, None)

    def _get_and_validate_batch(self, batch_id: str, vector_store_id: str) -> dict[str, Any]:
        """Get and validate batch exists and belongs to vector store."""
        if vector_store_id not in self.openai_vector_stores:
            raise VectorStoreNotFoundError(vector_store_id)

        if batch_id not in self.openai_file_batches:
            raise ValueError(f"File batch {batch_id} not found")

        batch_info = self.openai_file_batches[batch_id]

        # Check if batch has expired (read-only check)
        expires_at = batch_info.get("expires_at")
        if expires_at:
            current_time = int(time.time())
            if current_time > expires_at:
                raise ValueError(f"File batch {batch_id} has expired after 7 days from creation")

        if batch_info["vector_store_id"] != vector_store_id:
            raise ValueError(f"File batch {batch_id} does not belong to vector store {vector_store_id}")

        return batch_info

    def _paginate_objects(
        self,
        objects: list[Any],
        limit: int | None = 20,
        after: str | None = None,
        before: str | None = None,
    ) -> tuple[list[Any], bool, str | None, str | None]:
        """Apply pagination to a list of objects with id fields."""
        limit = min(limit or 20, 100)  # Cap at 100 as per OpenAI

        # Find start index
        start_idx = 0
        if after:
            for i, obj in enumerate(objects):
                if obj.id == after:
                    start_idx = i + 1
                    break

        # Find end index
        end_idx = start_idx + limit
        if before:
            for i, obj in enumerate(objects[start_idx:], start_idx):
                if obj.id == before:
                    end_idx = i
                    break

        # Apply pagination
        paginated_objects = objects[start_idx:end_idx]

        # Determine pagination info
        has_more = end_idx < len(objects)
        first_id = paginated_objects[0].id if paginated_objects else None
        last_id = paginated_objects[-1].id if paginated_objects else None

        return paginated_objects, has_more, first_id, last_id

    async def openai_retrieve_vector_store_file_batch(
        self,
        batch_id: str,
        vector_store_id: str,
    ) -> VectorStoreFileBatchObject:
        """Retrieve a vector store file batch."""
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
        return VectorStoreFileBatchObject(**batch_info)

    async def openai_list_files_in_vector_store_file_batch(
        self,
@@ -841,15 +1201,39 @@ class OpenAIVectorStoreMixin(ABC):
        order: str | None = "desc",
    ) -> VectorStoreFilesListInBatchResponse:
        """Returns a list of vector store files in a batch."""
        raise NotImplementedError("openai_list_files_in_vector_store_file_batch is not implemented yet")
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)
        batch_file_ids = batch_info["file_ids"]

    async def openai_retrieve_vector_store_file_batch(
        self,
        batch_id: str,
        vector_store_id: str,
    ) -> VectorStoreFileBatchObject:
        """Retrieve a vector store file batch."""
        raise NotImplementedError("openai_retrieve_vector_store_file_batch is not implemented yet")
        # Load file objects for files in this batch
        batch_file_objects = []

        for file_id in batch_file_ids:
            try:
                file_info = await self._load_openai_vector_store_file(vector_store_id, file_id)
                file_object = VectorStoreFileObject(**file_info)

                # Apply status filter if provided
                if filter and file_object.status != filter:
                    continue

                batch_file_objects.append(file_object)
            except Exception as e:
                logger.warning(f"Could not load file {file_id} from batch {batch_id}: {e}")
                continue

        # Sort by created_at
        reverse_order = order == "desc"
        batch_file_objects.sort(key=lambda x: x.created_at, reverse=reverse_order)

        # Apply pagination using helper
        paginated_files, has_more, first_id, last_id = self._paginate_objects(batch_file_objects, limit, after, before)

        return VectorStoreFilesListInBatchResponse(
            data=paginated_files,
            first_id=first_id,
            last_id=last_id,
            has_more=has_more,
        )

    async def openai_cancel_vector_store_file_batch(
        self,
@@ -857,4 +1241,24 @@ class OpenAIVectorStoreMixin(ABC):
        vector_store_id: str,
    ) -> VectorStoreFileBatchObject:
        """Cancel a vector store file batch."""
        raise NotImplementedError("openai_cancel_vector_store_file_batch is not implemented yet")
        batch_info = self._get_and_validate_batch(batch_id, vector_store_id)

        if batch_info["status"] not in ["in_progress"]:
            raise ValueError(f"Cannot cancel batch {batch_id} with status {batch_info['status']}")

        # Cancel the actual processing task if it exists
        if batch_id in self._file_batch_tasks:
            task = self._file_batch_tasks[batch_id]
            if not task.done():
                task.cancel()
                logger.info(f"Cancelled processing task for file batch: {batch_id}")
            # Remove from task tracking
            del self._file_batch_tasks[batch_id]

        batch_info["status"] = "cancelled"

        await self._save_openai_vector_store_file_batch(batch_id, batch_info)

        updated_batch = VectorStoreFileBatchObject(**batch_info)

        return updated_batch
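The background worker in `_process_files_with_concurrency` above combines a semaphore cap (`MAX_CONCURRENT_FILES_PER_BATCH`) with chunked `asyncio.TaskGroup`s (`FILE_BATCH_CHUNK_SIZE`). Below is a self-contained sketch of just that pattern, assuming Python 3.11+ for `asyncio.TaskGroup`; `do_work` is a hypothetical stand-in for `openai_attach_file_to_vector_store`.

```python
import asyncio

MAX_CONCURRENT = 3  # mirrors MAX_CONCURRENT_FILES_PER_BATCH
CHUNK_SIZE = 10     # mirrors FILE_BATCH_CHUNK_SIZE


async def do_work(item: str) -> tuple[str, bool]:
    """Stand-in for attaching one file to a vector store."""
    await asyncio.sleep(0.01)
    return item, True


async def process_all(items: list[str]) -> None:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    async def bounded(item: str) -> tuple[str, bool]:
        async with semaphore:  # at most MAX_CONCURRENT items in flight at once
            return await do_work(item)

    for start in range(0, len(items), CHUNK_SIZE):
        chunk = items[start:start + CHUNK_SIZE]
        async with asyncio.TaskGroup() as tg:  # Python 3.11+
            tasks = [tg.create_task(bounded(i)) for i in chunk]
        results = [t.result() for t in tasks]  # every task in the chunk is done here
        # progress could be persisted after each chunk, as the mixin does
        print(f"chunk {start // CHUNK_SIZE + 1}: {sum(ok for _, ok in results)}/{len(chunk)} ok")


asyncio.run(process_all([f"file-{i}" for i in range(25)]))
```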