Merge branch 'main' into responses_object

This commit is contained in:
Emilio Garcia 2025-09-17 08:48:08 -04:00 committed by GitHub
commit a666f6df3e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
381 changed files with 56349 additions and 12626 deletions

View file

@ -105,6 +105,9 @@ class ScoringFunctionsImpl(Impl):
async def register_scoring_function(self, scoring_fn):
return scoring_fn
async def unregister_scoring_function(self, scoring_fn_id: str):
return scoring_fn_id
class BenchmarksImpl(Impl):
def __init__(self):
@ -113,6 +116,9 @@ class BenchmarksImpl(Impl):
async def register_benchmark(self, benchmark):
return benchmark
async def unregister_benchmark(self, benchmark_id: str):
return benchmark_id
class ToolGroupsImpl(Impl):
def __init__(self):
@ -146,6 +152,20 @@ class VectorDBImpl(Impl):
async def unregister_vector_db(self, vector_db_id: str):
return vector_db_id
async def openai_create_vector_store(self, **kwargs):
import time
import uuid
from llama_stack.apis.vector_io.vector_io import VectorStoreFileCounts, VectorStoreObject
vector_store_id = kwargs.get("provider_vector_db_id") or f"vs_{uuid.uuid4()}"
return VectorStoreObject(
id=vector_store_id,
name=kwargs.get("name", vector_store_id),
created_at=int(time.time()),
file_counts=VectorStoreFileCounts(completed=0, cancelled=0, failed=0, in_progress=0, total=0),
)
async def test_models_routing_table(cached_disk_dist_registry):
table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
@ -247,17 +267,21 @@ async def test_vectordbs_routing_table(cached_disk_dist_registry):
)
# Register multiple vector databases and verify listing
await table.register_vector_db(vector_db_id="test-vectordb", embedding_model="test_provider/test-model")
await table.register_vector_db(vector_db_id="test-vectordb-2", embedding_model="test_provider/test-model")
vdb1 = await table.register_vector_db(vector_db_id="test-vectordb", embedding_model="test_provider/test-model")
vdb2 = await table.register_vector_db(vector_db_id="test-vectordb-2", embedding_model="test_provider/test-model")
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 2
vector_db_ids = {v.identifier for v in vector_dbs.data}
assert "test-vectordb" in vector_db_ids
assert "test-vectordb-2" in vector_db_ids
assert vdb1.identifier in vector_db_ids
assert vdb2.identifier in vector_db_ids
await table.unregister_vector_db(vector_db_id="test-vectordb")
await table.unregister_vector_db(vector_db_id="test-vectordb-2")
# Verify they have UUID-based identifiers
assert vdb1.identifier.startswith("vs_")
assert vdb2.identifier.startswith("vs_")
await table.unregister_vector_db(vector_db_id=vdb1.identifier)
await table.unregister_vector_db(vector_db_id=vdb2.identifier)
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 0
@ -312,6 +336,13 @@ async def test_scoring_functions_routing_table(cached_disk_dist_registry):
assert "test-scoring-fn" in scoring_fn_ids
assert "test-scoring-fn-2" in scoring_fn_ids
# Unregister scoring functions and verify listing
for i in range(len(scoring_functions.data)):
await table.unregister_scoring_function(scoring_functions.data[i].scoring_fn_id)
scoring_functions_list_after_deletion = await table.list_scoring_functions()
assert len(scoring_functions_list_after_deletion.data) == 0
async def test_benchmarks_routing_table(cached_disk_dist_registry):
table = BenchmarksRoutingTable({"test_provider": BenchmarksImpl()}, cached_disk_dist_registry, {})
@ -329,6 +360,15 @@ async def test_benchmarks_routing_table(cached_disk_dist_registry):
benchmark_ids = {b.identifier for b in benchmarks.data}
assert "test-benchmark" in benchmark_ids
# Unregister the benchmark and verify removal
await table.unregister_benchmark(benchmark_id="test-benchmark")
benchmarks_after = await table.list_benchmarks()
assert len(benchmarks_after.data) == 0
# Unregistering a non-existent benchmark should raise a clear error
with pytest.raises(ValueError, match="Benchmark 'dummy_benchmark' not found"):
await table.unregister_benchmark(benchmark_id="dummy_benchmark")
async def test_tool_groups_routing_table(cached_disk_dist_registry):
table = ToolGroupsRoutingTable({"test_provider": ToolGroupsImpl()}, cached_disk_dist_registry, {})

View file

