Mirror of https://github.com/meta-llama/llama-stack.git (synced 2025-12-03 01:48:05 +00:00)
Fix race condition in concurrent file attachment to vector stores
This commit is contained in:
parent
1de6d49064
commit
b3403cdd84
1 changed file with 18 additions and 9 deletions
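The bug being fixed is a classic lost update: each attachment copies the store metadata, mutates its private copy, and writes it back, so concurrent attachments overwrite one another's changes. A minimal, self-contained sketch of that failure mode follows; the store layout and helper name are made up for illustration and are not code from this repository.

    import asyncio

    # Toy in-memory registry standing in for self.openai_vector_stores.
    stores = {"vs_1": {"file_ids": [], "file_counts": {"total": 0}}}

    async def attach_file_unsafe(store_id: str, file_id: str) -> None:
        info = stores[store_id].copy()   # shallow copy of the shared metadata
        await asyncio.sleep(0)           # stand-in for awaiting chunk persistence
        info["file_ids"] = info["file_ids"] + [file_id]
        info["file_counts"] = {"total": info["file_counts"]["total"] + 1}
        stores[store_id] = info          # last writer wins, clobbering other updates

    async def main() -> None:
        await asyncio.gather(*(attach_file_unsafe("vs_1", f"file_{i}") for i in range(5)))
        # Five files were attached, but the interleaved copies typically leave total == 1.
        print(stores["vs_1"]["file_counts"]["total"], stores["vs_1"]["file_ids"])

    asyncio.run(main())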
@@ -92,6 +92,13 @@ class OpenAIVectorStoreMixin(ABC):
         self.kvstore = kvstore
         self._last_file_batch_cleanup_time = 0
         self._file_batch_tasks: dict[str, asyncio.Task[None]] = {}
+        self._vector_store_locks: dict[str, asyncio.Lock] = {}
+
+    def _get_vector_store_lock(self, vector_store_id: str) -> asyncio.Lock:
+        """Get or create a lock for a specific vector store."""
+        if vector_store_id not in self._vector_store_locks:
+            self._vector_store_locks[vector_store_id] = asyncio.Lock()
+        return self._vector_store_locks[vector_store_id]
 
     async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
         """Save vector store metadata to persistent storage."""
@@ -831,16 +838,18 @@ class OpenAIVectorStoreMixin(ABC):
         await self._save_openai_vector_store_file(vector_store_id, file_id, file_info, dict_chunks)
 
         # Update file_ids and file_counts in vector store metadata
-        store_info = self.openai_vector_stores[vector_store_id].copy()
-        store_info["file_ids"].append(file_id)
-        store_info["file_counts"]["total"] += 1
-        store_info["file_counts"][vector_store_file_object.status] += 1
+        # Use lock to prevent race condition when multiple files are attached concurrently
+        async with self._get_vector_store_lock(vector_store_id):
+            store_info = self.openai_vector_stores[vector_store_id].copy()
+            # Deep copy file_counts to avoid mutating shared dict
+            store_info["file_counts"] = store_info["file_counts"].copy()
+            store_info["file_ids"] = store_info["file_ids"].copy()
+            store_info["file_ids"].append(file_id)
+            store_info["file_counts"]["total"] += 1
+            store_info["file_counts"][vector_store_file_object.status] += 1
 
-        # Save updated vector store to persistent storage
-        await self._save_openai_vector_store(vector_store_id, store_info)
+            # Save updated vector store to persistent storage
+            await self._save_openai_vector_store(vector_store_id, store_info)
 
-        # Update vector store in-memory cache
-        self.openai_vector_stores[vector_store_id] = store_info
+            # Update vector store in-memory cache
+            self.openai_vector_stores[vector_store_id] = store_info
 
         return vector_store_file_object
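A condensed sketch of the pattern the patch adopts, with simplified names (VectorStoreRegistry, attach_file, and the sleep standing in for kvstore persistence are illustrative, not the actual OpenAIVectorStoreMixin API): one lazily created asyncio.Lock per vector store serializes the read-copy-modify-write, and the nested file_ids / file_counts containers are copied before mutation so the cached dict is never modified in place.

    import asyncio

    class VectorStoreRegistry:
        def __init__(self) -> None:
            self.stores: dict[str, dict] = {"vs_1": {"file_ids": [], "file_counts": {"total": 0}}}
            self._locks: dict[str, asyncio.Lock] = {}

        def _get_lock(self, store_id: str) -> asyncio.Lock:
            # One lock per vector store, created lazily on first use.
            if store_id not in self._locks:
                self._locks[store_id] = asyncio.Lock()
            return self._locks[store_id]

        async def attach_file(self, store_id: str, file_id: str) -> None:
            async with self._get_lock(store_id):
                info = self.stores[store_id].copy()
                # Copy the nested containers so the shared dict is never mutated in place.
                info["file_ids"] = info["file_ids"].copy()
                info["file_counts"] = info["file_counts"].copy()
                info["file_ids"].append(file_id)
                info["file_counts"]["total"] += 1
                await asyncio.sleep(0)  # stand-in for persisting to the kvstore
                self.stores[store_id] = info

    async def main() -> None:
        registry = VectorStoreRegistry()
        await asyncio.gather(*(registry.attach_file("vs_1", f"file_{i}") for i in range(5)))
        print(registry.stores["vs_1"]["file_counts"]["total"])  # 5: no updates are lost

    asyncio.run(main())

Note that the check-then-create in _get_lock needs no locking of its own: there is no await between the membership test and the assignment, so on a single-threaded event loop the two steps cannot be interleaved.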