remove vector_db

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
This commit is contained in:
Francisco Javier Arceo 2025-10-10 15:53:02 -04:00
parent aee4afb327
commit 27c2b2dd57
16 changed files with 4 additions and 2596 deletions

View file

@ -8,7 +8,6 @@ from typing import Any
from uuid import uuid4
import pytest
import requests
from llama_stack_client import Agent, AgentEventLogger, Document
from llama_stack_client.types.shared_params.agent_config import AgentConfig, ToolConfig
@ -443,118 +442,6 @@ def run_agent_with_tool_choice(client, agent_config, tool_choice):
return [step for step in response.steps if step.step_type == "tool_execution"]
@pytest.mark.parametrize("rag_tool_name", ["builtin::rag/knowledge_search", "builtin::rag"])
def test_rag_agent(llama_stack_client, agent_config, rag_tool_name):
urls = ["chat.rst", "llama3.rst", "memory_optimizations.rst", "lora_finetune.rst"]
documents = [
Document(
document_id=f"num-{i}",
content=f"https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/{url}",
mime_type="text/plain",
metadata={},
)
for i, url in enumerate(urls)
]
vector_db_id = f"test-vector-db-{uuid4()}"
llama_stack_client.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model="all-MiniLM-L6-v2",
embedding_dimension=384,
)
llama_stack_client.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=vector_db_id,
# small chunks help to get specific info out of the docs
chunk_size_in_tokens=256,
)
agent_config = {
**agent_config,
"tools": [
dict(
name=rag_tool_name,
args={
"vector_db_ids": [vector_db_id],
},
)
],
}
rag_agent = Agent(llama_stack_client, **agent_config)
session_id = rag_agent.create_session(f"test-session-{uuid4()}")
user_prompts = [
(
"Instead of the standard multi-head attention, what attention type does Llama3-8B use?",
"grouped",
),
]
for prompt, expected_kw in user_prompts:
response = rag_agent.create_turn(
messages=[{"role": "user", "content": prompt}],
session_id=session_id,
stream=False,
)
# rag is called
tool_execution_step = next(step for step in response.steps if step.step_type == "tool_execution")
assert tool_execution_step.tool_calls[0].tool_name == "knowledge_search"
# document ids are present in metadata
assert all(
doc_id.startswith("num-") for doc_id in tool_execution_step.tool_responses[0].metadata["document_ids"]
)
if expected_kw:
assert expected_kw in response.output_message.content.lower()
def test_rag_agent_with_attachments(llama_stack_client, agent_config_without_safety):
urls = ["llama3.rst", "lora_finetune.rst"]
documents = [
# passign as url
Document(
document_id="num-0",
content={
"type": "url",
"uri": f"https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/{urls[0]}",
},
mime_type="text/plain",
metadata={},
),
# passing as str
Document(
document_id="num-1",
content=requests.get(
f"https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/{urls[1]}"
).text[:500],
mime_type="text/plain",
metadata={},
),
]
rag_agent = Agent(llama_stack_client, **agent_config_without_safety)
session_id = rag_agent.create_session(f"test-session-{uuid4()}")
user_prompts = [
(
"I am attaching some documentation for Torchtune. Help me answer questions I will ask next.",
documents,
),
(
"Tell me how to use LoRA in 100 words or less",
None,
),
]
for prompt in user_prompts:
response = rag_agent.create_turn(
messages=[
{
"role": "user",
"content": prompt[0],
}
],
documents=prompt[1],
session_id=session_id,
stream=False,
)
assert "lora" in response.output_message.content.lower()
@pytest.mark.parametrize(
"client_tools",
[(get_boiling_point, False), (get_boiling_point_with_metadata, True)],

View file

@ -1,459 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import pytest
from llama_stack_client import BadRequestError
from llama_stack_client.types import Document
@pytest.fixture(scope="function")
def client_with_empty_registry(client_with_models):
def clear_registry():
vector_dbs = [vector_db.identifier for vector_db in client_with_models.vector_dbs.list()]
for vector_db_id in vector_dbs:
client_with_models.vector_dbs.unregister(vector_db_id=vector_db_id)
clear_registry()
try:
client_with_models.toolgroups.register(toolgroup_id="builtin::rag", provider_id="rag-runtime")
except Exception:
pass
yield client_with_models
clear_registry()
@pytest.fixture(scope="session")
def sample_documents():
return [
Document(
document_id="test-doc-1",
content="Python is a high-level programming language.",
metadata={"category": "programming", "difficulty": "beginner"},
),
Document(
document_id="test-doc-2",
content="Machine learning is a subset of artificial intelligence.",
metadata={"category": "AI", "difficulty": "advanced"},
),
Document(
document_id="test-doc-3",
content="Data structures are fundamental to computer science.",
metadata={"category": "computer science", "difficulty": "intermediate"},
),
Document(
document_id="test-doc-4",
content="Neural networks are inspired by biological neural networks.",
metadata={"category": "AI", "difficulty": "advanced"},
),
]
def assert_valid_chunk_response(response):
assert len(response.chunks) > 0
assert len(response.scores) > 0
assert len(response.chunks) == len(response.scores)
for chunk in response.chunks:
assert isinstance(chunk.content, str)
def assert_valid_text_response(response):
assert len(response.content) > 0
assert all(isinstance(chunk.text, str) for chunk in response.content)
def test_vector_db_insert_inline_and_query(
client_with_empty_registry, sample_documents, embedding_model_id, embedding_dimension
):
vector_db_name = "test_vector_db"
vector_db = client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_name,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
vector_db_id = vector_db.identifier
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=sample_documents,
chunk_size_in_tokens=512,
vector_db_id=vector_db_id,
)
# Query with a direct match
query1 = "programming language"
response1 = client_with_empty_registry.vector_io.query(
vector_db_id=vector_db_id,
query=query1,
)
assert_valid_chunk_response(response1)
assert any("Python" in chunk.content for chunk in response1.chunks)
# Query with semantic similarity
query2 = "AI and brain-inspired computing"
response2 = client_with_empty_registry.vector_io.query(
vector_db_id=vector_db_id,
query=query2,
)
assert_valid_chunk_response(response2)
assert any("neural networks" in chunk.content.lower() for chunk in response2.chunks)
# Query with limit on number of results (max_chunks=2)
query3 = "computer"
response3 = client_with_empty_registry.vector_io.query(
vector_db_id=vector_db_id,
query=query3,
params={"max_chunks": 2},
)
assert_valid_chunk_response(response3)
assert len(response3.chunks) <= 2
# Query with threshold on similarity score
query4 = "computer"
response4 = client_with_empty_registry.vector_io.query(
vector_db_id=vector_db_id,
query=query4,
params={"score_threshold": 0.01},
)
assert_valid_chunk_response(response4)
assert all(score >= 0.01 for score in response4.scores)
def test_vector_db_insert_from_url_and_query(
client_with_empty_registry, sample_documents, embedding_model_id, embedding_dimension
):
providers = [p for p in client_with_empty_registry.providers.list() if p.api == "vector_io"]
assert len(providers) > 0
vector_db_id = "test_vector_db"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
# list to check memory bank is successfully registered
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
# VectorDB is being migrated to VectorStore, so the ID will be different
# Just check that at least one vector DB was registered
assert len(available_vector_dbs) > 0
# Use the actual registered vector_db_id for subsequent operations
actual_vector_db_id = available_vector_dbs[0]
urls = [
"memory_optimizations.rst",
"chat.rst",
"llama3.rst",
]
documents = [
Document(
document_id=f"num-{i}",
content=f"https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/{url}",
mime_type="text/plain",
metadata={},
)
for i, url in enumerate(urls)
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=512,
)
# Query for the name of method
response1 = client_with_empty_registry.vector_io.query(
vector_db_id=actual_vector_db_id,
query="What's the name of the fine-tunning method used?",
)
assert_valid_chunk_response(response1)
assert any("lora" in chunk.content.lower() for chunk in response1.chunks)
# Query for the name of model
response2 = client_with_empty_registry.vector_io.query(
vector_db_id=actual_vector_db_id,
query="Which Llama model is mentioned?",
)
assert_valid_chunk_response(response2)
assert any("llama2" in chunk.content.lower() for chunk in response2.chunks)
def test_rag_tool_openai_apis(client_with_empty_registry, embedding_model_id, embedding_dimension):
vector_db_id = "test_openai_vector_db"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
actual_vector_db_id = available_vector_dbs[0]
# different document formats that should work with OpenAI APIs
documents = [
Document(
document_id="text-doc",
content="This is a plain text document about machine learning algorithms.",
metadata={"type": "text", "category": "AI"},
),
Document(
document_id="url-doc",
content="https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/chat.rst",
mime_type="text/plain",
metadata={"type": "url", "source": "pytorch"},
),
Document(
document_id="data-url-doc",
content="data:text/plain;base64,VGhpcyBpcyBhIGRhdGEgVVJMIGRvY3VtZW50IGFib3V0IGRlZXAgbGVhcm5pbmcu", # "This is a data URL document about deep learning."
metadata={"type": "data_url", "encoding": "base64"},
),
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=256,
)
files_list = client_with_empty_registry.files.list()
assert len(files_list.data) >= len(documents), (
f"Expected at least {len(documents)} files, got {len(files_list.data)}"
)
vector_store_files = client_with_empty_registry.vector_io.openai_list_files_in_vector_store(
vector_store_id=actual_vector_db_id
)
assert len(vector_store_files.data) >= len(documents), f"Expected at least {len(documents)} files in vector store"
response = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="Tell me about machine learning and deep learning",
)
assert_valid_text_response(response)
content_text = " ".join([chunk.text for chunk in response.content]).lower()
assert "machine learning" in content_text or "deep learning" in content_text
def test_rag_tool_exception_handling(client_with_empty_registry, embedding_model_id, embedding_dimension):
vector_db_id = "test_exception_handling"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
actual_vector_db_id = available_vector_dbs[0]
documents = [
Document(
document_id="valid-doc",
content="This is a valid document that should be processed successfully.",
metadata={"status": "valid"},
),
Document(
document_id="invalid-url-doc",
content="https://nonexistent-domain-12345.com/invalid.txt",
metadata={"status": "invalid_url"},
),
Document(
document_id="another-valid-doc",
content="This is another valid document for testing resilience.",
metadata={"status": "valid"},
),
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=256,
)
response = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="valid document",
)
assert_valid_text_response(response)
content_text = " ".join([chunk.text for chunk in response.content]).lower()
assert "valid document" in content_text
def test_rag_tool_insert_and_query(client_with_empty_registry, embedding_model_id, embedding_dimension):
providers = [p for p in client_with_empty_registry.providers.list() if p.api == "vector_io"]
assert len(providers) > 0
vector_db_id = "test_vector_db"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
# VectorDB is being migrated to VectorStore, so the ID will be different
# Just check that at least one vector DB was registered
assert len(available_vector_dbs) > 0
# Use the actual registered vector_db_id for subsequent operations
actual_vector_db_id = available_vector_dbs[0]
urls = [
"memory_optimizations.rst",
"chat.rst",
"llama3.rst",
]
documents = [
Document(
document_id=f"num-{i}",
content=f"https://raw.githubusercontent.com/pytorch/torchtune/main/docs/source/tutorials/{url}",
mime_type="text/plain",
metadata={"author": "llama", "source": url},
)
for i, url in enumerate(urls)
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=512,
)
response_with_metadata = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="What is the name of the method used for fine-tuning?",
)
assert_valid_text_response(response_with_metadata)
assert any("metadata:" in chunk.text.lower() for chunk in response_with_metadata.content)
response_without_metadata = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="What is the name of the method used for fine-tuning?",
query_config={
"include_metadata_in_content": True,
"chunk_template": "Result {index}\nContent: {chunk.content}\n",
},
)
assert_valid_text_response(response_without_metadata)
assert not any("metadata:" in chunk.text.lower() for chunk in response_without_metadata.content)
with pytest.raises((ValueError, BadRequestError)):
client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="What is the name of the method used for fine-tuning?",
query_config={
"chunk_template": "This should raise a ValueError because it is missing the proper template variables",
},
)
def test_rag_tool_query_generation(client_with_empty_registry, embedding_model_id, embedding_dimension):
vector_db_id = "test_query_generation_db"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
actual_vector_db_id = available_vector_dbs[0]
documents = [
Document(
document_id="ai-doc",
content="Artificial intelligence and machine learning are transforming technology.",
metadata={"category": "AI"},
),
Document(
document_id="banana-doc",
content="Don't bring a banana to a knife fight.",
metadata={"category": "wisdom"},
),
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=256,
)
response = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="Tell me about AI",
)
assert_valid_text_response(response)
content_text = " ".join([chunk.text for chunk in response.content]).lower()
assert "artificial intelligence" in content_text or "machine learning" in content_text
def test_rag_tool_pdf_data_url_handling(client_with_empty_registry, embedding_model_id, embedding_dimension):
vector_db_id = "test_pdf_data_url_db"
client_with_empty_registry.vector_dbs.register(
vector_db_id=vector_db_id,
embedding_model=embedding_model_id,
embedding_dimension=embedding_dimension,
)
available_vector_dbs = [vector_db.identifier for vector_db in client_with_empty_registry.vector_dbs.list()]
actual_vector_db_id = available_vector_dbs[0]
sample_pdf = b"%PDF-1.3\n3 0 obj\n<</Type /Page\n/Parent 1 0 R\n/Resources 2 0 R\n/Contents 4 0 R>>\nendobj\n4 0 obj\n<</Filter /FlateDecode /Length 115>>\nstream\nx\x9c\x15\xcc1\x0e\x820\x18@\xe1\x9dS\xbcM]jk$\xd5\xd5(\x83!\x86\xa1\x17\xf8\xa3\xa5`LIh+\xd7W\xc6\xf7\r\xef\xc0\xbd\xd2\xaa\xb6,\xd5\xc5\xb1o\x0c\xa6VZ\xe3znn%\xf3o\xab\xb1\xe7\xa3:Y\xdc\x8bm\xeb\xf3&1\xc8\xd7\xd3\x97\xc82\xe6\x81\x87\xe42\xcb\x87Vb(\x12<\xdd<=}Jc\x0cL\x91\xee\xda$\xb5\xc3\xbd\xd7\xe9\x0f\x8d\x97 $\nendstream\nendobj\n1 0 obj\n<</Type /Pages\n/Kids [3 0 R ]\n/Count 1\n/MediaBox [0 0 595.28 841.89]\n>>\nendobj\n5 0 obj\n<</Type /Font\n/BaseFont /Helvetica\n/Subtype /Type1\n/Encoding /WinAnsiEncoding\n>>\nendobj\n2 0 obj\n<<\n/ProcSet [/PDF /Text /ImageB /ImageC /ImageI]\n/Font <<\n/F1 5 0 R\n>>\n/XObject <<\n>>\n>>\nendobj\n6 0 obj\n<<\n/Producer (PyFPDF 1.7.2 http://pyfpdf.googlecode.com/)\n/Title (This is a sample title.)\n/Author (Llama Stack Developers)\n/CreationDate (D:20250312165548)\n>>\nendobj\n7 0 obj\n<<\n/Type /Catalog\n/Pages 1 0 R\n/OpenAction [3 0 R /FitH null]\n/PageLayout /OneColumn\n>>\nendobj\nxref\n0 8\n0000000000 65535 f \n0000000272 00000 n \n0000000455 00000 n \n0000000009 00000 n \n0000000087 00000 n \n0000000359 00000 n \n0000000559 00000 n \n0000000734 00000 n \ntrailer\n<<\n/Size 8\n/Root 7 0 R\n/Info 6 0 R\n>>\nstartxref\n837\n%%EOF\n"
import base64
pdf_base64 = base64.b64encode(sample_pdf).decode("utf-8")
pdf_data_url = f"data:application/pdf;base64,{pdf_base64}"
documents = [
Document(
document_id="test-pdf-data-url",
content=pdf_data_url,
metadata={"type": "pdf", "source": "data_url"},
),
]
client_with_empty_registry.tool_runtime.rag_tool.insert(
documents=documents,
vector_db_id=actual_vector_db_id,
chunk_size_in_tokens=256,
)
files_list = client_with_empty_registry.files.list()
assert len(files_list.data) >= 1, "PDF should have been uploaded to Files API"
pdf_file = None
for file in files_list.data:
if file.filename and "test-pdf-data-url" in file.filename:
pdf_file = file
break
assert pdf_file is not None, "PDF file should be found in Files API"
assert pdf_file.bytes == len(sample_pdf), f"File size should match original PDF ({len(sample_pdf)} bytes)"
file_content = client_with_empty_registry.files.retrieve_content(pdf_file.id)
assert file_content.startswith(b"%PDF-"), "Retrieved file should be a valid PDF"
vector_store_files = client_with_empty_registry.vector_io.openai_list_files_in_vector_store(
vector_store_id=actual_vector_db_id
)
assert len(vector_store_files.data) >= 1, "PDF should be attached to vector store"
response = client_with_empty_registry.tool_runtime.rag_tool.query(
vector_db_ids=[actual_vector_db_id],
content="sample title",
)
assert_valid_text_response(response)
content_text = " ".join([chunk.text for chunk in response.content]).lower()
assert "sample title" in content_text or "title" in content_text

