clean up clean up

This commit is contained in:
Swapna Lekkala 2025-10-02 14:55:55 -07:00
parent f15b2383e7
commit a5b71c2cc7
8 changed files with 20 additions and 17 deletions

View file

@ -12,6 +12,8 @@ import uuid
from abc import ABC, abstractmethod
from typing import Any
from pydantic import TypeAdapter
from llama_stack.apis.common.errors import VectorStoreNotFoundError
from llama_stack.apis.files import Files, OpenAIFileObject
from llama_stack.apis.vector_dbs import VectorDB
@ -50,6 +52,7 @@ logger = get_logger(name=__name__, category="providers::utils")
# Constants for OpenAI vector stores
CHUNK_MULTIPLIER = 5
FILE_BATCH_CLEANUP_INTERVAL_SECONDS = 24 * 60 * 60 # 1 day in seconds
VERSION = "v3"
VECTOR_DBS_PREFIX = f"vector_dbs:{VERSION}::"
@ -73,7 +76,7 @@ class OpenAIVectorStoreMixin(ABC):
# KV store for persisting OpenAI vector store metadata
kvstore: KVStore | None
# Track last cleanup time to throttle cleanup operations
_last_cleanup_time: int
_last_file_batch_cleanup_time: int
async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
"""Save vector store metadata to persistent storage."""
@ -215,16 +218,6 @@ class OpenAIVectorStoreMixin(ABC):
if expired_count > 0:
logger.info(f"Cleaned up {expired_count} expired file batches")
async def _cleanup_expired_file_batches_if_needed(self) -> None:
"""Run cleanup if enough time has passed since the last cleanup."""
current_time = int(time.time())
cleanup_interval = 24 * 60 * 60 # 1 day in seconds
if current_time - self._last_cleanup_time >= cleanup_interval:
logger.info("Running throttled cleanup of expired file batches")
await self._cleanup_expired_file_batches()
self._last_cleanup_time = current_time
async def _resume_incomplete_batches(self) -> None:
"""Resume processing of incomplete file batches after server restart."""
for batch_id, batch_info in self.openai_file_batches.items():
@ -238,7 +231,7 @@ class OpenAIVectorStoreMixin(ABC):
self.openai_vector_stores = await self._load_openai_vector_stores()
self.openai_file_batches = await self._load_openai_vector_store_file_batches()
await self._resume_incomplete_batches()
self._last_cleanup_time = 0
self._last_file_batch_cleanup_time = 0
@abstractmethod
async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None:
@ -945,7 +938,11 @@ class OpenAIVectorStoreMixin(ABC):
asyncio.create_task(self._process_file_batch_async(batch_id, batch_info))
# Run cleanup if needed (throttled to once every 1 day)
asyncio.create_task(self._cleanup_expired_file_batches_if_needed())
current_time = int(time.time())
if current_time - self._last_file_batch_cleanup_time >= FILE_BATCH_CLEANUP_INTERVAL_SECONDS:
logger.info("Running throttled cleanup of expired file batches")
asyncio.create_task(self._cleanup_expired_file_batches())
self._last_file_batch_cleanup_time = current_time
return batch_object
@ -962,11 +959,10 @@ class OpenAIVectorStoreMixin(ABC):
for file_id in file_ids:
try:
chunking_strategy_obj = (
VectorStoreChunkingStrategyStatic(**chunking_strategy)
if chunking_strategy.get("type") == "static"
else VectorStoreChunkingStrategyAuto(**chunking_strategy)
chunking_strategy_adapter: TypeAdapter[VectorStoreChunkingStrategy] = TypeAdapter(
VectorStoreChunkingStrategy
)
chunking_strategy_obj = chunking_strategy_adapter.validate_python(chunking_strategy)
await self.openai_attach_file_to_vector_store(
vector_store_id=vector_store_id,
file_id=file_id,