From 3d901178914fefcedeccb8467aaed014fa8b637a Mon Sep 17 00:00:00 2001 From: Ashwin Bharambe Date: Tue, 12 Aug 2025 16:15:53 -0700 Subject: [PATCH] chore(tests): fix responses and vector_io tests (#3119) Some fixes to MCP tests. And a bunch of fixes for Vector providers. I also enabled a bunch of Vector IO tests to be used with `LlamaStackLibraryClient` ## Test Plan Run Responses tests with llama stack library client: ``` pytest -s -v tests/integration/non_ci/responses/ --stack-config=server:starter \ --text-model openai/gpt-4o \ --embedding-model=sentence-transformers/all-MiniLM-L6-v2 \ -k "client_with_models" ``` Do the same with `-k openai_client` The rest should be taken care of by CI. --- .github/actions/setup-runner/action.yml | 2 +- .github/workflows/integration-tests.yml | 3 +- .../workflows/integration-vector-io-tests.yml | 4 +- llama_stack/core/build.py | 2 +- llama_stack/core/routers/inference.py | 3 +- llama_stack/core/routing_tables/models.py | 2 + llama_stack/log.py | 1 + .../models/llama/llama3/chat_format.py | 1 + .../agents/meta_reference/openai_responses.py | 4 ++ .../providers/inline/vector_io/faiss/faiss.py | 19 ++++--- .../inline/vector_io/sqlite_vec/sqlite_vec.py | 25 +++++---- llama_stack/providers/registry/vector_io.py | 4 ++ .../remote/inference/fireworks/fireworks.py | 3 + .../remote/vector_io/chroma/chroma.py | 17 ++++-- .../remote/vector_io/milvus/milvus.py | 16 +++--- .../remote/vector_io/pgvector/pgvector.py | 12 ++-- .../remote/vector_io/qdrant/qdrant.py | 18 +++--- .../remote/vector_io/weaviate/weaviate.py | 11 ++-- .../utils/memory/openai_vector_store_mixin.py | 32 ++++++++--- .../providers/utils/memory/vector_store.py | 15 ++++- tests/common/mcp.py | 12 +--- tests/integration/fixtures/common.py | 2 +- .../fixtures/test_cases/responses.yaml | 6 +- .../non_ci/responses/test_responses.py | 17 ++++-- .../vector_io/test_openai_vector_stores.py | 56 ++++++++----------- 25 files changed, 175 insertions(+), 112 deletions(-) diff --git a/.github/actions/setup-runner/action.yml b/.github/actions/setup-runner/action.yml index 0be999fe2..1ca02bbff 100644 --- a/.github/actions/setup-runner/action.yml +++ b/.github/actions/setup-runner/action.yml @@ -28,7 +28,7 @@ runs: # Install llama-stack-client-python based on the client-version input if [ "${{ inputs.client-version }}" = "latest" ]; then echo "Installing latest llama-stack-client-python from main branch" - uv pip install git+https://github.com/meta-llama/llama-stack-client-python.git@main + uv pip install git+https://github.com/llamastack/llama-stack-client-python.git@main elif [ "${{ inputs.client-version }}" = "published" ]; then echo "Installing published llama-stack-client-python from PyPI" uv pip install llama-stack-client diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index a38d4971a..9ef49fba3 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -52,7 +52,8 @@ jobs: run: | # Get test directories dynamically, excluding non-test directories # NOTE: we are excluding post_training since the tests take too long - TEST_TYPES=$(find tests/integration -maxdepth 1 -mindepth 1 -type d -printf "%f\n" | + TEST_TYPES=$(find tests/integration -maxdepth 1 -mindepth 1 -type d | + sed 's|tests/integration/||' | grep -Ev "^(__pycache__|fixtures|test_cases|recordings|non_ci|post_training)$" | sort | jq -R -s -c 'split("\n")[:-1]') echo "test-types=$TEST_TYPES" >> $GITHUB_OUTPUT diff --git 
a/.github/workflows/integration-vector-io-tests.yml b/.github/workflows/integration-vector-io-tests.yml index aa239572b..f4d28e407 100644 --- a/.github/workflows/integration-vector-io-tests.yml +++ b/.github/workflows/integration-vector-io-tests.yml @@ -164,9 +164,9 @@ jobs: ENABLE_WEAVIATE: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'true' || '' }} WEAVIATE_CLUSTER_URL: ${{ matrix.vector-io-provider == 'remote::weaviate' && 'localhost:8080' || '' }} run: | - uv run pytest -sv --stack-config="inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ + uv run pytest -sv --stack-config="files=inline::localfs,inference=inline::sentence-transformers,vector_io=${{ matrix.vector-io-provider }}" \ tests/integration/vector_io \ - --embedding-model sentence-transformers/all-MiniLM-L6-v2 + --embedding-model inline::sentence-transformers/all-MiniLM-L6-v2 - name: Check Storage and Memory Available After Tests if: ${{ always() }} diff --git a/llama_stack/core/build.py b/llama_stack/core/build.py index b3e35ecef..4b20588fd 100644 --- a/llama_stack/core/build.py +++ b/llama_stack/core/build.py @@ -91,7 +91,7 @@ def get_provider_dependencies( def print_pip_install_help(config: BuildConfig): - normal_deps, special_deps = get_provider_dependencies(config) + normal_deps, special_deps, _ = get_provider_dependencies(config) cprint( f"Please install needed dependencies using the following commands:\n\nuv pip install {' '.join(normal_deps)}", diff --git a/llama_stack/core/routers/inference.py b/llama_stack/core/routers/inference.py index 52581cc9d..6a3f07247 100644 --- a/llama_stack/core/routers/inference.py +++ b/llama_stack/core/routers/inference.py @@ -65,7 +65,7 @@ from llama_stack.providers.datatypes import HealthResponse, HealthStatus, Routin from llama_stack.providers.utils.inference.inference_store import InferenceStore from llama_stack.providers.utils.telemetry.tracing import get_current_span -logger = get_logger(name=__name__, category="core") +logger = get_logger(name=__name__, category="inference") class InferenceRouter(Inference): @@ -854,4 +854,5 @@ class InferenceRouter(Inference): model=model.identifier, object="chat.completion", ) + logger.debug(f"InferenceRouter.completion_response: {final_response}") await self.store.store_chat_completion(final_response, messages) diff --git a/llama_stack/core/routing_tables/models.py b/llama_stack/core/routing_tables/models.py index c76619271..34c431e00 100644 --- a/llama_stack/core/routing_tables/models.py +++ b/llama_stack/core/routing_tables/models.py @@ -63,6 +63,8 @@ class ModelsRoutingTable(CommonRoutingTableImpl, Models): async def get_provider_impl(self, model_id: str) -> Any: model = await lookup_model(self, model_id) + if model.provider_id not in self.impls_by_provider_id: + raise ValueError(f"Provider {model.provider_id} not found in the routing table") return self.impls_by_provider_id[model.provider_id] async def register_model( diff --git a/llama_stack/log.py b/llama_stack/log.py index 0a2d63ef6..7507aface 100644 --- a/llama_stack/log.py +++ b/llama_stack/log.py @@ -32,6 +32,7 @@ CATEGORIES = [ "tools", "client", "telemetry", + "openai_responses", ] # Initialize category levels with default level diff --git a/llama_stack/models/llama/llama3/chat_format.py b/llama_stack/models/llama/llama3/chat_format.py index 0a973cf0c..1f88a1699 100644 --- a/llama_stack/models/llama/llama3/chat_format.py +++ b/llama_stack/models/llama/llama3/chat_format.py @@ -236,6 +236,7 @@ class ChatFormat: 
arguments_json=json.dumps(tool_arguments), ) ) + content = "" return RawMessage( role="assistant", diff --git a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py index b98ca114f..347954908 100644 --- a/llama_stack/providers/inline/agents/meta_reference/openai_responses.py +++ b/llama_stack/providers/inline/agents/meta_reference/openai_responses.py @@ -488,8 +488,12 @@ class OpenAIResponsesImpl: # Convert collected chunks to complete response if chat_response_tool_calls: tool_calls = [chat_response_tool_calls[i] for i in sorted(chat_response_tool_calls.keys())] + + # when there are tool calls, we need to clear the content + chat_response_content = [] else: tool_calls = None + assistant_message = OpenAIAssistantMessageParam( content="".join(chat_response_content), tool_calls=tool_calls, diff --git a/llama_stack/providers/inline/vector_io/faiss/faiss.py b/llama_stack/providers/inline/vector_io/faiss/faiss.py index 5a063592c..af61da59b 100644 --- a/llama_stack/providers/inline/vector_io/faiss/faiss.py +++ b/llama_stack/providers/inline/vector_io/faiss/faiss.py @@ -33,6 +33,7 @@ from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -128,11 +129,12 @@ class FaissIndex(EmbeddingIndex): # Save updated index await self._save_index() - async def delete_chunk(self, chunk_id: str) -> None: - if chunk_id not in self.chunk_ids: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: + chunk_ids = [c.chunk_id for c in chunks_for_deletion] + if not set(chunk_ids).issubset(self.chunk_ids): return - async with self.chunk_id_lock: + def remove_chunk(chunk_id: str): index = self.chunk_ids.index(chunk_id) self.index.remove_ids(np.array([index])) @@ -146,6 +148,10 @@ class FaissIndex(EmbeddingIndex): self.chunk_by_index = new_chunk_by_index self.chunk_ids.pop(index) + async with self.chunk_id_lock: + for chunk_id in chunk_ids: + remove_chunk(chunk_id) + await self._save_index() async def query_vector( @@ -297,8 +303,7 @@ class FaissVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPr return await index.query_chunks(query, params) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: - """Delete a chunk from a faiss index""" + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from a faiss index""" faiss_index = self.cache[store_id].index - for chunk_id in chunk_ids: - await faiss_index.delete_chunk(chunk_id) + await faiss_index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py index 1fff7b484..cc1982f3b 100644 --- a/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py +++ b/llama_stack/providers/inline/vector_io/sqlite_vec/sqlite_vec.py @@ -31,6 +31,7 @@ from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIV from llama_stack.providers.utils.memory.vector_store import ( RERANKER_TYPE_RRF, RERANKER_TYPE_WEIGHTED, + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -426,34 +427,36 @@ class SQLiteVecIndex(EmbeddingIndex): return 
QueryChunksResponse(chunks=chunks, scores=scores) - async def delete_chunk(self, chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Remove a chunk from the SQLite vector store.""" + chunk_ids = [c.chunk_id for c in chunks_for_deletion] - def _delete_chunk(): + def _delete_chunks(): connection = _create_sqlite_connection(self.db_path) cur = connection.cursor() try: cur.execute("BEGIN TRANSACTION") # Delete from metadata table - cur.execute(f"DELETE FROM {self.metadata_table} WHERE id = ?", (chunk_id,)) + placeholders = ",".join("?" * len(chunk_ids)) + cur.execute(f"DELETE FROM {self.metadata_table} WHERE id IN ({placeholders})", chunk_ids) # Delete from vector table - cur.execute(f"DELETE FROM {self.vector_table} WHERE id = ?", (chunk_id,)) + cur.execute(f"DELETE FROM {self.vector_table} WHERE id IN ({placeholders})", chunk_ids) # Delete from FTS table - cur.execute(f"DELETE FROM {self.fts_table} WHERE id = ?", (chunk_id,)) + cur.execute(f"DELETE FROM {self.fts_table} WHERE id IN ({placeholders})", chunk_ids) connection.commit() except Exception as e: connection.rollback() - logger.error(f"Error deleting chunk {chunk_id}: {e}") + logger.error(f"Error deleting chunks: {e}") raise finally: cur.close() connection.close() - await asyncio.to_thread(_delete_chunk) + await asyncio.to_thread(_delete_chunks) class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate): @@ -551,12 +554,10 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc raise VectorStoreNotFoundError(vector_db_id) return await index.query_chunks(query, params) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: - """Delete a chunk from a sqlite_vec index.""" + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from a sqlite_vec index.""" index = await self._get_and_cache_vector_db_index(store_id) if not index: raise VectorStoreNotFoundError(store_id) - for chunk_id in chunk_ids: - # Use the index's delete_chunk method - await index.index.delete_chunk(chunk_id) + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/registry/vector_io.py b/llama_stack/providers/registry/vector_io.py index ed170b508..70148eb15 100644 --- a/llama_stack/providers/registry/vector_io.py +++ b/llama_stack/providers/registry/vector_io.py @@ -342,6 +342,7 @@ See [Chroma's documentation](https://docs.trychroma.com/docs/overview/introducti """, ), api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files], ), InlineProviderSpec( api=Api.vector_io, @@ -350,6 +351,7 @@ See [Chroma's documentation](https://docs.trychroma.com/docs/overview/introducti module="llama_stack.providers.inline.vector_io.chroma", config_class="llama_stack.providers.inline.vector_io.chroma.ChromaVectorIOConfig", api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files], description=""" [Chroma](https://www.trychroma.com/) is an inline and remote vector database provider for Llama Stack. It allows you to store and query vectors directly within a Chroma database. 
@@ -464,6 +466,7 @@ See [Weaviate's documentation](https://weaviate.io/developers/weaviate) for more """, ), api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files], ), InlineProviderSpec( api=Api.vector_io, @@ -731,6 +734,7 @@ For more details on TLS configuration, refer to the [TLS setup guide](https://mi """, ), api_dependencies=[Api.inference], + optional_api_dependencies=[Api.files], ), InlineProviderSpec( api=Api.vector_io, diff --git a/llama_stack/providers/remote/inference/fireworks/fireworks.py b/llama_stack/providers/remote/inference/fireworks/fireworks.py index ca4c7b578..bd86f7238 100644 --- a/llama_stack/providers/remote/inference/fireworks/fireworks.py +++ b/llama_stack/providers/remote/inference/fireworks/fireworks.py @@ -235,6 +235,7 @@ class FireworksInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProv llama_model = self.get_llama_model(request.model) if isinstance(request, ChatCompletionRequest): + # TODO: tools are never added to the request, so we need to add them here if media_present or not llama_model: input_dict["messages"] = [ await convert_message_to_openai_dict(m, download=True) for m in request.messages @@ -378,6 +379,7 @@ class FireworksInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProv # Fireworks chat completions OpenAI-compatible API does not support # tool calls properly. llama_model = self.get_llama_model(model_obj.provider_resource_id) + if llama_model: return await OpenAIChatCompletionToLlamaStackMixin.openai_chat_completion( self, @@ -431,4 +433,5 @@ class FireworksInferenceAdapter(ModelRegistryHelper, Inference, NeedsRequestProv user=user, ) + logger.debug(f"fireworks params: {params}") return await self._get_openai_client().chat.completions.create(model=model_obj.provider_resource_id, **params) diff --git a/llama_stack/providers/remote/vector_io/chroma/chroma.py b/llama_stack/providers/remote/vector_io/chroma/chroma.py index 26aeaedfb..8f252711b 100644 --- a/llama_stack/providers/remote/vector_io/chroma/chroma.py +++ b/llama_stack/providers/remote/vector_io/chroma/chroma.py @@ -26,6 +26,7 @@ from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -115,8 +116,10 @@ class ChromaIndex(EmbeddingIndex): ) -> QueryChunksResponse: raise NotImplementedError("Keyword search is not supported in Chroma") - async def delete_chunk(self, chunk_id: str) -> None: - raise NotImplementedError("delete_chunk is not supported in Chroma") + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete a single chunk from the Chroma collection by its ID.""" + ids = [f"{chunk.document_id}:{chunk.chunk_id}" for chunk in chunks_for_deletion] + await maybe_await(self.collection.delete(ids=ids)) async def query_hybrid( self, @@ -144,6 +147,7 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.cache = {} self.kvstore: KVStore | None = None self.vector_db_store = None + self.files_api = files_api async def initialize(self) -> None: self.kvstore = await kvstore_impl(self.config.kvstore) @@ -227,5 +231,10 @@ class ChromaVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP self.cache[vector_db_id] = index return index - async def delete_chunks(self, store_id: str, chunk_ids: 
list[str]) -> None: - raise NotImplementedError("OpenAI Vector Stores API is not supported in Chroma") + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from a Chroma vector store.""" + index = await self._get_and_cache_vector_db_index(store_id) + if not index: + raise ValueError(f"Vector DB {store_id} not found") + + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/remote/vector_io/milvus/milvus.py b/llama_stack/providers/remote/vector_io/milvus/milvus.py index b09edb65c..0eaae81b3 100644 --- a/llama_stack/providers/remote/vector_io/milvus/milvus.py +++ b/llama_stack/providers/remote/vector_io/milvus/milvus.py @@ -28,6 +28,7 @@ from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( RERANKER_TYPE_WEIGHTED, + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -287,14 +288,17 @@ class MilvusIndex(EmbeddingIndex): return QueryChunksResponse(chunks=filtered_chunks, scores=filtered_scores) - async def delete_chunk(self, chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Remove a chunk from the Milvus collection.""" + chunk_ids = [c.chunk_id for c in chunks_for_deletion] try: + # Use IN clause with square brackets and single quotes for VARCHAR field + chunk_ids_str = ", ".join(f"'{chunk_id}'" for chunk_id in chunk_ids) await asyncio.to_thread( - self.client.delete, collection_name=self.collection_name, filter=f'chunk_id == "{chunk_id}"' + self.client.delete, collection_name=self.collection_name, filter=f"chunk_id in [{chunk_ids_str}]" ) except Exception as e: - logger.error(f"Error deleting chunk {chunk_id} from Milvus collection {self.collection_name}: {e}") + logger.error(f"Error deleting chunks from Milvus collection {self.collection_name}: {e}") raise @@ -420,12 +424,10 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP return await index.query_chunks(query, params) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Delete a chunk from a milvus vector store.""" index = await self._get_and_cache_vector_db_index(store_id) if not index: raise VectorStoreNotFoundError(store_id) - for chunk_id in chunk_ids: - # Use the index's delete_chunk method - await index.index.delete_chunk(chunk_id) + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py index b1645ac5a..d2a5d910b 100644 --- a/llama_stack/providers/remote/vector_io/pgvector/pgvector.py +++ b/llama_stack/providers/remote/vector_io/pgvector/pgvector.py @@ -27,6 +27,7 @@ from llama_stack.providers.utils.kvstore import kvstore_impl from llama_stack.providers.utils.kvstore.api import KVStore from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -163,10 +164,11 @@ class PGVectorIndex(EmbeddingIndex): with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: cur.execute(f"DROP TABLE IF EXISTS {self.table_name}") - async def delete_chunk(self, 
chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Remove a chunk from the PostgreSQL table.""" + chunk_ids = [c.chunk_id for c in chunks_for_deletion] with self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute(f"DELETE FROM {self.table_name} WHERE id = %s", (chunk_id,)) + cur.execute(f"DELETE FROM {self.table_name} WHERE id = ANY(%s)", (chunk_ids,)) class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolPrivate): @@ -275,12 +277,10 @@ class PGVectorVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoco self.cache[vector_db_id] = VectorDBWithIndex(vector_db, index, self.inference_api) return self.cache[vector_db_id] - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Delete a chunk from a PostgreSQL vector store.""" index = await self._get_and_cache_vector_db_index(store_id) if not index: raise VectorStoreNotFoundError(store_id) - for chunk_id in chunk_ids: - # Use the index's delete_chunk method - await index.index.delete_chunk(chunk_id) + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py index 144da0f4f..018015780 100644 --- a/llama_stack/providers/remote/vector_io/qdrant/qdrant.py +++ b/llama_stack/providers/remote/vector_io/qdrant/qdrant.py @@ -29,6 +29,7 @@ from llama_stack.providers.inline.vector_io.qdrant import QdrantVectorIOConfig a from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -88,15 +89,16 @@ class QdrantIndex(EmbeddingIndex): await self.client.upsert(collection_name=self.collection_name, points=points) - async def delete_chunk(self, chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Remove a chunk from the Qdrant collection.""" + chunk_ids = [convert_id(c.chunk_id) for c in chunks_for_deletion] try: await self.client.delete( collection_name=self.collection_name, - points_selector=models.PointIdsList(points=[convert_id(chunk_id)]), + points_selector=models.PointIdsList(points=chunk_ids), ) except Exception as e: - log.error(f"Error deleting chunk {chunk_id} from Qdrant collection {self.collection_name}: {e}") + log.error(f"Error deleting chunks from Qdrant collection {self.collection_name}: {e}") raise async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: @@ -264,12 +266,14 @@ class QdrantVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP ) -> VectorStoreFileObject: # Qdrant doesn't allow multiple clients to access the same storage path simultaneously. 
async with self._qdrant_lock: - await super().openai_attach_file_to_vector_store(vector_store_id, file_id, attributes, chunking_strategy) + return await super().openai_attach_file_to_vector_store( + vector_store_id, file_id, attributes, chunking_strategy + ) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: """Delete chunks from a Qdrant vector store.""" index = await self._get_and_cache_vector_db_index(store_id) if not index: raise ValueError(f"Vector DB {store_id} not found") - for chunk_id in chunk_ids: - await index.index.delete_chunk(chunk_id) + + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py index 11da8902c..966724848 100644 --- a/llama_stack/providers/remote/vector_io/weaviate/weaviate.py +++ b/llama_stack/providers/remote/vector_io/weaviate/weaviate.py @@ -26,6 +26,7 @@ from llama_stack.providers.utils.memory.openai_vector_store_mixin import ( OpenAIVectorStoreMixin, ) from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, EmbeddingIndex, VectorDBWithIndex, ) @@ -67,6 +68,7 @@ class WeaviateIndex(EmbeddingIndex): data_objects.append( wvc.data.DataObject( properties={ + "chunk_id": chunk.chunk_id, "chunk_content": chunk.model_dump_json(), }, vector=embeddings[i].tolist(), @@ -79,10 +81,11 @@ class WeaviateIndex(EmbeddingIndex): # TODO: make this async friendly collection.data.insert_many(data_objects) - async def delete_chunk(self, chunk_id: str) -> None: + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]) -> None: sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True) collection = self.client.collections.get(sanitized_collection_name) - collection.data.delete_many(where=Filter.by_property("id").contains_any([chunk_id])) + chunk_ids = [chunk.chunk_id for chunk in chunks_for_deletion] + collection.data.delete_many(where=Filter.by_property("chunk_id").contains_any(chunk_ids)) async def query_vector(self, embedding: NDArray, k: int, score_threshold: float) -> QueryChunksResponse: sanitized_collection_name = sanitize_collection_name(self.collection_name, weaviate_format=True) @@ -307,10 +310,10 @@ class WeaviateVectorIOAdapter( return await index.query_chunks(query, params) - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: sanitized_collection_name = sanitize_collection_name(store_id, weaviate_format=True) index = await self._get_and_cache_vector_db_index(sanitized_collection_name) if not index: raise ValueError(f"Vector DB {sanitized_collection_name} not found") - await index.delete(chunk_ids) + await index.index.delete_chunks(chunks_for_deletion) diff --git a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py index 7b6e69df1..120d0d4fc 100644 --- a/llama_stack/providers/utils/memory/openai_vector_store_mixin.py +++ b/llama_stack/providers/utils/memory/openai_vector_store_mixin.py @@ -6,7 +6,6 @@ import asyncio import json -import logging import mimetypes import time import uuid @@ -37,10 +36,15 @@ from llama_stack.apis.vector_io import ( VectorStoreSearchResponse, VectorStoreSearchResponsePage, ) +from llama_stack.log 
import get_logger from llama_stack.providers.utils.kvstore.api import KVStore -from llama_stack.providers.utils.memory.vector_store import content_from_data_and_mime_type, make_overlapped_chunks +from llama_stack.providers.utils.memory.vector_store import ( + ChunkForDeletion, + content_from_data_and_mime_type, + make_overlapped_chunks, +) -logger = logging.getLogger(__name__) +logger = get_logger(__name__, category="vector_io") # Constants for OpenAI vector stores CHUNK_MULTIPLIER = 5 @@ -154,8 +158,8 @@ class OpenAIVectorStoreMixin(ABC): self.openai_vector_stores = await self._load_openai_vector_stores() @abstractmethod - async def delete_chunks(self, store_id: str, chunk_ids: list[str]) -> None: - """Delete a chunk from a vector store.""" + async def delete_chunks(self, store_id: str, chunks_for_deletion: list[ChunkForDeletion]) -> None: + """Delete chunks from a vector store.""" pass @abstractmethod @@ -614,7 +618,7 @@ class OpenAIVectorStoreMixin(ABC): ) vector_store_file_object.status = "completed" except Exception as e: - logger.error(f"Error attaching file to vector store: {e}") + logger.exception("Error attaching file to vector store") vector_store_file_object.status = "failed" vector_store_file_object.last_error = VectorStoreFileLastError( code="server_error", @@ -767,7 +771,21 @@ class OpenAIVectorStoreMixin(ABC): dict_chunks = await self._load_openai_vector_store_file_contents(vector_store_id, file_id) chunks = [Chunk.model_validate(c) for c in dict_chunks] - await self.delete_chunks(vector_store_id, [str(c.chunk_id) for c in chunks if c.chunk_id]) + + # Create ChunkForDeletion objects with both chunk_id and document_id + chunks_for_deletion = [] + for c in chunks: + if c.chunk_id: + document_id = c.metadata.get("document_id") or ( + c.chunk_metadata.document_id if c.chunk_metadata else None + ) + if document_id: + chunks_for_deletion.append(ChunkForDeletion(chunk_id=str(c.chunk_id), document_id=document_id)) + else: + logger.warning(f"Chunk {c.chunk_id} has no document_id, skipping deletion") + + if chunks_for_deletion: + await self.delete_chunks(vector_store_id, chunks_for_deletion) store_info = self.openai_vector_stores[vector_store_id].copy() diff --git a/llama_stack/providers/utils/memory/vector_store.py b/llama_stack/providers/utils/memory/vector_store.py index bb9002f30..6ae5bb521 100644 --- a/llama_stack/providers/utils/memory/vector_store.py +++ b/llama_stack/providers/utils/memory/vector_store.py @@ -16,6 +16,7 @@ from urllib.parse import unquote import httpx import numpy as np from numpy.typing import NDArray +from pydantic import BaseModel from llama_stack.apis.common.content_types import ( URL, @@ -34,6 +35,18 @@ from llama_stack.providers.utils.vector_io.vector_utils import generate_chunk_id log = logging.getLogger(__name__) + +class ChunkForDeletion(BaseModel): + """Information needed to delete a chunk from a vector store. 
+ + :param chunk_id: The ID of the chunk to delete + :param document_id: The ID of the document this chunk belongs to + """ + + chunk_id: str + document_id: str + + # Constants for reranker types RERANKER_TYPE_RRF = "rrf" RERANKER_TYPE_WEIGHTED = "weighted" @@ -232,7 +245,7 @@ class EmbeddingIndex(ABC): raise NotImplementedError() @abstractmethod - async def delete_chunk(self, chunk_id: str): + async def delete_chunks(self, chunks_for_deletion: list[ChunkForDeletion]): raise NotImplementedError() @abstractmethod diff --git a/tests/common/mcp.py b/tests/common/mcp.py index 775e38295..d05ac39c6 100644 --- a/tests/common/mcp.py +++ b/tests/common/mcp.py @@ -16,13 +16,10 @@ MCP_TOOLGROUP_ID = "mcp::localmcp" def default_tools(): """Default tools for backward compatibility.""" - from mcp import types from mcp.server.fastmcp import Context - async def greet_everyone( - url: str, ctx: Context - ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: - return [types.TextContent(type="text", text="Hello, world!")] + async def greet_everyone(url: str, ctx: Context) -> str: + return "Hello, world!" async def get_boiling_point(liquid_name: str, celsius: bool = True) -> int: """ @@ -45,7 +42,6 @@ def default_tools(): def dependency_tools(): """Tools with natural dependencies for multi-turn testing.""" - from mcp import types from mcp.server.fastmcp import Context async def get_user_id(username: str, ctx: Context) -> str: @@ -106,7 +102,7 @@ def dependency_tools(): else: access = "no" - return [types.TextContent(type="text", text=access)] + return access async def get_experiment_id(experiment_name: str, ctx: Context) -> str: """ @@ -245,7 +241,6 @@ def make_mcp_server(required_auth_token: str | None = None, tools: dict[str, Cal try: yield {"server_url": server_url} finally: - print("Telling SSE server to exit") server_instance.should_exit = True time.sleep(0.5) @@ -269,4 +264,3 @@ def make_mcp_server(required_auth_token: str | None = None, tools: dict[str, Cal AppStatus.should_exit = False AppStatus.should_exit_event = None - print("SSE server exited") diff --git a/tests/integration/fixtures/common.py b/tests/integration/fixtures/common.py index c91391f19..0b7132d71 100644 --- a/tests/integration/fixtures/common.py +++ b/tests/integration/fixtures/common.py @@ -270,7 +270,7 @@ def openai_client(client_with_models): @pytest.fixture(params=["openai_client", "client_with_models"]) def compat_client(request, client_with_models): - if isinstance(client_with_models, LlamaStackAsLibraryClient): + if request.param == "openai_client" and isinstance(client_with_models, LlamaStackAsLibraryClient): # OpenAI client expects a server, so unless we also rewrite OpenAI client's requests # to go via the Stack library client (which itself rewrites requests to be served inline), # we cannot do this. diff --git a/tests/integration/non_ci/responses/fixtures/test_cases/responses.yaml b/tests/integration/non_ci/responses/fixtures/test_cases/responses.yaml index 6db0dd970..353a64291 100644 --- a/tests/integration/non_ci/responses/fixtures/test_cases/responses.yaml +++ b/tests/integration/non_ci/responses/fixtures/test_cases/responses.yaml @@ -137,7 +137,7 @@ test_response_multi_turn_tool_execution: server_url: "" output: "yes" - case_id: "experiment_results_lookup" - input: "I need to get the results for the 'boiling_point' experiment. First, get the experiment ID for 'boiling_point', then use that ID to get the experiment results. Tell me what you found." 
+ input: "I need to get the results for the 'boiling_point' experiment. First, get the experiment ID for 'boiling_point', then use that ID to get the experiment results. Tell me the boiling point in Celsius." tools: - type: mcp server_label: "localmcp" @@ -149,7 +149,7 @@ test_response_multi_turn_tool_execution_streaming: test_params: case: - case_id: "user_permissions_workflow" - input: "Help me with this security check: First, get the user ID for 'charlie', then get the permissions for that user ID, and finally check if that user can access 'secret_file.txt'. Stream your progress as you work through each step." + input: "Help me with this security check: First, get the user ID for 'charlie', then get the permissions for that user ID, and finally check if that user can access 'secret_file.txt'. Stream your progress as you work through each step. Return only one tool call per step. Summarize the final result with a single 'yes' or 'no' response." tools: - type: mcp server_label: "localmcp" @@ -157,7 +157,7 @@ test_response_multi_turn_tool_execution_streaming: stream: true output: "no" - case_id: "experiment_analysis_streaming" - input: "I need a complete analysis: First, get the experiment ID for 'chemical_reaction', then get the results for that experiment, and tell me if the yield was above 80%. Please stream your analysis process." + input: "I need a complete analysis: First, get the experiment ID for 'chemical_reaction', then get the results for that experiment, and tell me if the yield was above 80%. Return only one tool call per step. Please stream your analysis process." tools: - type: mcp server_label: "localmcp" diff --git a/tests/integration/non_ci/responses/test_responses.py b/tests/integration/non_ci/responses/test_responses.py index 4f4f27d7f..39d00f328 100644 --- a/tests/integration/non_ci/responses/test_responses.py +++ b/tests/integration/non_ci/responses/test_responses.py @@ -363,6 +363,9 @@ def test_response_non_streaming_file_search_empty_vector_store(request, compat_c ids=case_id_generator, ) def test_response_non_streaming_mcp_tool(request, compat_client, text_model_id, case): + if not isinstance(compat_client, LlamaStackAsLibraryClient): + pytest.skip("in-process MCP server is only supported in library client") + with make_mcp_server() as mcp_server_info: tools = case["tools"] for tool in tools: @@ -485,8 +488,11 @@ def test_response_non_streaming_multi_turn_image(request, compat_client, text_mo responses_test_cases["test_response_multi_turn_tool_execution"]["test_params"]["case"], ids=case_id_generator, ) -def test_response_non_streaming_multi_turn_tool_execution(request, compat_client, text_model_id, case): +def test_response_non_streaming_multi_turn_tool_execution(compat_client, text_model_id, case): """Test multi-turn tool execution where multiple MCP tool calls are performed in sequence.""" + if not isinstance(compat_client, LlamaStackAsLibraryClient): + pytest.skip("in-process MCP server is only supported in library client") + with make_mcp_server(tools=dependency_tools()) as mcp_server_info: tools = case["tools"] # Replace the placeholder URL with the actual server URL @@ -541,8 +547,11 @@ def test_response_non_streaming_multi_turn_tool_execution(request, compat_client responses_test_cases["test_response_multi_turn_tool_execution_streaming"]["test_params"]["case"], ids=case_id_generator, ) -async def test_response_streaming_multi_turn_tool_execution(request, compat_client, text_model_id, case): +def 
test_response_streaming_multi_turn_tool_execution(compat_client, text_model_id, case): """Test streaming multi-turn tool execution where multiple MCP tool calls are performed in sequence.""" + if not isinstance(compat_client, LlamaStackAsLibraryClient): + pytest.skip("in-process MCP server is only supported in library client") + with make_mcp_server(tools=dependency_tools()) as mcp_server_info: tools = case["tools"] # Replace the placeholder URL with the actual server URL @@ -634,7 +643,7 @@ async def test_response_streaming_multi_turn_tool_execution(request, compat_clie }, ], ) -def test_response_text_format(request, compat_client, text_model_id, text_format): +def test_response_text_format(compat_client, text_model_id, text_format): if isinstance(compat_client, LlamaStackAsLibraryClient): pytest.skip("Responses API text format is not yet supported in library client.") @@ -653,7 +662,7 @@ def test_response_text_format(request, compat_client, text_model_id, text_format @pytest.fixture -def vector_store_with_filtered_files(request, compat_client, text_model_id, tmp_path_factory): +def vector_store_with_filtered_files(compat_client, text_model_id, tmp_path_factory): """Create a vector store with multiple files that have different attributes for filtering tests.""" if isinstance(compat_client, LlamaStackAsLibraryClient): pytest.skip("Responses API file search is not yet supported in library client.") diff --git a/tests/integration/vector_io/test_openai_vector_stores.py b/tests/integration/vector_io/test_openai_vector_stores.py index 3212a7568..7ccca9077 100644 --- a/tests/integration/vector_io/test_openai_vector_stores.py +++ b/tests/integration/vector_io/test_openai_vector_stores.py @@ -9,10 +9,11 @@ import time from io import BytesIO import pytest -from llama_stack_client import BadRequestError, LlamaStackClient +from llama_stack_client import BadRequestError from openai import BadRequestError as OpenAIBadRequestError from llama_stack.apis.vector_io import Chunk +from llama_stack.core.library_client import LlamaStackAsLibraryClient logger = logging.getLogger(__name__) @@ -475,9 +476,6 @@ def test_openai_vector_store_attach_file(compat_client_with_empty_stores, client """Test OpenAI vector store attach file.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files attach is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -526,9 +524,6 @@ def test_openai_vector_store_attach_files_on_creation(compat_client_with_empty_s """Test OpenAI vector store attach files on creation.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files attach is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create some files and attach them to the vector store @@ -582,9 +577,6 @@ def test_openai_vector_store_list_files(compat_client_with_empty_stores, client_ """Test OpenAI vector store list files.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files list is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -597,16 +589,20 @@ def 
test_openai_vector_store_list_files(compat_client_with_empty_stores, client_ file_buffer.name = f"openai_test_{i}.txt" file = compat_client.files.create(file=file_buffer, purpose="assistants") - compat_client.vector_stores.files.create( + response = compat_client.vector_stores.files.create( vector_store_id=vector_store.id, file_id=file.id, ) + assert response is not None + assert response.status == "completed", ( + f"Failed to attach file {file.id} to vector store {vector_store.id}: {response=}" + ) file_ids.append(file.id) files_list = compat_client.vector_stores.files.list(vector_store_id=vector_store.id) assert files_list assert files_list.object == "list" - assert files_list.data + assert files_list.data is not None assert not files_list.has_more assert len(files_list.data) == 3 assert set(file_ids) == {file.id for file in files_list.data} @@ -642,12 +638,13 @@ def test_openai_vector_store_list_files_invalid_vector_store(compat_client_with_ """Test OpenAI vector store list files with invalid vector store ID.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files list is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores + if isinstance(compat_client, LlamaStackAsLibraryClient): + errors = ValueError + else: + errors = (BadRequestError, OpenAIBadRequestError) - with pytest.raises((BadRequestError, OpenAIBadRequestError)): + with pytest.raises(errors): compat_client.vector_stores.files.list(vector_store_id="abc123") @@ -655,9 +652,6 @@ def test_openai_vector_store_retrieve_file_contents(compat_client_with_empty_sto """Test OpenAI vector store retrieve file contents.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files retrieve contents is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -685,9 +679,15 @@ def test_openai_vector_store_retrieve_file_contents(compat_client_with_empty_sto file_id=file.id, ) - assert file_contents - assert file_contents.content[0]["type"] == "text" - assert file_contents.content[0]["text"] == test_content.decode("utf-8") + assert file_contents is not None + assert len(file_contents.content) == 1 + content = file_contents.content[0] + + # llama-stack-client returns a model, openai-python is a badboy and returns a dict + if not isinstance(content, dict): + content = content.model_dump() + assert content["type"] == "text" + assert content["text"] == test_content.decode("utf-8") assert file_contents.filename == file_name assert file_contents.attributes == attributes @@ -696,9 +696,6 @@ def test_openai_vector_store_delete_file(compat_client_with_empty_stores, client """Test OpenAI vector store delete file.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files list is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -751,9 +748,6 @@ def test_openai_vector_store_delete_file_removes_from_vector_store(compat_client """Test OpenAI vector store delete file removes from vector store.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - 
pytest.skip("Vector Store Files attach is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -792,9 +786,6 @@ def test_openai_vector_store_update_file(compat_client_with_empty_stores, client """Test OpenAI vector store update file.""" skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files update is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store @@ -840,9 +831,6 @@ def test_create_vector_store_files_duplicate_vector_store_name(compat_client_wit """ skip_if_provider_doesnt_support_openai_vector_stores(client_with_models) - if isinstance(compat_client_with_empty_stores, LlamaStackClient): - pytest.skip("Vector Store Files create is not yet supported with LlamaStackClient") - compat_client = compat_client_with_empty_stores # Create a vector store with files