View file

@ -1,381 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
# Unit tests for the routing tables vector_dbs
import time
import uuid
from unittest.mock import AsyncMock
import pytest
from llama_stack.apis.datatypes import Api
from llama_stack.apis.models import ModelType
from llama_stack.apis.vector_dbs.vector_dbs import VectorDB
from llama_stack.apis.vector_io.vector_io import (
VectorStoreContent,
VectorStoreDeleteResponse,
VectorStoreFileContentsResponse,
VectorStoreFileCounts,
VectorStoreFileDeleteResponse,
VectorStoreFileObject,
VectorStoreObject,
VectorStoreSearchResponsePage,
)
from llama_stack.core.access_control.datatypes import AccessRule, Scope
from llama_stack.core.datatypes import User
from llama_stack.core.request_headers import request_provider_data_context
from llama_stack.core.routing_tables.vector_dbs import VectorDBsRoutingTable
from tests.unit.distribution.routers.test_routing_tables import Impl, InferenceImpl, ModelsRoutingTable
class VectorDBImpl(Impl):
def __init__(self):
super().__init__(Api.vector_io)
self.vector_stores = {}
async def register_vector_db(self, vector_db: VectorDB):
return vector_db
async def unregister_vector_db(self, vector_db_id: str):
return vector_db_id
async def openai_retrieve_vector_store(self, vector_store_id):
return VectorStoreObject(
id=vector_store_id,
name="Test Store",
created_at=int(time.time()),
file_counts=VectorStoreFileCounts(completed=0, cancelled=0, failed=0, in_progress=0, total=0),
)
async def openai_update_vector_store(self, vector_store_id, **kwargs):
return VectorStoreObject(
id=vector_store_id,
name="Updated Store",
created_at=int(time.time()),
file_counts=VectorStoreFileCounts(completed=0, cancelled=0, failed=0, in_progress=0, total=0),
)
async def openai_delete_vector_store(self, vector_store_id):
return VectorStoreDeleteResponse(id=vector_store_id, object="vector_store.deleted", deleted=True)
async def openai_search_vector_store(self, vector_store_id, query, **kwargs):
return VectorStoreSearchResponsePage(
object="vector_store.search_results.page", search_query="query", data=[], has_more=False, next_page=None
)
async def openai_attach_file_to_vector_store(self, vector_store_id, file_id, **kwargs):
return VectorStoreFileObject(
id=file_id,
status="completed",
chunking_strategy={"type": "auto"},
created_at=int(time.time()),
vector_store_id=vector_store_id,
)
async def openai_list_files_in_vector_store(self, vector_store_id, **kwargs):
return [
VectorStoreFileObject(
id="1",
status="completed",
chunking_strategy={"type": "auto"},
created_at=int(time.time()),
vector_store_id=vector_store_id,
)
]
async def openai_retrieve_vector_store_file(self, vector_store_id, file_id):
return VectorStoreFileObject(
id=file_id,
status="completed",
chunking_strategy={"type": "auto"},
created_at=int(time.time()),
vector_store_id=vector_store_id,
)
async def openai_retrieve_vector_store_file_contents(self, vector_store_id, file_id):
return VectorStoreFileContentsResponse(
file_id=file_id,
filename="Sample File name",
attributes={"key": "value"},
content=[VectorStoreContent(type="text", text="Sample content")],
)
async def openai_update_vector_store_file(self, vector_store_id, file_id, **kwargs):
return VectorStoreFileObject(
id=file_id,
status="completed",
chunking_strategy={"type": "auto"},
created_at=int(time.time()),
vector_store_id=vector_store_id,
)
async def openai_delete_vector_store_file(self, vector_store_id, file_id):
return VectorStoreFileDeleteResponse(id=file_id, deleted=True)
async def openai_create_vector_store(
self,
name=None,
embedding_model=None,
embedding_dimension=None,
provider_id=None,
provider_vector_db_id=None,
**kwargs,
):
vector_store_id = provider_vector_db_id or f"vs_{uuid.uuid4()}"
vector_store = VectorStoreObject(
id=vector_store_id,
name=name or vector_store_id,
created_at=int(time.time()),
file_counts=VectorStoreFileCounts(completed=0, cancelled=0, failed=0, in_progress=0, total=0),
)
self.vector_stores[vector_store_id] = vector_store
return vector_store
async def openai_list_vector_stores(self, **kwargs):
from llama_stack.apis.vector_io.vector_io import VectorStoreListResponse
return VectorStoreListResponse(
data=list(self.vector_stores.values()), has_more=False, first_id=None, last_id=None
)
async def test_vectordbs_routing_table(cached_disk_dist_registry):
n = 10
table = VectorDBsRoutingTable({"test_provider": VectorDBImpl()}, cached_disk_dist_registry, {})
await table.initialize()
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
# Register multiple vector databases and verify listing
vdb_dict = {}
for i in range(n):
vdb_dict[i] = await table.register_vector_db(vector_db_id=f"test-vectordb-{i}", embedding_model="test-model")
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == len(vdb_dict)
vector_db_ids = {v.identifier for v in vector_dbs.data}
for k in vdb_dict:
assert vdb_dict[k].identifier in vector_db_ids
for k in vdb_dict:
await table.unregister_vector_db(vector_db_id=vdb_dict[k].identifier)
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 0
async def test_vector_db_and_vector_store_id_mapping(cached_disk_dist_registry):
n = 10
impl = VectorDBImpl()
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, {})
await table.initialize()
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
vdb_dict = {}
for i in range(n):
vdb_dict[i] = await table.register_vector_db(vector_db_id=f"test-vectordb-{i}", embedding_model="test-model")
vector_dbs = await table.list_vector_dbs()
vector_db_ids = {v.identifier for v in vector_dbs.data}
vector_stores = await impl.openai_list_vector_stores()
vector_store_ids = {v.id for v in vector_stores.data}
assert vector_db_ids == vector_store_ids, (
f"Vector DB IDs {vector_db_ids} don't match vector store IDs {vector_store_ids}"
)
for vector_store in vector_stores.data:
vector_db = await table.get_vector_db(vector_store.id)
assert vector_store.name == vector_db.vector_db_name, (
f"Vector store name {vector_store.name} doesn't match vector store ID {vector_store.id}"
)
for vector_db_id in vector_db_ids:
await table.unregister_vector_db(vector_db_id)
assert len((await table.list_vector_dbs()).data) == 0
async def test_vector_db_id_becomes_vector_store_name(cached_disk_dist_registry):
impl = VectorDBImpl()
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, {})
await table.initialize()
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
user_provided_id = "my-custom-vector-db"
await table.register_vector_db(vector_db_id=user_provided_id, embedding_model="test-model")
vector_stores = await impl.openai_list_vector_stores()
assert len(vector_stores.data) == 1
vector_store = vector_stores.data[0]
assert vector_store.name == user_provided_id
assert vector_store.id.startswith("vs_")
assert vector_store.id != user_provided_id
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 1
assert vector_dbs.data[0].identifier == vector_store.id
await table.unregister_vector_db(vector_store.id)
async def test_openai_vector_stores_routing_table_roles(cached_disk_dist_registry):
impl = VectorDBImpl()
impl.openai_retrieve_vector_store = AsyncMock(return_value="OK")
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, policy=[])
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, policy=[])
authorized_table = "vs1"
authorized_team = "team1"
unauthorized_team = "team2"
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
authorized_user = User(principal="alice", attributes={"roles": [authorized_team]})
with request_provider_data_context({}, authorized_user):
registered_vdb = await table.register_vector_db(vector_db_id="vs1", embedding_model="test-model")
authorized_table = registered_vdb.identifier # Use the actual generated ID
# Authorized reader
with request_provider_data_context({}, authorized_user):
res = await table.openai_retrieve_vector_store(authorized_table)
assert res == "OK"
# Authorized updater
impl.openai_update_vector_store_file = AsyncMock(return_value="UPDATED")
with request_provider_data_context({}, authorized_user):
res = await table.openai_update_vector_store_file(authorized_table, file_id="file1", attributes={"foo": "bar"})
assert res == "UPDATED"
# Unauthorized reader
unauthorized_user = User(principal="eve", attributes={"roles": [unauthorized_team]})
with request_provider_data_context({}, unauthorized_user):
with pytest.raises(ValueError):
await table.openai_retrieve_vector_store(authorized_table)
# Unauthorized updater
with request_provider_data_context({}, unauthorized_user):
with pytest.raises(ValueError):
await table.openai_update_vector_store_file(authorized_table, file_id="file1", attributes={"foo": "bar"})
# Authorized deleter
impl.openai_delete_vector_store_file = AsyncMock(return_value="DELETED")
with request_provider_data_context({}, authorized_user):
res = await table.openai_delete_vector_store_file(authorized_table, file_id="file1")
assert res == "DELETED"
# Unauthorized deleter
with request_provider_data_context({}, unauthorized_user):
with pytest.raises(ValueError):
await table.openai_delete_vector_store_file(authorized_table, file_id="file1")
async def test_openai_vector_stores_routing_table_actions(cached_disk_dist_registry):
impl = VectorDBImpl()
policy = [
AccessRule(permit=Scope(actions=["create", "read", "update", "delete"]), when="user with admin in roles"),
AccessRule(permit=Scope(actions=["read"]), when="user with reader in roles"),
]
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, policy=policy)
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, policy=[])
vector_db_id = "vs1"
file_id = "file-1"
admin_user = User(principal="admin", attributes={"roles": ["admin"]})
read_only_user = User(principal="reader", attributes={"roles": ["reader"]})
no_access_user = User(principal="outsider", attributes={"roles": ["no_access"]})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
with request_provider_data_context({}, admin_user):
registered_vdb = await table.register_vector_db(vector_db_id=vector_db_id, embedding_model="test-model")
vector_db_id = registered_vdb.identifier # Use the actual generated ID
read_methods = [
(table.openai_retrieve_vector_store, (vector_db_id,), {}),
(table.openai_search_vector_store, (vector_db_id, "query"), {}),
(table.openai_list_files_in_vector_store, (vector_db_id,), {}),
(table.openai_retrieve_vector_store_file, (vector_db_id, file_id), {}),
(table.openai_retrieve_vector_store_file_contents, (vector_db_id, file_id), {}),
]
update_methods = [
(table.openai_update_vector_store, (vector_db_id,), {"name": "Updated DB"}),
(table.openai_attach_file_to_vector_store, (vector_db_id, file_id), {}),
(table.openai_update_vector_store_file, (vector_db_id, file_id), {"attributes": {"key": "value"}}),
]
delete_methods = [
(table.openai_delete_vector_store_file, (vector_db_id, file_id), {}),
(table.openai_delete_vector_store, (vector_db_id,), {}),
]
for user in [admin_user, read_only_user]:
with request_provider_data_context({}, user):
for method, args, kwargs in read_methods:
result = await method(*args, **kwargs)
assert result is not None, f"Read operation failed with user {user.principal}"
with request_provider_data_context({}, no_access_user):
for method, args, kwargs in read_methods:
with pytest.raises(ValueError):
await method(*args, **kwargs)
with request_provider_data_context({}, admin_user):
for method, args, kwargs in update_methods:
result = await method(*args, **kwargs)
assert result is not None, "Update operation failed with admin user"
with request_provider_data_context({}, admin_user):
for method, args, kwargs in delete_methods:
result = await method(*args, **kwargs)
assert result is not None, "Delete operation failed with admin user"
for user in [read_only_user, no_access_user]:
with request_provider_data_context({}, user):
for method, args, kwargs in delete_methods:
with pytest.raises(ValueError):
await method(*args, **kwargs)