From b3403cdd84a8ecd6f5af7c3b1385f0f5f0f93e9b Mon Sep 17 00:00:00 2001 From: kimbwook Date: Tue, 2 Dec 2025 20:17:19 +0900 Subject: [PATCH] race condition in concurrent file attachment to vector stores --- .../utils/memory/openai_vector_store_mixin.py | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/src/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index bbfd60e25..dcf1286c0 100644 --- a/src/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/src/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -92,6 +92,13 @@ class OpenAIVectorStoreMixin(ABC): self.kvstore = kvstore self._last_file_batch_cleanup_time = 0 self._file_batch_tasks: dict[str, asyncio.Task[None]] = {} + self._vector_store_locks: dict[str, asyncio.Lock] = {} + + def _get_vector_store_lock(self, vector_store_id: str) -> asyncio.Lock: + """Get or create a lock for a specific vector store.""" + if vector_store_id not in self._vector_store_locks: + self._vector_store_locks[vector_store_id] = asyncio.Lock() + return self._vector_store_locks[vector_store_id] async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None: """Save vector store metadata to persistent storage.""" @@ -831,16 +838,18 @@ class OpenAIVectorStoreMixin(ABC): await self._save_openai_vector_store_file(vector_store_id, file_id, file_info, dict_chunks) # Update file_ids and file_counts in vector store metadata - store_info = self.openai_vector_stores[vector_store_id].copy() - store_info["file_ids"].append(file_id) - store_info["file_counts"]["total"] += 1 - store_info["file_counts"][vector_store_file_object.status] += 1 + # Use lock to prevent race condition when multiple files are attached concurrently + async with self._get_vector_store_lock(vector_store_id): + store_info = self.openai_vector_stores[vector_store_id].copy() + # Deep copy file_counts to avoid mutating shared dict + store_info["file_counts"] = store_info["file_counts"].copy() + store_info["file_ids"] = store_info["file_ids"].copy() + store_info["file_ids"].append(file_id) + store_info["file_counts"]["total"] += 1 + store_info["file_counts"][vector_store_file_object.status] += 1 - # Save updated vector store to persistent storage - await self._save_openai_vector_store(vector_store_id, store_info) - - # Update vector store in-memory cache - self.openai_vector_stores[vector_store_id] = store_info + # Save updated vector store to persistent storage + await self._save_openai_vector_store(vector_store_id, store_info) return vector_store_file_object