@ -7,6 +7,7 @@
# Unit tests for the routing tables vector_dbs
import time
import uuid
from unittest.mock import AsyncMock
import pytest
@ -34,6 +35,7 @@ from tests.unit.distribution.routers.test_routing_tables import Impl, InferenceI
class VectorDBImpl(Impl):
def __init__(self):
super().__init__(Api.vector_io)
self.vector_stores = {}
async def register_vector_db(self, vector_db: VectorDB):
return vector_db
@ -114,8 +116,35 @@ class VectorDBImpl(Impl):
async def openai_delete_vector_store_file(self, vector_store_id, file_id):
return VectorStoreFileDeleteResponse(id=file_id, deleted=True)
async def openai_create_vector_store(
self,
name=None,
embedding_model=None,
embedding_dimension=None,
provider_id=None,
provider_vector_db_id=None,
**kwargs,
):
vector_store_id = provider_vector_db_id or f"vs_{uuid.uuid4()}"
vector_store = VectorStoreObject(
id=vector_store_id,
name=name or vector_store_id,
created_at=int(time.time()),
file_counts=VectorStoreFileCounts(completed=0, cancelled=0, failed=0, in_progress=0, total=0),
)
self.vector_stores[vector_store_id] = vector_store
return vector_store
async def openai_list_vector_stores(self, **kwargs):
from llama_stack.apis.vector_io.vector_io import VectorStoreListResponse
return VectorStoreListResponse(
data=list(self.vector_stores.values()), has_more=False, first_id=None, last_id=None
)
async def test_vectordbs_routing_table(cached_disk_dist_registry):
n = 10
table = VectorDBsRoutingTable({"test_provider": VectorDBImpl()}, cached_disk_dist_registry, {})
await table.initialize()
@ -129,22 +158,98 @@ async def test_vectordbs_routing_table(cached_disk_dist_registry):
)
# Register multiple vector databases and verify listing
await table.register_vector_db(vector_db_id="test-vectordb", embedding_model="test-model")
await table.register_vector_db(vector_db_id="test-vectordb-2", embedding_model="test-model")
vdb_dict = {}
for i in range(n):
vdb_dict[i] = await table.register_vector_db(vector_db_id=f"test-vectordb-{i}", embedding_model="test-model")
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 2
assert len(vector_dbs.data) == len(vdb_dict)
vector_db_ids = {v.identifier for v in vector_dbs.data}
assert "test-vectordb" in vector_db_ids
assert "test-vectordb-2" in vector_db_ids
await table.unregister_vector_db(vector_db_id="test-vectordb")
await table.unregister_vector_db(vector_db_id="test-vectordb-2")
for k in vdb_dict:
assert vdb_dict[k].identifier in vector_db_ids
for k in vdb_dict:
await table.unregister_vector_db(vector_db_id=vdb_dict[k].identifier)
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 0
async def test_vector_db_and_vector_store_id_mapping(cached_disk_dist_registry):
n = 10
impl = VectorDBImpl()
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, {})
await table.initialize()
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
vdb_dict = {}
for i in range(n):
vdb_dict[i] = await table.register_vector_db(vector_db_id=f"test-vectordb-{i}", embedding_model="test-model")
vector_dbs = await table.list_vector_dbs()
vector_db_ids = {v.identifier for v in vector_dbs.data}
vector_stores = await impl.openai_list_vector_stores()
vector_store_ids = {v.id for v in vector_stores.data}
assert vector_db_ids == vector_store_ids, (
f"Vector DB IDs {vector_db_ids} don't match vector store IDs {vector_store_ids}"
)
for vector_store in vector_stores.data:
vector_db = await table.get_vector_db(vector_store.id)
assert vector_store.name == vector_db.vector_db_name, (
f"Vector store name {vector_store.name} doesn't match vector store ID {vector_store.id}"
)
for vector_db_id in vector_db_ids:
await table.unregister_vector_db(vector_db_id)
assert len((await table.list_vector_dbs()).data) == 0
async def test_vector_db_id_becomes_vector_store_name(cached_disk_dist_registry):
impl = VectorDBImpl()
table = VectorDBsRoutingTable({"test_provider": impl}, cached_disk_dist_registry, {})
await table.initialize()
m_table = ModelsRoutingTable({"test_provider": InferenceImpl()}, cached_disk_dist_registry, {})
await m_table.initialize()
await m_table.register_model(
model_id="test-model",
provider_id="test_provider",
metadata={"embedding_dimension": 128},
model_type=ModelType.embedding,
)
user_provided_id = "my-custom-vector-db"
await table.register_vector_db(vector_db_id=user_provided_id, embedding_model="test-model")
vector_stores = await impl.openai_list_vector_stores()
assert len(vector_stores.data) == 1
vector_store = vector_stores.data[0]
assert vector_store.name == user_provided_id
assert vector_store.id.startswith("vs_")
assert vector_store.id != user_provided_id
vector_dbs = await table.list_vector_dbs()
assert len(vector_dbs.data) == 1
assert vector_dbs.data[0].identifier == vector_store.id
await table.unregister_vector_db(vector_store.id)
async def test_openai_vector_stores_routing_table_roles(cached_disk_dist_registry):
impl = VectorDBImpl()
impl.openai_retrieve_vector_store = AsyncMock(return_value="OK")
@ -164,7 +269,8 @@ async def test_openai_vector_stores_routing_table_roles(cached_disk_dist_registr
authorized_user = User(principal="alice", attributes={"roles": [authorized_team]})
with request_provider_data_context({}, authorized_user):
_ = await table.register_vector_db(vector_db_id="vs1", embedding_model="test-model")
registered_vdb = await table.register_vector_db(vector_db_id="vs1", embedding_model="test-model")
authorized_table = registered_vdb.identifier # Use the actual generated ID
# Authorized reader
with request_provider_data_context({}, authorized_user):
@ -227,7 +333,8 @@ async def test_openai_vector_stores_routing_table_actions(cached_disk_dist_regis
)
with request_provider_data_context({}, admin_user):
await table.register_vector_db(vector_db_id=vector_db_id, embedding_model="test-model")
registered_vdb = await table.register_vector_db(vector_db_id=vector_db_id, embedding_model="test-model")
vector_db_id = registered_vdb.identifier # Use the actual generated ID
read_methods = [
(table.openai_retrieve_vector_store, (vector_db_id,), {}),

View file

@ -12,7 +12,7 @@ import yaml
from pydantic import BaseModel, Field, ValidationError
from llama_stack.core.datatypes import Api, Provider, StackRunConfig
from llama_stack.core.distribution import get_provider_registry
from llama_stack.core.distribution import INTERNAL_APIS, get_provider_registry, providable_apis
from llama_stack.providers.datatypes import ProviderSpec
@ -152,6 +152,24 @@ class TestProviderRegistry:
assert registry[Api.inference]["test_provider"].provider_type == "test_provider"
assert registry[Api.inference]["test_provider"].api == Api.inference
def test_internal_apis_excluded(self):
"""Test that internal APIs are excluded and APIs without provider registries are marked as internal."""
import importlib
apis = providable_apis()
for internal_api in INTERNAL_APIS:
assert internal_api not in apis, f"Internal API {internal_api} should not be in providable_apis"
for api in apis:
module_name = f"llama_stack.providers.registry.{api.name.lower()}"
try:
importlib.import_module(module_name)
except ImportError as err:
raise AssertionError(
f"API {api} is in providable_apis but has no provider registry module ({module_name})"
) from err
def test_external_remote_providers(self, api_directories, mock_providers, base_config, provider_spec_yaml):
"""Test loading external remote providers from YAML files."""
remote_dir, _ = api_directories

View file

@ -4,19 +4,20 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import sqlite3
import tempfile
from pathlib import Path
from unittest.mock import patch
from unittest.mock import AsyncMock, Mock, patch
import pytest
from openai import AsyncOpenAI
from openai import NOT_GIVEN, AsyncOpenAI
from openai.types.model import Model as OpenAIModel
# Import the real Pydantic response types instead of using Mocks
from llama_stack.apis.inference import (
OpenAIAssistantMessageParam,
OpenAIChatCompletion,
OpenAIChoice,
OpenAICompletion,
OpenAIEmbeddingData,
OpenAIEmbeddingsResponse,
OpenAIEmbeddingUsage,
@ -133,7 +134,6 @@ class TestInferenceRecording:
# Test directory creation
assert storage.test_dir.exists()
assert storage.responses_dir.exists()
assert storage.db_path.exists()
# Test storing and retrieving a recording
request_hash = "test_hash_123"
@ -147,15 +147,6 @@ class TestInferenceRecording:
storage.store_recording(request_hash, request_data, response_data)
# Verify SQLite record
with sqlite3.connect(storage.db_path) as conn:
result = conn.execute("SELECT * FROM recordings WHERE request_hash = ?", (request_hash,)).fetchone()
assert result is not None
assert result[0] == request_hash # request_hash
assert result[2] == "/v1/chat/completions" # endpoint
assert result[3] == "llama3.2:3b" # model
# Verify file storage and retrieval
retrieved = storage.find_recording(request_hash)
assert retrieved is not None
@ -164,68 +155,97 @@ class TestInferenceRecording:
async def test_recording_mode(self, temp_storage_dir, real_openai_chat_response):
"""Test that recording mode captures and stores responses."""
async def mock_create(*args, **kwargs):
return real_openai_chat_response
temp_storage_dir = temp_storage_dir / "test_recording_mode"
with patch("openai.resources.chat.completions.AsyncCompletions.create", side_effect=mock_create):
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.chat.completions._post = AsyncMock(return_value=real_openai_chat_response)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
user=NOT_GIVEN,
)
# Verify the response was returned correctly
assert response.choices[0].message.content == "Hello! I'm doing well, thank you for asking."
# Verify the response was returned correctly
assert response.choices[0].message.content == "Hello! I'm doing well, thank you for asking."
client.chat.completions._post.assert_called_once()
# Verify recording was stored
storage = ResponseStorage(temp_storage_dir)
with sqlite3.connect(storage.db_path) as conn:
recordings = conn.execute("SELECT COUNT(*) FROM recordings").fetchone()[0]
assert recordings == 1
assert storage.responses_dir.exists()
async def test_replay_mode(self, temp_storage_dir, real_openai_chat_response):
"""Test that replay mode returns stored responses without making real calls."""
async def mock_create(*args, **kwargs):
return real_openai_chat_response
temp_storage_dir = temp_storage_dir / "test_replay_mode"
# First, record a response
with patch("openai.resources.chat.completions.AsyncCompletions.create", side_effect=mock_create):
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.chat.completions._post = AsyncMock(return_value=real_openai_chat_response)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
user=NOT_GIVEN,
)
client.chat.completions._post.assert_called_once()
# Now test replay mode - should not call the original method
with patch("openai.resources.chat.completions.AsyncCompletions.create") as mock_create_patch:
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.chat.completions._post = AsyncMock(return_value=real_openai_chat_response)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
)
response = await client.chat.completions.create(
model="llama3.2:3b",
messages=[{"role": "user", "content": "Hello, how are you?"}],
temperature=0.7,
max_tokens=50,
)
# Verify we got the recorded response
assert response.choices[0].message.content == "Hello! I'm doing well, thank you for asking."
# Verify we got the recorded response
assert response.choices[0].message.content == "Hello! I'm doing well, thank you for asking."
# Verify the original method was NOT called
mock_create_patch.assert_not_called()
# Verify the original method was NOT called
client.chat.completions._post.assert_not_called()
async def test_replay_mode_models(self, temp_storage_dir):
"""Test that replay mode returns stored responses without making real model listing calls."""
async def _async_iterator(models):
for model in models:
yield model
models = [
OpenAIModel(id="foo", created=1, object="model", owned_by="test"),
OpenAIModel(id="bar", created=2, object="model", owned_by="test"),
]
expected_ids = {m.id for m in models}
temp_storage_dir = temp_storage_dir / "test_replay_mode_models"
# baseline - mock works without recording
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.models._get_api_list = Mock(return_value=_async_iterator(models))
assert {m.id async for m in client.models.list()} == expected_ids
client.models._get_api_list.assert_called_once()
# record the call
with inference_recording(mode=InferenceMode.RECORD, storage_dir=temp_storage_dir):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.models._get_api_list = Mock(return_value=_async_iterator(models))
assert {m.id async for m in client.models.list()} == expected_ids
client.models._get_api_list.assert_called_once()
# replay the call
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=temp_storage_dir):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.models._get_api_list = Mock(return_value=_async_iterator(models))
assert {m.id async for m in client.models.list()} == expected_ids
client.models._get_api_list.assert_not_called()
async def test_replay_missing_recording(self, temp_storage_dir):
"""Test that replay mode fails when no recording is found."""
@ -242,36 +262,110 @@ class TestInferenceRecording:
async def test_embeddings_recording(self, temp_storage_dir, real_embeddings_response):
"""Test recording and replay of embeddings calls."""
async def mock_create(*args, **kwargs):
return real_embeddings_response
# baseline - mock works without recording
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.embeddings._post = AsyncMock(return_value=real_embeddings_response)
response = await client.embeddings.create(
model=real_embeddings_response.model,
input=["Hello world", "Test embedding"],
encoding_format=NOT_GIVEN,
)
assert len(response.data) == 2
assert response.data[0].embedding == [0.1, 0.2, 0.3]
client.embeddings._post.assert_called_once()
temp_storage_dir = temp_storage_dir / "test_embeddings_recording"
# Record
with patch("openai.resources.embeddings.AsyncEmbeddings.create", side_effect=mock_create):
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.embeddings._post = AsyncMock(return_value=real_embeddings_response)
response = await client.embeddings.create(
model="nomic-embed-text", input=["Hello world", "Test embedding"]
)
response = await client.embeddings.create(
model=real_embeddings_response.model,
input=["Hello world", "Test embedding"],
encoding_format=NOT_GIVEN,
dimensions=NOT_GIVEN,
user=NOT_GIVEN,
)
assert len(response.data) == 2
assert len(response.data) == 2
# Replay
with patch("openai.resources.embeddings.AsyncEmbeddings.create") as mock_create_patch:
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.embeddings._post = AsyncMock(return_value=real_embeddings_response)
response = await client.embeddings.create(
model="nomic-embed-text", input=["Hello world", "Test embedding"]
)
response = await client.embeddings.create(
model=real_embeddings_response.model,
input=["Hello world", "Test embedding"],
)
# Verify we got the recorded response
assert len(response.data) == 2
assert response.data[0].embedding == [0.1, 0.2, 0.3]
# Verify we got the recorded response
assert len(response.data) == 2
assert response.data[0].embedding == [0.1, 0.2, 0.3]
# Verify original method was not called
mock_create_patch.assert_not_called()
# Verify original method was not called
client.embeddings._post.assert_not_called()
async def test_completions_recording(self, temp_storage_dir):
real_completions_response = OpenAICompletion(
id="test_completion",
object="text_completion",
created=1234567890,
model="llama3.2:3b",
choices=[
{
"text": "Hello! I'm doing well, thank you for asking.",
"index": 0,
"logprobs": None,
"finish_reason": "stop",
}
],
)
temp_storage_dir = temp_storage_dir / "test_completions_recording"
# baseline - mock works without recording
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.completions._post = AsyncMock(return_value=real_completions_response)
response = await client.completions.create(
model=real_completions_response.model,
prompt="Hello, how are you?",
temperature=0.7,
max_tokens=50,
user=NOT_GIVEN,
)
assert response.choices[0].text == real_completions_response.choices[0].text
client.completions._post.assert_called_once()
# Record
with inference_recording(mode=InferenceMode.RECORD, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.completions._post = AsyncMock(return_value=real_completions_response)
response = await client.completions.create(
model=real_completions_response.model,
prompt="Hello, how are you?",
temperature=0.7,
max_tokens=50,
user=NOT_GIVEN,
)
assert response.choices[0].text == real_completions_response.choices[0].text
client.completions._post.assert_called_once()
# Replay
with inference_recording(mode=InferenceMode.REPLAY, storage_dir=str(temp_storage_dir)):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
client.completions._post = AsyncMock(return_value=real_completions_response)
response = await client.completions.create(
model=real_completions_response.model,
prompt="Hello, how are you?",
temperature=0.7,
max_tokens=50,
)
assert response.choices[0].text == real_completions_response.choices[0].text
client.completions._post.assert_not_called()
async def test_live_mode(self, real_openai_chat_response):
"""Test that live mode passes through to original methods."""
@ -280,7 +374,7 @@ class TestInferenceRecording:
return real_openai_chat_response
with patch("openai.resources.chat.completions.AsyncCompletions.create", side_effect=mock_create):
with inference_recording(mode=InferenceMode.LIVE):
with inference_recording(mode=InferenceMode.LIVE, storage_dir="foo"):
client = AsyncOpenAI(base_url="http://localhost:11434/v1", api_key="test")
response = await client.chat.completions.create(

View file

@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -0,0 +1,30 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import random
import pytest
from llama_stack.core.prompts.prompts import PromptServiceConfig, PromptServiceImpl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
@pytest.fixture
async def temp_prompt_store(tmp_path_factory):
unique_id = f"prompt_store_{random.randint(1, 1000000)}"
temp_dir = tmp_path_factory.getbasetemp()
db_path = str(temp_dir / f"{unique_id}.db")
from llama_stack.core.datatypes import StackRunConfig
from llama_stack.providers.utils.kvstore import kvstore_impl
mock_run_config = StackRunConfig(image_name="test-distribution", apis=[], providers={})
config = PromptServiceConfig(run_config=mock_run_config)
store = PromptServiceImpl(config, deps={})
store.kvstore = await kvstore_impl(SqliteKVStoreConfig(db_path=db_path))
yield store

View file

@ -0,0 +1,144 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import pytest
class TestPrompts:
async def test_create_and_get_prompt(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("Hello world!", ["name"])
assert prompt.prompt == "Hello world!"
assert prompt.version == 1
assert prompt.prompt_id.startswith("pmpt_")
assert prompt.variables == ["name"]
retrieved = await temp_prompt_store.get_prompt(prompt.prompt_id)
assert retrieved.prompt_id == prompt.prompt_id
assert retrieved.prompt == prompt.prompt
async def test_update_prompt(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("Original")
updated = await temp_prompt_store.update_prompt(prompt.prompt_id, "Updated", 1, ["v"])
assert updated.version == 2
assert updated.prompt == "Updated"
async def test_update_prompt_with_version(self, temp_prompt_store):
version_for_update = 1
prompt = await temp_prompt_store.create_prompt("Original")
assert prompt.version == 1
prompt = await temp_prompt_store.update_prompt(prompt.prompt_id, "Updated", version_for_update, ["v"])
assert prompt.version == 2
with pytest.raises(ValueError):
# now this is a stale version
await temp_prompt_store.update_prompt(prompt.prompt_id, "Another Update", version_for_update, ["v"])
with pytest.raises(ValueError):
# this version does not exist
await temp_prompt_store.update_prompt(prompt.prompt_id, "Another Update", 99, ["v"])
async def test_delete_prompt(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("to be deleted")
await temp_prompt_store.delete_prompt(prompt.prompt_id)
with pytest.raises(ValueError):
await temp_prompt_store.get_prompt(prompt.prompt_id)
async def test_list_prompts(self, temp_prompt_store):
response = await temp_prompt_store.list_prompts()
assert response.data == []
await temp_prompt_store.create_prompt("first")
await temp_prompt_store.create_prompt("second")
response = await temp_prompt_store.list_prompts()
assert len(response.data) == 2
async def test_version(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("V1")
await temp_prompt_store.update_prompt(prompt.prompt_id, "V2", 1)
v1 = await temp_prompt_store.get_prompt(prompt.prompt_id, version=1)
assert v1.version == 1 and v1.prompt == "V1"
latest = await temp_prompt_store.get_prompt(prompt.prompt_id)
assert latest.version == 2 and latest.prompt == "V2"
async def test_set_default_version(self, temp_prompt_store):
prompt0 = await temp_prompt_store.create_prompt("V1")
prompt1 = await temp_prompt_store.update_prompt(prompt0.prompt_id, "V2", 1)
assert (await temp_prompt_store.get_prompt(prompt0.prompt_id)).version == 2
prompt_default = await temp_prompt_store.set_default_version(prompt0.prompt_id, 1)
assert (await temp_prompt_store.get_prompt(prompt0.prompt_id)).version == 1
assert prompt_default.version == 1
prompt2 = await temp_prompt_store.update_prompt(prompt0.prompt_id, "V3", prompt1.version)
assert prompt2.version == 3
async def test_prompt_id_generation_and_validation(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("Test")
assert prompt.prompt_id.startswith("pmpt_")
assert len(prompt.prompt_id) == 53
with pytest.raises(ValueError):
await temp_prompt_store.get_prompt("invalid_id")
async def test_list_shows_default_versions(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("V1")
await temp_prompt_store.update_prompt(prompt.prompt_id, "V2", 1)
await temp_prompt_store.update_prompt(prompt.prompt_id, "V3", 2)
response = await temp_prompt_store.list_prompts()
listed_prompt = response.data[0]
assert listed_prompt.version == 3 and listed_prompt.prompt == "V3"
await temp_prompt_store.set_default_version(prompt.prompt_id, 1)
response = await temp_prompt_store.list_prompts()
listed_prompt = response.data[0]
assert listed_prompt.version == 1 and listed_prompt.prompt == "V1"
assert not (await temp_prompt_store.get_prompt(prompt.prompt_id, 3)).is_default
async def test_get_all_prompt_versions(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("V1")
await temp_prompt_store.update_prompt(prompt.prompt_id, "V2", 1)
await temp_prompt_store.update_prompt(prompt.prompt_id, "V3", 2)
versions = (await temp_prompt_store.list_prompt_versions(prompt.prompt_id)).data
assert len(versions) == 3
assert [v.version for v in versions] == [1, 2, 3]
assert [v.is_default for v in versions] == [False, False, True]
await temp_prompt_store.set_default_version(prompt.prompt_id, 2)
versions = (await temp_prompt_store.list_prompt_versions(prompt.prompt_id)).data
assert [v.is_default for v in versions] == [False, True, False]
with pytest.raises(ValueError):
await temp_prompt_store.list_prompt_versions("nonexistent")
async def test_prompt_variable_validation(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("Hello {{ name }}, you live in {{ city }}!", ["name", "city"])
assert prompt.variables == ["name", "city"]
prompt_no_vars = await temp_prompt_store.create_prompt("Hello world!", [])
assert prompt_no_vars.variables == []
with pytest.raises(ValueError, match="undeclared variables"):
await temp_prompt_store.create_prompt("Hello {{ name }}, invalid {{ unknown }}!", ["name"])
async def test_update_prompt_set_as_default_behavior(self, temp_prompt_store):
prompt = await temp_prompt_store.create_prompt("V1")
assert (await temp_prompt_store.get_prompt(prompt.prompt_id)).version == 1
prompt_v2 = await temp_prompt_store.update_prompt(prompt.prompt_id, "V2", 1, [], set_as_default=True)
assert prompt_v2.version == 2
assert (await temp_prompt_store.get_prompt(prompt.prompt_id)).version == 2
prompt_v3 = await temp_prompt_store.update_prompt(prompt.prompt_id, "V3", 2, [], set_as_default=False)
assert prompt_v3.version == 3
assert (await temp_prompt_store.get_prompt(prompt.prompt_id)).version == 2

View file

@ -46,7 +46,8 @@ The tests are categorized and outlined below, keep this updated:
* test_validate_input_url_mismatch (negative)
* test_validate_input_multiple_errors_per_request (negative)
* test_validate_input_invalid_request_format (negative)
* test_validate_input_missing_parameters (parametrized negative - custom_id, method, url, body, model, messages missing validation)
* test_validate_input_missing_parameters_chat_completions (parametrized negative - custom_id, method, url, body, model, messages missing validation for chat/completions)
* test_validate_input_missing_parameters_completions (parametrized negative - custom_id, method, url, body, model, prompt missing validation for completions)
* test_validate_input_invalid_parameter_types (parametrized negative - custom_id, url, method, body, model, messages type validation)
The tests use temporary SQLite databases for isolation and mock external
@ -213,7 +214,6 @@ class TestReferenceBatchesImpl:
"endpoint",
[
"/v1/embeddings",
"/v1/completions",
"/v1/invalid/endpoint",
"",
],
@ -499,8 +499,10 @@ class TestReferenceBatchesImpl:
("messages", "body.messages", "invalid_request", "Messages parameter is required"),
],
)
async def test_validate_input_missing_parameters(self, provider, param_name, param_path, error_code, error_message):
"""Test _validate_input when file contains request with missing required parameters."""
async def test_validate_input_missing_parameters_chat_completions(
self, provider, param_name, param_path, error_code, error_message
):
"""Test _validate_input when file contains request with missing required parameters for chat completions."""
provider.files_api.openai_retrieve_file = AsyncMock()
mock_response = MagicMock()
@ -541,6 +543,61 @@ class TestReferenceBatchesImpl:
assert errors[0].message == error_message
assert errors[0].param == param_path
@pytest.mark.parametrize(
"param_name,param_path,error_code,error_message",
[
("custom_id", "custom_id", "missing_required_parameter", "Missing required parameter: custom_id"),
("method", "method", "missing_required_parameter", "Missing required parameter: method"),
("url", "url", "missing_required_parameter", "Missing required parameter: url"),
("body", "body", "missing_required_parameter", "Missing required parameter: body"),
("model", "body.model", "invalid_request", "Model parameter is required"),
("prompt", "body.prompt", "invalid_request", "Prompt parameter is required"),
],
)
async def test_validate_input_missing_parameters_completions(
self, provider, param_name, param_path, error_code, error_message
):
"""Test _validate_input when file contains request with missing required parameters for text completions."""
provider.files_api.openai_retrieve_file = AsyncMock()
mock_response = MagicMock()
base_request = {
"custom_id": "req-1",
"method": "POST",
"url": "/v1/completions",
"body": {"model": "test-model", "prompt": "Hello"},
}
# Remove the specific parameter being tested
if "." in param_path:
top_level, nested_param = param_path.split(".", 1)
del base_request[top_level][nested_param]
else:
del base_request[param_name]
mock_response.body = json.dumps(base_request).encode()
provider.files_api.openai_retrieve_file_content = AsyncMock(return_value=mock_response)
batch = BatchObject(
id="batch_test",
object="batch",
endpoint="/v1/completions",
input_file_id=f"missing_{param_name}_file",
completion_window="24h",
status="validating",
created_at=1234567890,
)
errors, requests = await provider._validate_input(batch)
assert len(errors) == 1
assert len(requests) == 0
assert errors[0].code == error_code
assert errors[0].line == 1
assert errors[0].message == error_message
assert errors[0].param == param_path
async def test_validate_input_url_mismatch(self, provider):
"""Test _validate_input when file contains request with URL that doesn't match batch endpoint."""
provider.files_api.openai_retrieve_file = AsyncMock()

View file

@ -0,0 +1,62 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import boto3
import pytest
from moto import mock_aws
from llama_stack.providers.remote.files.s3 import S3FilesImplConfig, get_adapter_impl
from llama_stack.providers.utils.sqlstore.sqlstore import SqliteSqlStoreConfig
class MockUploadFile:
def __init__(self, content: bytes, filename: str, content_type: str = "text/plain"):
self.content = content
self.filename = filename
self.content_type = content_type
async def read(self):
return self.content
@pytest.fixture
def sample_text_file():
content = b"Hello, this is a test file for the S3 Files API!"
return MockUploadFile(content, "sample_text_file-0.txt")
@pytest.fixture
def sample_text_file2():
content = b"Hello, this is a second test file for the S3 Files API!"
return MockUploadFile(content, "sample_text_file-1.txt")
@pytest.fixture
def s3_config(tmp_path):
db_path = tmp_path / "s3_files_metadata.db"
return S3FilesImplConfig(
bucket_name=f"test-bucket-{tmp_path.name}",
region="not-a-region",
auto_create_bucket=True,
metadata_store=SqliteSqlStoreConfig(db_path=db_path.as_posix()),
)
@pytest.fixture
def s3_client():
# we use `with mock_aws()` because @mock_aws decorator does not support
# being a generator
with mock_aws():
# must yield or the mock will be reset before it is used
yield boto3.client("s3")
@pytest.fixture
async def s3_provider(s3_config, s3_client): # s3_client provides the moto mock, don't remove it
provider = await get_adapter_impl(s3_config, {})
yield provider
await provider.shutdown()

View file

@ -6,63 +6,11 @@
from unittest.mock import patch
import boto3
import pytest
from botocore.exceptions import ClientError
from moto import mock_aws
from llama_stack.apis.common.errors import ResourceNotFoundError
from llama_stack.apis.files import OpenAIFilePurpose
from llama_stack.providers.remote.files.s3 import (
S3FilesImplConfig,
get_adapter_impl,
)
from llama_stack.providers.utils.sqlstore.sqlstore import SqliteSqlStoreConfig
class MockUploadFile:
def __init__(self, content: bytes, filename: str, content_type: str = "text/plain"):
self.content = content
self.filename = filename
self.content_type = content_type
async def read(self):
return self.content
@pytest.fixture
def s3_config(tmp_path):
db_path = tmp_path / "s3_files_metadata.db"
return S3FilesImplConfig(
bucket_name="test-bucket",
region="not-a-region",
auto_create_bucket=True,
metadata_store=SqliteSqlStoreConfig(db_path=db_path.as_posix()),
)
@pytest.fixture
def s3_client():
"""Create a mocked S3 client for testing."""
# we use `with mock_aws()` because @mock_aws decorator does not support being a generator
with mock_aws():
# must yield or the mock will be reset before it is used
yield boto3.client("s3")
@pytest.fixture
async def s3_provider(s3_config, s3_client):
"""Create an S3 files provider with mocked S3 for testing."""
provider = await get_adapter_impl(s3_config, {})
yield provider
await provider.shutdown()
@pytest.fixture
def sample_text_file():
content = b"Hello, this is a test file for the S3 Files API!"
return MockUploadFile(content, "sample_text_file.txt")
class TestS3FilesImpl:
@ -143,7 +91,7 @@ class TestS3FilesImpl:
s3_client.head_object(Bucket=s3_config.bucket_name, Key=uploaded.id)
assert exc_info.value.response["Error"]["Code"] == "404"
async def test_list_files(self, s3_provider, sample_text_file):
async def test_list_files(self, s3_provider, sample_text_file, sample_text_file2):
"""Test listing files after uploading some."""
sample_text_file.filename = "test_list_files_with_content_file1"
file1 = await s3_provider.openai_upload_file(
@ -151,9 +99,9 @@ class TestS3FilesImpl:
purpose=OpenAIFilePurpose.ASSISTANTS,
)
file2_content = MockUploadFile(b"Second file content", "test_list_files_with_content_file2")
sample_text_file2.filename = "test_list_files_with_content_file2"
file2 = await s3_provider.openai_upload_file(
file=file2_content,
file=sample_text_file2,
purpose=OpenAIFilePurpose.BATCH,
)
@ -164,7 +112,7 @@ class TestS3FilesImpl:
assert file1.id in file_ids
assert file2.id in file_ids
async def test_list_files_with_purpose_filter(self, s3_provider, sample_text_file):
async def test_list_files_with_purpose_filter(self, s3_provider, sample_text_file, sample_text_file2):
"""Test listing files with purpose filter."""
sample_text_file.filename = "test_list_files_with_purpose_filter_file1"
file1 = await s3_provider.openai_upload_file(
@ -172,9 +120,9 @@ class TestS3FilesImpl:
purpose=OpenAIFilePurpose.ASSISTANTS,
)
file2_content = MockUploadFile(b"Batch file content", "test_list_files_with_purpose_filter_file2")
sample_text_file2.filename = "test_list_files_with_purpose_filter_file2"
await s3_provider.openai_upload_file(
file=file2_content,
file=sample_text_file2,
purpose=OpenAIFilePurpose.BATCH,
)
@ -249,3 +197,104 @@ class TestS3FilesImpl:
files_list = await s3_provider.openai_list_files()
assert len(files_list.data) == 0, "No file metadata should remain after failed upload"
@pytest.mark.parametrize("purpose", [p for p in OpenAIFilePurpose if p != OpenAIFilePurpose.BATCH])
async def test_default_no_expiration(self, s3_provider, sample_text_file, purpose):
"""Test that by default files have no expiration."""
sample_text_file.filename = "test_default_no_expiration"
uploaded = await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=purpose,
)
assert uploaded.expires_at is None, "By default files should have no expiration"
async def test_default_batch_expiration(self, s3_provider, sample_text_file):
"""Test that by default batch files have an expiration."""
sample_text_file.filename = "test_default_batch_an_expiration"
uploaded = await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.BATCH,
)
assert uploaded.expires_at is not None, "By default batch files should have an expiration"
thirty_days_seconds = 30 * 24 * 3600
assert uploaded.expires_at == uploaded.created_at + thirty_days_seconds, (
"Batch default expiration should be 30 days"
)
async def test_expired_file_is_unavailable(self, s3_provider, sample_text_file, s3_config, s3_client):
"""Uploaded file that has expired should not be listed or retrievable/deletable."""
with patch.object(s3_provider, "_now") as mock_now: # control time
two_hours = 2 * 60 * 60
mock_now.return_value = 0
sample_text_file.filename = "test_expired_file"
uploaded = await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.ASSISTANTS,
expires_after_anchor="created_at",
expires_after_seconds=two_hours,
)
mock_now.return_value = two_hours * 2 # fast forward 4 hours
listed = await s3_provider.openai_list_files()
assert uploaded.id not in [f.id for f in listed.data]
with pytest.raises(ResourceNotFoundError, match="not found"):
await s3_provider.openai_retrieve_file(uploaded.id)
with pytest.raises(ResourceNotFoundError, match="not found"):
await s3_provider.openai_retrieve_file_content(uploaded.id)
with pytest.raises(ResourceNotFoundError, match="not found"):
await s3_provider.openai_delete_file(uploaded.id)
with pytest.raises(ClientError) as exc_info:
s3_client.head_object(Bucket=s3_config.bucket_name, Key=uploaded.id)
assert exc_info.value.response["Error"]["Code"] == "404"
with pytest.raises(ResourceNotFoundError, match="not found"):
await s3_provider._get_file(uploaded.id, return_expired=True)
async def test_unsupported_expires_after_anchor(self, s3_provider, sample_text_file):
"""Unsupported anchor value should raise ValueError."""
sample_text_file.filename = "test_unsupported_expires_after_anchor"
with pytest.raises(ValueError, match="Input should be 'created_at'"):
await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.ASSISTANTS,
expires_after_anchor="now",
expires_after_seconds=3600,
)
async def test_nonint_expires_after_seconds(self, s3_provider, sample_text_file):
"""Non-integer seconds in expires_after should raise ValueError."""
sample_text_file.filename = "test_nonint_expires_after_seconds"
with pytest.raises(ValueError, match="should be a valid integer"):
await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.ASSISTANTS,
expires_after_anchor="created_at",
expires_after_seconds="many",
)
async def test_expires_after_seconds_out_of_bounds(self, s3_provider, sample_text_file):
"""Seconds outside allowed range should raise ValueError."""
with pytest.raises(ValueError, match="greater than or equal to 3600"):
await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.ASSISTANTS,
expires_after_anchor="created_at",
expires_after_seconds=3599,
)
with pytest.raises(ValueError, match="less than or equal to 2592000"):
await s3_provider.openai_upload_file(
file=sample_text_file,
purpose=OpenAIFilePurpose.ASSISTANTS,
expires_after_anchor="created_at",
expires_after_seconds=2592001,
)

View file

@ -0,0 +1,89 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from unittest.mock import patch
import pytest
from llama_stack.apis.common.errors import ResourceNotFoundError
from llama_stack.apis.files import OpenAIFilePurpose
from llama_stack.core.datatypes import User
from llama_stack.providers.remote.files.s3.files import S3FilesImpl
async def test_listing_hides_other_users_file(s3_provider, sample_text_file):
"""Listing should not show files uploaded by other users."""
user_a = User("user-a", {"roles": ["team-a"]})
user_b = User("user-b", {"roles": ["team-b"]})
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_a
uploaded = await s3_provider.openai_upload_file(file=sample_text_file, purpose=OpenAIFilePurpose.ASSISTANTS)
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_b
listed = await s3_provider.openai_list_files()
assert all(f.id != uploaded.id for f in listed.data)
@pytest.mark.parametrize(
"op",
[S3FilesImpl.openai_retrieve_file, S3FilesImpl.openai_retrieve_file_content, S3FilesImpl.openai_delete_file],
ids=["retrieve", "content", "delete"],
)
async def test_cannot_access_other_user_file(s3_provider, sample_text_file, op):
"""Operations (metadata/content/delete) on another user's file should raise ResourceNotFoundError.
`op` is an async callable (provider, file_id) -> awaits the requested operation.
"""
user_a = User("user-a", {"roles": ["team-a"]})
user_b = User("user-b", {"roles": ["team-b"]})
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_a
uploaded = await s3_provider.openai_upload_file(file=sample_text_file, purpose=OpenAIFilePurpose.ASSISTANTS)
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_b
with pytest.raises(ResourceNotFoundError):
await op(s3_provider, uploaded.id)
async def test_shared_role_allows_listing(s3_provider, sample_text_file):
"""Listing should show files uploaded by other users when roles are shared."""
user_a = User("user-a", {"roles": ["shared-role"]})
user_b = User("user-b", {"roles": ["shared-role"]})
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_a
uploaded = await s3_provider.openai_upload_file(file=sample_text_file, purpose=OpenAIFilePurpose.ASSISTANTS)
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_b
listed = await s3_provider.openai_list_files()
assert any(f.id == uploaded.id for f in listed.data)
@pytest.mark.parametrize(
"op",
[S3FilesImpl.openai_retrieve_file, S3FilesImpl.openai_retrieve_file_content, S3FilesImpl.openai_delete_file],
ids=["retrieve", "content", "delete"],
)
async def test_shared_role_allows_access(s3_provider, sample_text_file, op):
"""Operations (metadata/content/delete) on another user's file should succeed when users share a role.
`op` is an async callable (provider, file_id) -> awaits the requested operation.
"""
user_x = User("user-x", {"roles": ["shared-role"]})
user_y = User("user-y", {"roles": ["shared-role"]})
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_x
uploaded = await s3_provider.openai_upload_file(file=sample_text_file, purpose=OpenAIFilePurpose.ASSISTANTS)
with patch("llama_stack.providers.utils.sqlstore.authorized_sqlstore.get_authenticated_user") as mock_get_user:
mock_get_user.return_value = user_y
await op(s3_provider, uploaded.id)

View file

@ -0,0 +1,63 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
from unittest.mock import patch
from llama_stack.providers.utils.bedrock.config import BedrockBaseConfig
class TestBedrockBaseConfig:
def test_defaults_work_without_env_vars(self):
with patch.dict(os.environ, {}, clear=True):
config = BedrockBaseConfig()
# Basic creds should be None
assert config.aws_access_key_id is None
assert config.aws_secret_access_key is None
assert config.region_name is None
# Timeouts get defaults
assert config.connect_timeout == 60.0
assert config.read_timeout == 60.0
assert config.session_ttl == 3600
def test_env_vars_get_picked_up(self):
env_vars = {
"AWS_ACCESS_KEY_ID": "AKIATEST123",
"AWS_SECRET_ACCESS_KEY": "secret123",
"AWS_DEFAULT_REGION": "us-west-2",
"AWS_MAX_ATTEMPTS": "5",
"AWS_RETRY_MODE": "adaptive",
"AWS_CONNECT_TIMEOUT": "30",
}
with patch.dict(os.environ, env_vars, clear=True):
config = BedrockBaseConfig()
assert config.aws_access_key_id == "AKIATEST123"
assert config.aws_secret_access_key == "secret123"
assert config.region_name == "us-west-2"
assert config.total_max_attempts == 5
assert config.retry_mode == "adaptive"
assert config.connect_timeout == 30.0
def test_partial_env_setup(self):
# Just setting one timeout var
with patch.dict(os.environ, {"AWS_CONNECT_TIMEOUT": "120"}, clear=True):
config = BedrockBaseConfig()
assert config.connect_timeout == 120.0
assert config.read_timeout == 60.0 # still default
assert config.aws_access_key_id is None
def test_bad_max_attempts_breaks(self):
with patch.dict(os.environ, {"AWS_MAX_ATTEMPTS": "not_a_number"}, clear=True):
try:
BedrockBaseConfig()
raise AssertionError("Should have failed on bad int conversion")
except ValueError:
pass # expected

View file

@ -33,8 +33,7 @@ def test_groq_provider_openai_client_caching():
with request_provider_data_context(
{"x-llamastack-provider-data": json.dumps({inference_adapter.provider_data_api_key_field: api_key})}
):
openai_client = inference_adapter._get_openai_client()
assert openai_client.api_key == api_key
assert inference_adapter.client.api_key == api_key
def test_openai_provider_openai_client_caching():

View file

@ -6,19 +6,15 @@
import asyncio
import json
import logging # allow-direct-logging
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock, PropertyMock, patch
import pytest
from openai.types.chat.chat_completion_chunk import (
ChatCompletionChunk as OpenAIChatCompletionChunk,
)
from openai.types.chat.chat_completion_chunk import (
Choice as OpenAIChoice,
Choice as OpenAIChoiceChunk,
)
from openai.types.chat.chat_completion_chunk import (
ChoiceDelta as OpenAIChoiceDelta,
@ -35,6 +31,9 @@ from llama_stack.apis.inference import (
ChatCompletionRequest,
ChatCompletionResponseEventType,
CompletionMessage,
OpenAIAssistantMessageParam,
OpenAIChatCompletion,
OpenAIChoice,
SystemMessage,
ToolChoice,
ToolConfig,
@ -61,41 +60,6 @@ from llama_stack.providers.remote.inference.vllm.vllm import (
# -v -s --tb=short --disable-warnings
class MockInferenceAdapterWithSleep:
def __init__(self, sleep_time: int, response: dict[str, Any]):
self.httpd = None
class DelayedRequestHandler(BaseHTTPRequestHandler):
# ruff: noqa: N802
def do_POST(self):
time.sleep(sleep_time)
response_body = json.dumps(response).encode("utf-8")
self.send_response(code=200)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", len(response_body))
self.end_headers()
self.wfile.write(response_body)
self.request_handler = DelayedRequestHandler
def __enter__(self):
httpd = HTTPServer(("", 0), self.request_handler)
self.httpd = httpd
host, port = httpd.server_address
httpd_thread = threading.Thread(target=httpd.serve_forever)
httpd_thread.daemon = True # stop server if this thread terminates
httpd_thread.start()
config = VLLMInferenceAdapterConfig(url=f"http://{host}:{port}")
inference_adapter = VLLMInferenceAdapter(config)
return inference_adapter
def __exit__(self, _exc_type, _exc_value, _traceback):
if self.httpd:
self.httpd.shutdown()
self.httpd.server_close()
@pytest.fixture(scope="module")
def mock_openai_models_list():
with patch("openai.resources.models.AsyncModels.list", new_callable=AsyncMock) as mock_list:
@ -150,10 +114,12 @@ async def test_tool_call_response(vllm_inference_adapter):
"""Verify that tool call arguments from a CompletionMessage are correctly converted
into the expected JSON format."""
# Patch the call to vllm so we can inspect the arguments sent were correct
with patch.object(
vllm_inference_adapter.client.chat.completions, "create", new_callable=AsyncMock
) as mock_nonstream_completion:
# Patch the client property to avoid instantiating a real AsyncOpenAI client
with patch.object(VLLMInferenceAdapter, "client", new_callable=PropertyMock) as mock_create_client:
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock()
mock_create_client.return_value = mock_client
messages = [
SystemMessage(content="You are a helpful assistant"),
UserMessage(content="How many?"),
@ -179,7 +145,7 @@ async def test_tool_call_response(vllm_inference_adapter):
tool_config=ToolConfig(tool_choice=ToolChoice.auto),
)
assert mock_nonstream_completion.call_args.kwargs["messages"][2]["tool_calls"] == [
assert mock_client.chat.completions.create.call_args.kwargs["messages"][2]["tool_calls"] == [
{
"id": "foo",
"type": "function",
@ -199,7 +165,7 @@ async def test_tool_call_delta_empty_tool_call_buf():
async def mock_stream():
delta = OpenAIChoiceDelta(content="", tool_calls=None)
choices = [OpenAIChoice(delta=delta, finish_reason="stop", index=0)]
choices = [OpenAIChoiceChunk(delta=delta, finish_reason="stop", index=0)]
mock_chunk = OpenAIChatCompletionChunk(
id="chunk-1",
created=1,
@ -225,7 +191,7 @@ async def test_tool_call_delta_streaming_arguments_dict():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(
content="",
tool_calls=[
@ -250,7 +216,7 @@ async def test_tool_call_delta_streaming_arguments_dict():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(
content="",
tool_calls=[
@ -275,7 +241,9 @@ async def test_tool_call_delta_streaming_arguments_dict():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(delta=OpenAIChoiceDelta(content="", tool_calls=None), finish_reason="tool_calls", index=0)
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(content="", tool_calls=None), finish_reason="tool_calls", index=0
)
],
)
for chunk in [mock_chunk_1, mock_chunk_2, mock_chunk_3]:
@ -299,7 +267,7 @@ async def test_multiple_tool_calls():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(
content="",
tool_calls=[
@ -324,7 +292,7 @@ async def test_multiple_tool_calls():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(
content="",
tool_calls=[
@ -349,7 +317,9 @@ async def test_multiple_tool_calls():
model="foo",
object="chat.completion.chunk",
choices=[
OpenAIChoice(delta=OpenAIChoiceDelta(content="", tool_calls=None), finish_reason="tool_calls", index=0)
OpenAIChoiceChunk(
delta=OpenAIChoiceDelta(content="", tool_calls=None), finish_reason="tool_calls", index=0
)
],
)
for chunk in [mock_chunk_1, mock_chunk_2, mock_chunk_3]:
@ -393,59 +363,6 @@ async def test_process_vllm_chat_completion_stream_response_no_choices():
assert chunks[0].event.event_type.value == "start"
@pytest.mark.allow_network
def test_chat_completion_doesnt_block_event_loop(caplog):
loop = asyncio.new_event_loop()
loop.set_debug(True)
caplog.set_level(logging.WARNING)
# Log when event loop is blocked for more than 200ms
loop.slow_callback_duration = 0.5
# Sleep for 500ms in our delayed http response
sleep_time = 0.5
mock_model = Model(identifier="mock-model", provider_resource_id="mock-model", provider_id="vllm-inference")
mock_response = {
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1,
"modle": "mock-model",
"choices": [
{
"message": {"content": ""},
"logprobs": None,
"finish_reason": "stop",
"index": 0,
}
],
}
async def do_chat_completion():
await inference_adapter.chat_completion(
"mock-model",
[],
stream=False,
tools=None,
tool_config=ToolConfig(tool_choice=ToolChoice.auto),
)
with MockInferenceAdapterWithSleep(sleep_time, mock_response) as inference_adapter:
inference_adapter.model_store = AsyncMock()
inference_adapter.model_store.get_model.return_value = mock_model
loop.run_until_complete(inference_adapter.initialize())
# Clear the logs so far and run the actual chat completion we care about
caplog.clear()
loop.run_until_complete(do_chat_completion())
# Ensure we don't have any asyncio warnings in the captured log
# records from our chat completion call. A message gets logged
# here any time we exceed the slow_callback_duration configured
# above.
asyncio_warnings = [record.message for record in caplog.records if record.name == "asyncio"]
assert not asyncio_warnings
async def test_get_params_empty_tools(vllm_inference_adapter):
request = ChatCompletionRequest(
tools=[],
@ -641,9 +558,7 @@ async def test_health_status_success(vllm_inference_adapter):
This test verifies that the health method returns a HealthResponse with status OK, only
when the connection to the vLLM server is successful.
"""
# Set vllm_inference_adapter.client to None to ensure _create_client is called
vllm_inference_adapter.client = None
with patch.object(vllm_inference_adapter, "_create_client") as mock_create_client:
with patch.object(VLLMInferenceAdapter, "client", new_callable=PropertyMock) as mock_create_client:
# Create mock client and models
mock_client = MagicMock()
mock_models = MagicMock()
@ -674,8 +589,7 @@ async def test_health_status_failure(vllm_inference_adapter):
This test verifies that the health method returns a HealthResponse with status ERROR
and an appropriate error message when the connection to the vLLM server fails.
"""
vllm_inference_adapter.client = None
with patch.object(vllm_inference_adapter, "_create_client") as mock_create_client:
with patch.object(VLLMInferenceAdapter, "client", new_callable=PropertyMock) as mock_create_client:
# Create mock client and models
mock_client = MagicMock()
mock_models = MagicMock()
@ -697,3 +611,48 @@ async def test_health_status_failure(vllm_inference_adapter):
assert "Health check failed: Connection failed" in health_response["message"]
mock_models.list.assert_called_once()
async def test_openai_chat_completion_is_async(vllm_inference_adapter):
"""
Verify that openai_chat_completion is async and doesn't block the event loop.
To do this we mock the underlying inference with a sleep, start multiple
inference calls in parallel, and ensure the total time taken is less
than the sum of the individual sleep times.
"""
sleep_time = 0.5
async def mock_create(*args, **kwargs):
await asyncio.sleep(sleep_time)
return OpenAIChatCompletion(
id="chatcmpl-abc123",
created=1,
model="mock-model",
choices=[
OpenAIChoice(
message=OpenAIAssistantMessageParam(
content="nothing interesting",
),
finish_reason="stop",
index=0,
)
],
)
async def do_inference():
await vllm_inference_adapter.openai_chat_completion(
"mock-model", messages=["one fish", "two fish"], stream=False
)
with patch.object(VLLMInferenceAdapter, "client", new_callable=PropertyMock) as mock_create_client:
mock_client = MagicMock()
mock_client.chat.completions.create = AsyncMock(side_effect=mock_create)
mock_create_client.return_value = mock_client
start_time = time.time()
await asyncio.gather(do_inference(), do_inference(), do_inference(), do_inference())
total_time = time.time() - start_time
assert mock_create_client.call_count == 4 # no cheating
assert total_time < (sleep_time * 2), f"Total time taken: {total_time}s exceeded expected max"

View file

@ -52,14 +52,19 @@ class TestNVIDIAEvalImpl(unittest.TestCase):
self.evaluator_post_patcher = patch(
"llama_stack.providers.remote.eval.nvidia.eval.NVIDIAEvalImpl._evaluator_post"
)
self.evaluator_delete_patcher = patch(
"llama_stack.providers.remote.eval.nvidia.eval.NVIDIAEvalImpl._evaluator_delete"
)
self.mock_evaluator_get = self.evaluator_get_patcher.start()
self.mock_evaluator_post = self.evaluator_post_patcher.start()
self.mock_evaluator_delete = self.evaluator_delete_patcher.start()
def tearDown(self):
"""Clean up after each test."""
self.evaluator_get_patcher.stop()
self.evaluator_post_patcher.stop()
self.evaluator_delete_patcher.stop()
def _assert_request_body(self, expected_json):
"""Helper method to verify request body in Evaluator POST request is correct"""
@ -115,6 +120,13 @@ class TestNVIDIAEvalImpl(unittest.TestCase):
self.mock_evaluator_post.assert_called_once()
self._assert_request_body({"namespace": benchmark.provider_id, "name": benchmark.identifier, **eval_config})
def test_unregister_benchmark(self):
# Unregister the benchmark
self.run_async(self.eval_impl.unregister_benchmark(benchmark_id=MOCK_BENCHMARK_ID))
# Verify the Evaluator API was called correctly
self.mock_evaluator_delete.assert_called_once_with(f"/v1/evaluation/configs/nvidia/{MOCK_BENCHMARK_ID}")
def test_run_eval(self):
benchmark_config = BenchmarkConfig(
eval_candidate=ModelCandidate(

View file

@ -0,0 +1,53 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.remote.inference.bedrock.bedrock import (
_get_region_prefix,
_to_inference_profile_id,
)
def test_region_prefixes():
assert _get_region_prefix("us-east-1") == "us."
assert _get_region_prefix("eu-west-1") == "eu."
assert _get_region_prefix("ap-south-1") == "ap."
assert _get_region_prefix("ca-central-1") == "us."
# Test case insensitive
assert _get_region_prefix("US-EAST-1") == "us."
assert _get_region_prefix("EU-WEST-1") == "eu."
assert _get_region_prefix("Ap-South-1") == "ap."
# Test None region
assert _get_region_prefix(None) == "us."
def test_model_id_conversion():
# Basic conversion
assert (
_to_inference_profile_id("meta.llama3-1-70b-instruct-v1:0", "us-east-1") == "us.meta.llama3-1-70b-instruct-v1:0"
)
# Already has prefix
assert (
_to_inference_profile_id("us.meta.llama3-1-70b-instruct-v1:0", "us-east-1")
== "us.meta.llama3-1-70b-instruct-v1:0"
)
# ARN should be returned unchanged
arn = "arn:aws:bedrock:us-east-1:123456789012:inference-profile/us.meta.llama3-1-70b-instruct-v1:0"
assert _to_inference_profile_id(arn, "us-east-1") == arn
# ARN should be returned unchanged even without region
assert _to_inference_profile_id(arn) == arn
# Optional region parameter defaults to us-east-1
assert _to_inference_profile_id("meta.llama3-1-70b-instruct-v1:0") == "us.meta.llama3-1-70b-instruct-v1:0"
# Different regions work with optional parameter
assert (
_to_inference_profile_id("meta.llama3-1-70b-instruct-v1:0", "eu-west-1") == "eu.meta.llama3-1-70b-instruct-v1:0"
)

View file

@ -0,0 +1,248 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.providers.utils.memory.vector_store import RERANKER_TYPE_RRF, RERANKER_TYPE_WEIGHTED
from llama_stack.providers.utils.vector_io.vector_utils import WeightedInMemoryAggregator
class TestNormalizeScores:
"""Test cases for score normalization."""
def test_normalize_scores_basic(self):
"""Test basic score normalization."""
scores = {"doc1": 10.0, "doc2": 5.0, "doc3": 0.0}
normalized = WeightedInMemoryAggregator._normalize_scores(scores)
assert normalized["doc1"] == 1.0 # Max score
assert normalized["doc3"] == 0.0 # Min score
assert normalized["doc2"] == 0.5 # Middle score
assert all(0 <= score <= 1 for score in normalized.values())
def test_normalize_scores_identical(self):
"""Test normalization when all scores are identical."""
scores = {"doc1": 5.0, "doc2": 5.0, "doc3": 5.0}
normalized = WeightedInMemoryAggregator._normalize_scores(scores)
# All scores should be 1.0 when identical
assert all(score == 1.0 for score in normalized.values())
def test_normalize_scores_empty(self):
"""Test normalization with empty scores."""
scores = {}
normalized = WeightedInMemoryAggregator._normalize_scores(scores)
assert normalized == {}
def test_normalize_scores_single(self):
"""Test normalization with single score."""
scores = {"doc1": 7.5}
normalized = WeightedInMemoryAggregator._normalize_scores(scores)
assert normalized["doc1"] == 1.0
class TestWeightedRerank:
"""Test cases for weighted reranking."""
def test_weighted_rerank_basic(self):
"""Test basic weighted reranking."""
vector_scores = {"doc1": 0.9, "doc2": 0.7, "doc3": 0.5}
keyword_scores = {"doc1": 0.6, "doc2": 0.8, "doc4": 0.9}
combined = WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha=0.5)
# Should include all documents
expected_docs = {"doc1", "doc2", "doc3", "doc4"}
assert set(combined.keys()) == expected_docs
# All scores should be between 0 and 1
assert all(0 <= score <= 1 for score in combined.values())
# doc1 appears in both searches, should have higher combined score
assert combined["doc1"] > 0
def test_weighted_rerank_alpha_zero(self):
"""Test weighted reranking with alpha=0 (keyword only)."""
vector_scores = {"doc1": 0.9, "doc2": 0.7, "doc3": 0.5} # All docs present in vector
keyword_scores = {"doc1": 0.1, "doc2": 0.3, "doc3": 0.9} # All docs present in keyword
combined = WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha=0.0)
# Alpha=0 means vector scores are ignored, keyword scores dominate
# doc3 should score highest since it has highest keyword score
assert combined["doc3"] > combined["doc2"] > combined["doc1"]
def test_weighted_rerank_alpha_one(self):
"""Test weighted reranking with alpha=1 (vector only)."""
vector_scores = {"doc1": 0.9, "doc2": 0.7, "doc3": 0.5} # All docs present in vector
keyword_scores = {"doc1": 0.1, "doc2": 0.3, "doc3": 0.9} # All docs present in keyword
combined = WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha=1.0)
# Alpha=1 means keyword scores are ignored, vector scores dominate
# doc1 should score highest since it has highest vector score
assert combined["doc1"] > combined["doc2"] > combined["doc3"]
def test_weighted_rerank_no_overlap(self):
"""Test weighted reranking with no overlapping documents."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc3": 0.8, "doc4": 0.6}
combined = WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha=0.5)
assert len(combined) == 4
# With min-max normalization, lowest scoring docs in each group get 0.0
# but highest scoring docs should get positive scores
assert all(score >= 0 for score in combined.values())
assert combined["doc1"] > 0 # highest vector score
assert combined["doc3"] > 0 # highest keyword score
class TestRRFRerank:
"""Test cases for RRF (Reciprocal Rank Fusion) reranking."""
def test_rrf_rerank_basic(self):
"""Test basic RRF reranking."""
vector_scores = {"doc1": 0.9, "doc2": 0.7, "doc3": 0.5}
keyword_scores = {"doc1": 0.6, "doc2": 0.8, "doc4": 0.9}
combined = WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor=60.0)
# Should include all documents
expected_docs = {"doc1", "doc2", "doc3", "doc4"}
assert set(combined.keys()) == expected_docs
# All scores should be positive
assert all(score > 0 for score in combined.values())
# Documents appearing in both searches should have higher scores
# doc1 and doc2 appear in both, doc3 and doc4 appear in only one
assert combined["doc1"] > combined["doc3"]
assert combined["doc2"] > combined["doc4"]
def test_rrf_rerank_rank_calculation(self):
"""Test that RRF correctly calculates ranks."""
# Create clear ranking order
vector_scores = {"doc1": 1.0, "doc2": 0.8, "doc3": 0.6} # Ranks: 1, 2, 3
keyword_scores = {"doc1": 0.5, "doc2": 1.0, "doc3": 0.7} # Ranks: 3, 1, 2
combined = WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor=60.0)
# doc1: rank 1 in vector, rank 3 in keyword
# doc2: rank 2 in vector, rank 1 in keyword
# doc3: rank 3 in vector, rank 2 in keyword
# doc2 should have the highest combined score (ranks 2+1=3)
# followed by doc1 (ranks 1+3=4) and doc3 (ranks 3+2=5)
# Remember: lower rank sum = higher RRF score
assert combined["doc2"] > combined["doc1"] > combined["doc3"]
def test_rrf_rerank_impact_factor(self):
"""Test that impact factor affects RRF scores."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc1": 0.8, "doc2": 0.6}
combined_low = WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor=10.0)
combined_high = WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor=100.0)
# Higher impact factor should generally result in lower scores
# (because 1/(k+r) decreases as k increases)
assert combined_low["doc1"] > combined_high["doc1"]
assert combined_low["doc2"] > combined_high["doc2"]
def test_rrf_rerank_missing_documents(self):
"""Test RRF handling of documents missing from one search."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc1": 0.8, "doc3": 0.6}
combined = WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor=60.0)
# Should include all documents
assert len(combined) == 3
# doc1 appears in both searches, should have highest score
assert combined["doc1"] > combined["doc2"]
assert combined["doc1"] > combined["doc3"]
class TestCombineSearchResults:
"""Test cases for the main combine_search_results function."""
def test_combine_search_results_rrf_default(self):
"""Test combining with RRF as default."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc1": 0.6, "doc3": 0.8}
combined = WeightedInMemoryAggregator.combine_search_results(vector_scores, keyword_scores)
# Should default to RRF
assert len(combined) == 3
assert all(score > 0 for score in combined.values())
def test_combine_search_results_rrf_explicit(self):
"""Test combining with explicit RRF."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc1": 0.6, "doc3": 0.8}
combined = WeightedInMemoryAggregator.combine_search_results(
vector_scores, keyword_scores, reranker_type=RERANKER_TYPE_RRF, reranker_params={"impact_factor": 50.0}
)
assert len(combined) == 3
assert all(score > 0 for score in combined.values())
def test_combine_search_results_weighted(self):
"""Test combining with weighted reranking."""
vector_scores = {"doc1": 0.9, "doc2": 0.7}
keyword_scores = {"doc1": 0.6, "doc3": 0.8}
combined = WeightedInMemoryAggregator.combine_search_results(
vector_scores, keyword_scores, reranker_type=RERANKER_TYPE_WEIGHTED, reranker_params={"alpha": 0.3}
)
assert len(combined) == 3
assert all(0 <= score <= 1 for score in combined.values())
def test_combine_search_results_unknown_type(self):
"""Test combining with unknown reranker type defaults to RRF."""
vector_scores = {"doc1": 0.9}
keyword_scores = {"doc2": 0.8}
combined = WeightedInMemoryAggregator.combine_search_results(
vector_scores, keyword_scores, reranker_type="unknown_type"
)
# Should fall back to RRF
assert len(combined) == 2
assert all(score > 0 for score in combined.values())
def test_combine_search_results_empty_params(self):
"""Test combining with empty parameters."""
vector_scores = {"doc1": 0.9}
keyword_scores = {"doc2": 0.8}
combined = WeightedInMemoryAggregator.combine_search_results(vector_scores, keyword_scores, reranker_params={})
# Should use default parameters
assert len(combined) == 2
assert all(score > 0 for score in combined.values())
def test_combine_search_results_empty_scores(self):
"""Test combining with empty score dictionaries."""
# Test with empty vector scores
combined = WeightedInMemoryAggregator.combine_search_results({}, {"doc1": 0.8})
assert len(combined) == 1
assert combined["doc1"] > 0
# Test with empty keyword scores
combined = WeightedInMemoryAggregator.combine_search_results({"doc1": 0.9}, {})
assert len(combined) == 1
assert combined["doc1"] > 0
# Test with both empty
combined = WeightedInMemoryAggregator.combine_search_results({}, {})
assert len(combined) == 0

View file

@ -178,3 +178,41 @@ def test_content_from_data_and_mime_type_both_encodings_fail():
# Should raise an exception instead of returning empty string
with pytest.raises(UnicodeDecodeError):
content_from_data_and_mime_type(data, mime_type)
async def test_memory_tool_error_handling():
"""Test that memory tool handles various failures gracefully without crashing."""
from llama_stack.providers.inline.tool_runtime.rag.config import RagToolRuntimeConfig
from llama_stack.providers.inline.tool_runtime.rag.memory import MemoryToolRuntimeImpl
config = RagToolRuntimeConfig()
memory_tool = MemoryToolRuntimeImpl(
config=config,
vector_io_api=AsyncMock(),
inference_api=AsyncMock(),
files_api=AsyncMock(),
)
docs = [
RAGDocument(document_id="good_doc", content="Good content", metadata={}),
RAGDocument(document_id="bad_url_doc", content=URL(uri="https://bad.url"), metadata={}),
RAGDocument(document_id="another_good_doc", content="Another good content", metadata={}),
]
mock_file1 = MagicMock()
mock_file1.id = "file_good1"
mock_file2 = MagicMock()
mock_file2.id = "file_good2"
memory_tool.files_api.openai_upload_file.side_effect = [mock_file1, mock_file2]
with patch("httpx.AsyncClient") as mock_client:
mock_instance = AsyncMock()
mock_instance.get.side_effect = Exception("Bad URL")
mock_client.return_value.__aenter__.return_value = mock_instance
# won't raise exception despite one document failing
await memory_tool.insert(docs, "vector_store_123")
# processed 2 documents successfully, skipped 1
assert memory_tool.files_api.openai_upload_file.call_count == 2
assert memory_tool.vector_io_api.openai_attach_file_to_vector_store.call_count == 2

View file

@ -5,6 +5,7 @@
# the root directory of this source tree.
import random
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
import pytest
@ -12,7 +13,7 @@ from chromadb import PersistentClient
from pymilvus import MilvusClient, connections
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.apis.vector_io import Chunk, ChunkMetadata
from llama_stack.apis.vector_io import Chunk, ChunkMetadata, QueryChunksResponse
from llama_stack.providers.inline.vector_io.chroma.config import ChromaVectorIOConfig
from llama_stack.providers.inline.vector_io.faiss.config import FaissVectorIOConfig
from llama_stack.providers.inline.vector_io.faiss.faiss import FaissIndex, FaissVectorIOAdapter
@ -22,6 +23,8 @@ from llama_stack.providers.inline.vector_io.sqlite_vec import SQLiteVectorIOConf
from llama_stack.providers.inline.vector_io.sqlite_vec.sqlite_vec import SQLiteVecIndex, SQLiteVecVectorIOAdapter
from llama_stack.providers.remote.vector_io.chroma.chroma import ChromaIndex, ChromaVectorIOAdapter, maybe_await
from llama_stack.providers.remote.vector_io.milvus.milvus import MilvusIndex, MilvusVectorIOAdapter
from llama_stack.providers.remote.vector_io.pgvector.config import PGVectorVectorIOConfig
from llama_stack.providers.remote.vector_io.pgvector.pgvector import PGVectorIndex, PGVectorVectorIOAdapter
from llama_stack.providers.remote.vector_io.qdrant.qdrant import QdrantVectorIOAdapter
EMBEDDING_DIMENSION = 384
@ -29,7 +32,7 @@ COLLECTION_PREFIX = "test_collection"
MILVUS_ALIAS = "test_milvus"
@pytest.fixture(params=["milvus", "sqlite_vec", "faiss", "chroma"])
@pytest.fixture(params=["milvus", "sqlite_vec", "faiss", "chroma", "pgvector"])
def vector_provider(request):
return request.param
@ -333,15 +336,127 @@ async def qdrant_vec_index(qdrant_vec_db_path, embedding_dimension):
await index.delete()
@pytest.fixture
def mock_psycopg2_connection():
connection = MagicMock()
cursor = MagicMock()
cursor.__enter__ = MagicMock(return_value=cursor)
cursor.__exit__ = MagicMock()
connection.cursor.return_value = cursor
return connection, cursor
@pytest.fixture
async def pgvector_vec_index(embedding_dimension, mock_psycopg2_connection):
connection, cursor = mock_psycopg2_connection
vector_db = VectorDB(
identifier="test-vector-db",
embedding_model="test-model",
embedding_dimension=embedding_dimension,
provider_id="pgvector",
provider_resource_id="pgvector:test-vector-db",
)
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2"):
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.execute_values"):
index = PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="COSINE")
index._test_chunks = []
original_add_chunks = index.add_chunks
async def mock_add_chunks(chunks, embeddings):
index._test_chunks = list(chunks)
await original_add_chunks(chunks, embeddings)
index.add_chunks = mock_add_chunks
async def mock_query_vector(embedding, k, score_threshold):
chunks = index._test_chunks[:k] if hasattr(index, "_test_chunks") else []
scores = [1.0] * len(chunks)
return QueryChunksResponse(chunks=chunks, scores=scores)
index.query_vector = mock_query_vector
yield index
@pytest.fixture
async def pgvector_vec_adapter(mock_inference_api, embedding_dimension):
config = PGVectorVectorIOConfig(
host="localhost",
port=5432,
db="test_db",
user="test_user",
password="test_password",
kvstore=SqliteKVStoreConfig(),
)
adapter = PGVectorVectorIOAdapter(config, mock_inference_api, None)
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2.connect") as mock_connect:
mock_conn = MagicMock()
mock_cursor = MagicMock()
mock_cursor.__enter__ = MagicMock(return_value=mock_cursor)
mock_cursor.__exit__ = MagicMock()
mock_conn.cursor.return_value = mock_cursor
mock_conn.autocommit = True
mock_connect.return_value = mock_conn
with patch(
"llama_stack.providers.remote.vector_io.pgvector.pgvector.check_extension_version"
) as mock_check_version:
mock_check_version.return_value = "0.5.1"
with patch("llama_stack.providers.utils.kvstore.kvstore_impl") as mock_kvstore_impl:
mock_kvstore = AsyncMock()
mock_kvstore_impl.return_value = mock_kvstore
with patch.object(adapter, "initialize_openai_vector_stores", new_callable=AsyncMock):
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.upsert_models"):
await adapter.initialize()
adapter.conn = mock_conn
async def mock_insert_chunks(vector_db_id, chunks, ttl_seconds=None):
index = await adapter._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
await index.insert_chunks(chunks)
adapter.insert_chunks = mock_insert_chunks
async def mock_query_chunks(vector_db_id, query, params=None):
index = await adapter._get_and_cache_vector_db_index(vector_db_id)
if not index:
raise ValueError(f"Vector DB {vector_db_id} not found")
return await index.query_chunks(query, params)
adapter.query_chunks = mock_query_chunks
test_vector_db = VectorDB(
identifier=f"pgvector_test_collection_{random.randint(1, 1_000_000)}",
provider_id="test_provider",
embedding_model="test_model",
embedding_dimension=embedding_dimension,
)
await adapter.register_vector_db(test_vector_db)
adapter.test_collection_id = test_vector_db.identifier
yield adapter
await adapter.shutdown()
@pytest.fixture
def vector_io_adapter(vector_provider, request):
"""Returns the appropriate vector IO adapter based on the provider parameter."""
vector_provider_dict = {
"milvus": "milvus_vec_adapter",
"faiss": "faiss_vec_adapter",
"sqlite_vec": "sqlite_vec_adapter",
"chroma": "chroma_vec_adapter",
"qdrant": "qdrant_vec_adapter",
"pgvector": "pgvector_vec_adapter",
}
return request.getfixturevalue(vector_provider_dict[vector_provider])

View file

@ -0,0 +1,138 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import asyncio
from unittest.mock import patch
import pytest
from llama_stack.apis.vector_dbs import VectorDB
from llama_stack.providers.remote.vector_io.pgvector.pgvector import PGVectorIndex
PGVECTOR_PROVIDER = "pgvector"
@pytest.fixture(scope="session")
def loop():
return asyncio.new_event_loop()
@pytest.fixture
def embedding_dimension():
"""Default embedding dimension for tests."""
return 384
@pytest.fixture
async def pgvector_index(embedding_dimension, mock_psycopg2_connection):
"""Create a PGVectorIndex instance with mocked database connection."""
connection, cursor = mock_psycopg2_connection
vector_db = VectorDB(
identifier="test-vector-db",
embedding_model="test-model",
embedding_dimension=embedding_dimension,
provider_id=PGVECTOR_PROVIDER,
provider_resource_id=f"{PGVECTOR_PROVIDER}:test-vector-db",
)
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2"):
# Use explicit COSINE distance metric for consistent testing
index = PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="COSINE")
return index, cursor
class TestPGVectorIndex:
def test_distance_metric_validation(self, embedding_dimension, mock_psycopg2_connection):
connection, cursor = mock_psycopg2_connection
vector_db = VectorDB(
identifier="test-vector-db",
embedding_model="test-model",
embedding_dimension=embedding_dimension,
provider_id=PGVECTOR_PROVIDER,
provider_resource_id=f"{PGVECTOR_PROVIDER}:test-vector-db",
)
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2"):
index = PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="L2")
assert index.distance_metric == "L2"
with pytest.raises(ValueError, match="Distance metric 'INVALID' is not supported"):
PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="INVALID")
def test_get_pgvector_search_function(self, pgvector_index):
index, cursor = pgvector_index
supported_metrics = index.PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION
for metric, function in supported_metrics.items():
index.distance_metric = metric
assert index.get_pgvector_search_function() == function
def test_check_distance_metric_availability(self, pgvector_index):
index, cursor = pgvector_index
supported_metrics = index.PGVECTOR_DISTANCE_METRIC_TO_SEARCH_FUNCTION
for metric in supported_metrics:
index.check_distance_metric_availability(metric)
with pytest.raises(ValueError, match="Distance metric 'INVALID' is not supported"):
index.check_distance_metric_availability("INVALID")
def test_constructor_invalid_distance_metric(self, embedding_dimension, mock_psycopg2_connection):
connection, cursor = mock_psycopg2_connection
vector_db = VectorDB(
identifier="test-vector-db",
embedding_model="test-model",
embedding_dimension=embedding_dimension,
provider_id=PGVECTOR_PROVIDER,
provider_resource_id=f"{PGVECTOR_PROVIDER}:test-vector-db",
)
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2"):
with pytest.raises(ValueError, match="Distance metric 'INVALID_METRIC' is not supported by PGVector"):
PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="INVALID_METRIC")
with pytest.raises(ValueError, match="Supported metrics are:"):
PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="UNKNOWN")
try:
index = PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric="COSINE")
assert index.distance_metric == "COSINE"
except ValueError:
pytest.fail("Valid distance metric 'COSINE' should not raise ValueError")
def test_constructor_all_supported_distance_metrics(self, embedding_dimension, mock_psycopg2_connection):
connection, cursor = mock_psycopg2_connection
vector_db = VectorDB(
identifier="test-vector-db",
embedding_model="test-model",
embedding_dimension=embedding_dimension,
provider_id=PGVECTOR_PROVIDER,
provider_resource_id=f"{PGVECTOR_PROVIDER}:test-vector-db",
)
supported_metrics = ["L2", "L1", "COSINE", "INNER_PRODUCT", "HAMMING", "JACCARD"]
with patch("llama_stack.providers.remote.vector_io.pgvector.pgvector.psycopg2"):
for metric in supported_metrics:
try:
index = PGVectorIndex(vector_db, embedding_dimension, connection, distance_metric=metric)
assert index.distance_metric == metric
expected_operators = {
"L2": "<->",
"L1": "<+>",
"COSINE": "<=>",
"INNER_PRODUCT": "<#>",
"HAMMING": "<~>",
"JACCARD": "<%>",
}
assert index.get_pgvector_search_function() == expected_operators[metric]
except Exception as e:
pytest.fail(f"Valid distance metric '{metric}' should not raise exception: {e}")

View file

@ -11,7 +11,8 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from llama_stack.apis.inference import EmbeddingsResponse, Inference
from llama_stack.apis.inference import Inference
from llama_stack.apis.inference.inference import OpenAIEmbeddingData, OpenAIEmbeddingsResponse, OpenAIEmbeddingUsage
from llama_stack.apis.vector_io import (
QueryChunksResponse,
VectorDB,
@ -53,7 +54,9 @@ def mock_vector_db(vector_db_id) -> MagicMock:
mock_vector_db.identifier = vector_db_id
mock_vector_db.embedding_dimension = 384
mock_vector_db.model_dump_json.return_value = (
'{"identifier": "' + vector_db_id + '", "embedding_model": "embedding_model", "embedding_dimension": 384}'
'{"identifier": "'
+ vector_db_id
+ '", "provider_id": "qdrant", "embedding_model": "embedding_model", "embedding_dimension": 384}'
)
return mock_vector_db
@ -68,7 +71,13 @@ def mock_vector_db_store(mock_vector_db) -> MagicMock:
@pytest.fixture
def mock_api_service(sample_embeddings):
mock_api_service = MagicMock(spec=Inference)
mock_api_service.embeddings = AsyncMock(return_value=EmbeddingsResponse(embeddings=sample_embeddings))
mock_api_service.openai_embeddings = AsyncMock(
return_value=OpenAIEmbeddingsResponse(
model="mock-embedding-model",
data=[OpenAIEmbeddingData(embedding=sample, index=i) for i, sample in enumerate(sample_embeddings)],
usage=OpenAIEmbeddingUsage(prompt_tokens=10, total_tokens=10),
)
)
return mock_api_service

View file

@ -26,9 +26,9 @@ def test_generate_chunk_id():
chunk_ids = sorted([chunk.chunk_id for chunk in chunks])
assert chunk_ids == [
"177a1368-f6a8-0c50-6e92-18677f2c3de3",
"bc744db3-1b25-0a9c-cdff-b6ba3df73c36",
"f68df25d-d9aa-ab4d-5684-64a233add20d",
"31d1f9a3-c8d2-66e7-3c37-af2acd329778",
"d07dade7-29c0-cda7-df29-0249a1dcbc3e",
"d14f75a1-5855-7f72-2c78-d9fc4275a346",
]
@ -36,14 +36,14 @@ def test_generate_chunk_id_with_window():
chunk = Chunk(content="test", metadata={"document_id": "doc-1"})
chunk_id1 = generate_chunk_id("doc-1", chunk, chunk_window="0-1")
chunk_id2 = generate_chunk_id("doc-1", chunk, chunk_window="1-2")
assert chunk_id1 == "149018fe-d0eb-0f8d-5f7f-726bdd2aeedb"
assert chunk_id2 == "4562c1ee-9971-1f3b-51a6-7d05e5211154"
assert chunk_id1 == "8630321a-d9cb-2bb6-cd28-ebf68dafd866"
assert chunk_id2 == "13a1c09a-cbda-b61a-2d1a-7baa90888685"
def test_chunk_id():
# Test with existing chunk ID
chunk_with_id = Chunk(content="test", metadata={"document_id": "existing-id"})
assert chunk_with_id.chunk_id == "84ededcc-b80b-a83e-1a20-ca6515a11350"
assert chunk_with_id.chunk_id == "11704f92-42b6-61df-bf85-6473e7708fbd"
# Test with document ID in metadata
chunk_with_doc_id = Chunk(content="test", metadata={"document_id": "doc-1"})

View file

@ -19,12 +19,16 @@ from llama_stack.providers.inline.tool_runtime.rag.memory import MemoryToolRunti
class TestRagQuery:
async def test_query_raises_on_empty_vector_db_ids(self):
rag_tool = MemoryToolRuntimeImpl(config=MagicMock(), vector_io_api=MagicMock(), inference_api=MagicMock())
rag_tool = MemoryToolRuntimeImpl(
config=MagicMock(), vector_io_api=MagicMock(), inference_api=MagicMock(), files_api=MagicMock()
)
with pytest.raises(ValueError):
await rag_tool.query(content=MagicMock(), vector_db_ids=[])
async def test_query_chunk_metadata_handling(self):
rag_tool = MemoryToolRuntimeImpl(config=MagicMock(), vector_io_api=MagicMock(), inference_api=MagicMock())
rag_tool = MemoryToolRuntimeImpl(
config=MagicMock(), vector_io_api=MagicMock(), inference_api=MagicMock(), files_api=MagicMock()
)
content = "test query content"
vector_db_ids = ["db1"]
@ -77,3 +81,58 @@ class TestRagQuery:
# Test that invalid mode raises an error
with pytest.raises(ValueError):
RAGQueryConfig(mode="wrong_mode")
async def test_query_adds_vector_db_id_to_chunk_metadata(self):
rag_tool = MemoryToolRuntimeImpl(
config=MagicMock(),
vector_io_api=MagicMock(),
inference_api=MagicMock(),
files_api=MagicMock(),
)
vector_db_ids = ["db1", "db2"]
# Fake chunks from each DB
chunk_metadata1 = ChunkMetadata(
document_id="doc1",
chunk_id="chunk1",
source="test_source1",
metadata_token_count=5,
)
chunk1 = Chunk(
content="chunk from db1",
metadata={"vector_db_id": "db1", "document_id": "doc1"},
stored_chunk_id="c1",
chunk_metadata=chunk_metadata1,
)
chunk_metadata2 = ChunkMetadata(
document_id="doc2",
chunk_id="chunk2",
source="test_source2",
metadata_token_count=5,
)
chunk2 = Chunk(
content="chunk from db2",
metadata={"vector_db_id": "db2", "document_id": "doc2"},
stored_chunk_id="c2",
chunk_metadata=chunk_metadata2,
)
rag_tool.vector_io_api.query_chunks = AsyncMock(
side_effect=[
QueryChunksResponse(chunks=[chunk1], scores=[0.9]),
QueryChunksResponse(chunks=[chunk2], scores=[0.8]),
]
)
result = await rag_tool.query(content="test", vector_db_ids=vector_db_ids)
returned_chunks = result.metadata["chunks"]
returned_scores = result.metadata["scores"]
returned_doc_ids = result.metadata["document_ids"]
returned_vector_db_ids = result.metadata["vector_db_ids"]
assert returned_chunks == ["chunk from db1", "chunk from db2"]
assert returned_scores == (0.9, 0.8)
assert returned_doc_ids == ["doc1", "doc2"]
assert returned_vector_db_ids == ["db1", "db2"]

View file

@ -13,6 +13,7 @@ from unittest.mock import AsyncMock, MagicMock
import numpy as np
import pytest
from llama_stack.apis.inference.inference import OpenAIEmbeddingData
from llama_stack.apis.tools import RAGDocument
from llama_stack.apis.vector_io import Chunk
from llama_stack.providers.utils.memory.vector_store import (
@ -218,11 +219,16 @@ class TestVectorDBWithIndex:
Chunk(content="Test 2", embedding=None, metadata={}),
]
mock_inference_api.embeddings.return_value.embeddings = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]
mock_inference_api.openai_embeddings.return_value.data = [
OpenAIEmbeddingData(embedding=[0.1, 0.2, 0.3], index=0),
OpenAIEmbeddingData(embedding=[0.4, 0.5, 0.6], index=1),
]
await vector_db_with_index.insert_chunks(chunks)
mock_inference_api.embeddings.assert_called_once_with("test-model without embeddings", ["Test 1", "Test 2"])
mock_inference_api.openai_embeddings.assert_called_once_with(
"test-model without embeddings", ["Test 1", "Test 2"]
)
mock_index.add_chunks.assert_called_once()
args = mock_index.add_chunks.call_args[0]
assert args[0] == chunks
@ -246,7 +252,7 @@ class TestVectorDBWithIndex:
await vector_db_with_index.insert_chunks(chunks)
mock_inference_api.embeddings.assert_not_called()
mock_inference_api.openai_embeddings.assert_not_called()
mock_index.add_chunks.assert_called_once()
args = mock_index.add_chunks.call_args[0]
assert args[0] == chunks
@ -288,7 +294,7 @@ class TestVectorDBWithIndex:
with pytest.raises(ValueError, match="has dimension 4, expected 3"):
await vector_db_with_index.insert_chunks(chunks_wrong_dim)
mock_inference_api.embeddings.assert_not_called()
mock_inference_api.openai_embeddings.assert_not_called()
mock_index.add_chunks.assert_not_called()
async def test_insert_chunks_with_partially_precomputed_embeddings(self):
@ -308,11 +314,14 @@ class TestVectorDBWithIndex:
Chunk(content="Test 3", embedding=None, metadata={}),
]
mock_inference_api.embeddings.return_value.embeddings = [[0.1, 0.1, 0.1], [0.3, 0.3, 0.3]]
mock_inference_api.openai_embeddings.return_value.data = [
OpenAIEmbeddingData(embedding=[0.1, 0.1, 0.1], index=0),
OpenAIEmbeddingData(embedding=[0.3, 0.3, 0.3], index=1),
]
await vector_db_with_index.insert_chunks(chunks)
mock_inference_api.embeddings.assert_called_once_with(
mock_inference_api.openai_embeddings.assert_called_once_with(
"test-model with partial embeddings", ["Test 1", "Test 3"]
)
mock_index.add_chunks.assert_called_once()

View file

@ -129,7 +129,7 @@ async def test_duplicate_provider_registration(cached_disk_dist_registry):
result = await cached_disk_dist_registry.get("vector_db", "test_vector_db_2")
assert result is not None
assert result.embedding_model == original_vector_db.embedding_model # Original values preserved
assert result.embedding_model == duplicate_vector_db.embedding_model # Original values preserved
async def test_get_all_objects(cached_disk_dist_registry):
@ -174,10 +174,14 @@ async def test_parse_registry_values_error_handling(sqlite_kvstore):
)
await sqlite_kvstore.set(
KEY_FORMAT.format(type="vector_db", identifier="valid_vector_db"), valid_db.model_dump_json()
KEY_FORMAT.format(type="vector_db", identifier="valid_vector_db"),
valid_db.model_dump_json(),
)
await sqlite_kvstore.set(KEY_FORMAT.format(type="vector_db", identifier="corrupted_json"), "{not valid json")
await sqlite_kvstore.set(
KEY_FORMAT.format(type="vector_db", identifier="corrupted_json"),
"{not valid json",
)
await sqlite_kvstore.set(
KEY_FORMAT.format(type="vector_db", identifier="missing_fields"),
@ -212,7 +216,8 @@ async def test_cached_registry_error_handling(sqlite_kvstore):
)
await sqlite_kvstore.set(
KEY_FORMAT.format(type="vector_db", identifier="valid_cached_db"), valid_db.model_dump_json()
KEY_FORMAT.format(type="vector_db", identifier="valid_cached_db"),
valid_db.model_dump_json(),
)
await sqlite_kvstore.set(

View file

@ -774,3 +774,136 @@ def test_has_required_scope_function():
# Test no user (auth disabled)
assert _has_required_scope("test.read", None)
@pytest.fixture
def mock_kubernetes_api_server():
return "https://api.cluster.example.com:6443"
@pytest.fixture
def kubernetes_auth_app(mock_kubernetes_api_server):
app = FastAPI()
auth_config = AuthenticationConfig(
provider_config={
"type": "kubernetes",
"api_server_url": mock_kubernetes_api_server,
"verify_tls": False,
"claims_mapping": {
"username": "roles",
"groups": "roles",
"uid": "uid_attr",
},
},
)
app.add_middleware(AuthenticationMiddleware, auth_config=auth_config, impls={})
@app.get("/test")
def test_endpoint():
return {"message": "Authentication successful"}
return app
@pytest.fixture
def kubernetes_auth_client(kubernetes_auth_app):
return TestClient(kubernetes_auth_app)
def test_missing_auth_header_kubernetes_auth(kubernetes_auth_client):
response = kubernetes_auth_client.get("/test")
assert response.status_code == 401
assert "Authentication required" in response.json()["error"]["message"]
def test_invalid_auth_header_format_kubernetes_auth(kubernetes_auth_client):
response = kubernetes_auth_client.get("/test", headers={"Authorization": "InvalidFormat token123"})
assert response.status_code == 401
assert "Invalid Authorization header format" in response.json()["error"]["message"]
async def mock_kubernetes_selfsubjectreview_success(*args, **kwargs):
return MockResponse(
201,
{
"apiVersion": "authentication.k8s.io/v1",
"kind": "SelfSubjectReview",
"metadata": {"creationTimestamp": "2025-07-15T13:53:56Z"},
"status": {
"userInfo": {
"username": "alice",
"uid": "alice-uid-123",
"groups": ["system:authenticated", "developers", "admins"],
"extra": {"scopes.authorization.openshift.io": ["user:full"]},
}
},
},
)
async def mock_kubernetes_selfsubjectreview_failure(*args, **kwargs):
return MockResponse(401, {"message": "Unauthorized"})
async def mock_kubernetes_selfsubjectreview_http_error(*args, **kwargs):
return MockResponse(500, {"message": "Internal Server Error"})
@patch("httpx.AsyncClient.post", new=mock_kubernetes_selfsubjectreview_success)
def test_valid_kubernetes_auth_authentication(kubernetes_auth_client, valid_token):
response = kubernetes_auth_client.get("/test", headers={"Authorization": f"Bearer {valid_token}"})
assert response.status_code == 200
assert response.json() == {"message": "Authentication successful"}
@patch("httpx.AsyncClient.post", new=mock_kubernetes_selfsubjectreview_failure)
def test_invalid_kubernetes_auth_authentication(kubernetes_auth_client, invalid_token):
response = kubernetes_auth_client.get("/test", headers={"Authorization": f"Bearer {invalid_token}"})
assert response.status_code == 401
assert "Invalid token" in response.json()["error"]["message"]
@patch("httpx.AsyncClient.post", new=mock_kubernetes_selfsubjectreview_http_error)
def test_kubernetes_auth_http_error(kubernetes_auth_client, valid_token):
response = kubernetes_auth_client.get("/test", headers={"Authorization": f"Bearer {valid_token}"})
assert response.status_code == 401
assert "Token validation failed" in response.json()["error"]["message"]
def test_kubernetes_auth_request_payload(kubernetes_auth_client, valid_token, mock_kubernetes_api_server):
with patch("httpx.AsyncClient.post") as mock_post:
mock_response = MockResponse(
200,
{
"apiVersion": "authentication.k8s.io/v1",
"kind": "SelfSubjectReview",
"metadata": {"creationTimestamp": "2025-07-15T13:53:56Z"},
"status": {
"userInfo": {
"username": "test-user",
"uid": "test-uid",
"groups": ["test-group"],
}
},
},
)
mock_post.return_value = mock_response
kubernetes_auth_client.get("/test", headers={"Authorization": f"Bearer {valid_token}"})
# Verify the request was made with correct parameters
mock_post.assert_called_once()
call_args = mock_post.call_args
# Check URL (passed as positional argument)
assert call_args[0][0] == f"{mock_kubernetes_api_server}/apis/authentication.k8s.io/v1/selfsubjectreviews"
# Check headers (passed as keyword argument)
headers = call_args[1]["headers"]
assert headers["Authorization"] == f"Bearer {valid_token}"
assert headers["Content-Type"] == "application/json"
# Check request body (passed as keyword argument)
request_body = call_args[1]["json"]
assert request_body["apiVersion"] == "authentication.k8s.io/v1"
assert request_body["kind"] == "SelfSubjectReview"

View file

@ -88,3 +88,10 @@ def test_nested_structures(setup_env_vars):
}
expected = {"key1": "test_value", "key2": ["default", "conditional"], "key3": {"nested": None}}
assert replace_env_vars(data) == expected
def test_explicit_strings_preserved(setup_env_vars):
# Explicit strings that look like numbers/booleans should remain strings
data = {"port": "8080", "enabled": "true", "count": "123", "ratio": "3.14"}
expected = {"port": "8080", "enabled": "true", "count": "123", "ratio": "3.14"}
assert replace_env_vars(data) == expected

View file

@ -113,6 +113,15 @@ class TestTranslateException:
assert result.status_code == 504
assert result.detail == "Operation timed out: "
def test_translate_connection_error(self):
"""Test that ConnectionError is translated to 502 HTTP status."""
exc = ConnectionError("Failed to connect to MCP server at http://localhost:9999/sse: Connection refused")
result = translate_exception(exc)
assert isinstance(result, HTTPException)
assert result.status_code == 502
assert result.detail == "Failed to connect to MCP server at http://localhost:9999/sse: Connection refused"
def test_translate_not_implemented_error(self):
"""Test that NotImplementedError is translated to 501 HTTP status."""
exc = NotImplementedError("Not implemented")

View file

@ -65,6 +65,9 @@ async def test_inference_store_pagination_basic():
input_messages = [OpenAIUserMessageParam(role="user", content=f"Test message for {completion_id}")]
await store.store_chat_completion(completion, input_messages)
# Wait for all queued writes to complete
await store.flush()
# Test 1: First page with limit=2, descending order (default)
result = await store.list_chat_completions(limit=2, order=Order.desc)
assert len(result.data) == 2
@ -108,6 +111,9 @@ async def test_inference_store_pagination_ascending():
input_messages = [OpenAIUserMessageParam(role="user", content=f"Test message for {completion_id}")]
await store.store_chat_completion(completion, input_messages)
# Wait for all queued writes to complete
await store.flush()
# Test ascending order pagination
result = await store.list_chat_completions(limit=1, order=Order.asc)
assert len(result.data) == 1
@ -143,6 +149,9 @@ async def test_inference_store_pagination_with_model_filter():
input_messages = [OpenAIUserMessageParam(role="user", content=f"Test message for {completion_id}")]
await store.store_chat_completion(completion, input_messages)
# Wait for all queued writes to complete
await store.flush()
# Test pagination with model filter
result = await store.list_chat_completions(limit=1, model="model-a", order=Order.desc)
assert len(result.data) == 1
@ -190,6 +199,9 @@ async def test_inference_store_pagination_no_limit():
input_messages = [OpenAIUserMessageParam(role="user", content=f"Test message for {completion_id}")]
await store.store_chat_completion(completion, input_messages)
# Wait for all queued writes to complete
await store.flush()
# Test without limit
result = await store.list_chat_completions(order=Order.desc)
assert len(result.data) == 2

View file

@ -332,6 +332,63 @@ async def test_sqlstore_pagination_error_handling():
)
async def test_where_operator_gt_and_update_delete():
with TemporaryDirectory() as tmp_dir:
db_path = tmp_dir + "/test.db"
store = SqlAlchemySqlStoreImpl(SqliteSqlStoreConfig(db_path=db_path))
await store.create_table(
"items",
{
"id": ColumnType.INTEGER,
"value": ColumnType.INTEGER,
"name": ColumnType.STRING,
},
)
await store.insert("items", {"id": 1, "value": 10, "name": "one"})
await store.insert("items", {"id": 2, "value": 20, "name": "two"})
await store.insert("items", {"id": 3, "value": 30, "name": "three"})
result = await store.fetch_all("items", where={"value": {">": 15}})
assert {r["id"] for r in result.data} == {2, 3}
row = await store.fetch_one("items", where={"value": {">=": 30}})
assert row["id"] == 3
await store.update("items", {"name": "small"}, {"value": {"<": 25}})
rows = (await store.fetch_all("items")).data
names = {r["id"]: r["name"] for r in rows}
assert names[1] == "small"
assert names[2] == "small"
assert names[3] == "three"
await store.delete("items", {"id": {"==": 2}})
rows_after = (await store.fetch_all("items")).data
assert {r["id"] for r in rows_after} == {1, 3}
async def test_where_operator_edge_cases():
with TemporaryDirectory() as tmp_dir:
db_path = tmp_dir + "/test.db"
store = SqlAlchemySqlStoreImpl(SqliteSqlStoreConfig(db_path=db_path))
await store.create_table(
"events",
{"id": ColumnType.STRING, "ts": ColumnType.INTEGER},
)
base = 1024
await store.insert("events", {"id": "a", "ts": base - 10})
await store.insert("events", {"id": "b", "ts": base + 10})
row = await store.fetch_one("events", where={"id": "a"})
assert row["id"] == "a"
with pytest.raises(ValueError, match="Unsupported operator"):
await store.fetch_all("events", where={"ts": {"!=": base}})
async def test_sqlstore_pagination_custom_key_column():
"""Test pagination with custom primary key column (not 'id')."""
with TemporaryDirectory() as tmp_dir: