Repository: https://github.com/meta-llama/llama-stack.git
Commit: 67f1131040 ("Merge branch 'main' into vectordb_name")
36 changed files with 803 additions and 696 deletions
@@ -93,7 +93,7 @@ def run_stack_build_command(args: argparse.Namespace) -> None:
             )
             sys.exit(1)
     elif args.providers:
-        providers = dict()
+        providers_list: dict[str, str | list[str]] = dict()
         for api_provider in args.providers.split(","):
             if "=" not in api_provider:
                 cprint(
@@ -112,7 +112,15 @@ def run_stack_build_command(args: argparse.Namespace) -> None:
                 )
                 sys.exit(1)
             if provider in providers_for_api:
-                providers.setdefault(api, []).append(provider)
+                if api not in providers_list:
+                    providers_list[api] = []
+                # Use type guarding to ensure we have a list
+                provider_value = providers_list[api]
+                if isinstance(provider_value, list):
+                    provider_value.append(provider)
+                else:
+                    # Convert string to list and append
+                    providers_list[api] = [provider_value, provider]
             else:
                 cprint(
                     f"{provider} is not a valid provider for the {api} API.",
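Note on the isinstance guard introduced above: providers_list is declared as dict[str, str | list[str]], so a stored value must be narrowed to a list before .append() is type-safe. A minimal, self-contained sketch of the same pattern (the provider names here are only illustrative):

providers_list: dict[str, str | list[str]] = {"inference": "remote::ollama"}

def add_provider(api: str, provider: str) -> None:
    value = providers_list.setdefault(api, [])
    if isinstance(value, list):   # narrows str | list[str] to list[str]
        value.append(provider)
    else:                         # promote an existing plain-string entry to a list
        providers_list[api] = [value, provider]

add_provider("inference", "inline::sentence-transformers")
assert providers_list["inference"] == ["remote::ollama", "inline::sentence-transformers"]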
@@ -121,7 +129,7 @@ def run_stack_build_command(args: argparse.Namespace) -> None:
                 )
                 sys.exit(1)
         distribution_spec = DistributionSpec(
-            providers=providers,
+            providers=providers_list,
             description=",".join(args.providers),
         )
         if not args.image_type:
@@ -182,7 +190,7 @@ def run_stack_build_command(args: argparse.Namespace) -> None:

         cprint("Tip: use <TAB> to see options for the providers.\n", color="green", file=sys.stderr)

-        providers = dict()
+        providers: dict[str, str | list[str]] = dict()
         for api, providers_for_api in get_provider_registry().items():
             available_providers = [x for x in providers_for_api.keys() if x not in ("remote", "remote::sample")]
             if not available_providers:
@@ -371,10 +379,16 @@ def _run_stack_build_command_from_build_config(
         if not image_name:
             raise ValueError("Please specify an image name when building a venv image")

+    # At this point, image_name should be guaranteed to be a string
+    if image_name is None:
+        raise ValueError("image_name should not be None after validation")
+
     if template_name:
         build_dir = DISTRIBS_BASE_DIR / template_name
         build_file_path = build_dir / f"{template_name}-build.yaml"
     else:
+        if image_name is None:
+            raise ValueError("image_name cannot be None")
         build_dir = DISTRIBS_BASE_DIR / image_name
         build_file_path = build_dir / f"{image_name}-build.yaml"

@@ -395,7 +409,7 @@ def _run_stack_build_command_from_build_config(
         build_file_path,
         image_name,
         template_or_config=template_name or config_path or str(build_file_path),
-        run_config=run_config_file,
+        run_config=run_config_file.as_posix() if run_config_file else None,
     )
     if return_code != 0:
         raise RuntimeError(f"Failed to build image {image_name}")
@@ -83,46 +83,57 @@ class StackRun(Subcommand):
             return ImageType.CONDA.value, args.image_name
         return args.image_type, args.image_name

+    def _resolve_config_and_template(self, args: argparse.Namespace) -> tuple[Path | None, str | None]:
+        """Resolve config file path and template name from args.config"""
+        from llama_stack.distribution.utils.config_dirs import DISTRIBS_BASE_DIR
+
+        if not args.config:
+            return None, None
+
+        config_file = Path(args.config)
+        has_yaml_suffix = args.config.endswith(".yaml")
+        template_name = None
+
+        if not config_file.exists() and not has_yaml_suffix:
+            # check if this is a template
+            config_file = Path(REPO_ROOT) / "llama_stack" / "templates" / args.config / "run.yaml"
+            if config_file.exists():
+                template_name = args.config
+
+        if not config_file.exists() and not has_yaml_suffix:
+            # check if it's a build config saved to ~/.llama dir
+            config_file = Path(DISTRIBS_BASE_DIR / f"llamastack-{args.config}" / f"{args.config}-run.yaml")
+
+        if not config_file.exists():
+            self.parser.error(
+                f"File {str(config_file)} does not exist.\n\nPlease run `llama stack build` to generate (and optionally edit) a run.yaml file"
+            )
+
+        if not config_file.is_file():
+            self.parser.error(
+                f"Config file must be a valid file path, '{config_file}' is not a file: type={type(config_file)}"
+            )
+
+        return config_file, template_name
+
     def _run_stack_run_cmd(self, args: argparse.Namespace) -> None:
         import yaml

         from llama_stack.distribution.configure import parse_and_maybe_upgrade_config
-        from llama_stack.distribution.utils.config_dirs import DISTRIBS_BASE_DIR
         from llama_stack.distribution.utils.exec import formulate_run_args, run_command

         if args.enable_ui:
             self._start_ui_development_server(args.port)
         image_type, image_name = self._get_image_type_and_name(args)

+        # Resolve config file and template name first
+        config_file, template_name = self._resolve_config_and_template(args)
+
         # Check if config is required based on image type
-        if (image_type in [ImageType.CONDA.value, ImageType.VENV.value]) and not args.config:
+        if (image_type in [ImageType.CONDA.value, ImageType.VENV.value]) and not config_file:
            self.parser.error("Config file is required for venv and conda environments")

-        if args.config:
-            config_file = Path(args.config)
-            has_yaml_suffix = args.config.endswith(".yaml")
-            template_name = None
-
-            if not config_file.exists() and not has_yaml_suffix:
-                # check if this is a template
-                config_file = Path(REPO_ROOT) / "llama_stack" / "templates" / args.config / "run.yaml"
-                if config_file.exists():
-                    template_name = args.config
-
-            if not config_file.exists() and not has_yaml_suffix:
-                # check if it's a build config saved to ~/.llama dir
-                config_file = Path(DISTRIBS_BASE_DIR / f"llamastack-{args.config}" / f"{args.config}-run.yaml")
-
-            if not config_file.exists():
-                self.parser.error(
-                    f"File {str(config_file)} does not exist.\n\nPlease run `llama stack build` to generate (and optionally edit) a run.yaml file"
-                )
-
-            if not config_file.is_file():
-                self.parser.error(
-                    f"Config file must be a valid file path, '{config_file}' is not a file: type={type(config_file)}"
-                )
-
+        if config_file:
             logger.info(f"Using run configuration: {config_file}")

             try:
@@ -138,8 +149,6 @@ class StackRun(Subcommand):
                 self.parser.error(f"failed to parse config file '{config_file}':\n {e}")
         else:
             config = None
-            config_file = None
-            template_name = None

         # If neither image type nor image name is provided, assume the server should be run directly
         # using the current environment packages.
@@ -172,10 +181,7 @@ class StackRun(Subcommand):
         run_args.extend([str(args.port)])

         if config_file:
-            if template_name:
-                run_args.extend(["--template", str(template_name)])
-            else:
-                run_args.extend(["--config", str(config_file)])
+            run_args.extend(["--config", str(config_file)])

         if args.env:
             for env_var in args.env:
@@ -81,7 +81,7 @@ def is_action_allowed(
     if not len(policy):
         policy = default_policy()

-    qualified_resource_id = resource.type + "::" + resource.identifier
+    qualified_resource_id = f"{resource.type}::{resource.identifier}"
     for rule in policy:
         if rule.forbid and matches_scope(rule.forbid, action, qualified_resource_id, user.principal):
             if rule.when:
@@ -96,7 +96,7 @@ FROM $container_base
WORKDIR /app

# We install the Python 3.12 dev headers and build tools so that any
-# C‑extension wheels (e.g. polyleven, faiss‑cpu) can compile successfully.
+# C-extension wheels (e.g. polyleven, faiss-cpu) can compile successfully.

RUN dnf -y update && dnf install -y iputils git net-tools wget \
    vim-minimal python3.12 python3.12-pip python3.12-wheel \
@@ -169,7 +169,7 @@ if [ -n "$run_config" ]; then
    echo "Copying external providers directory: $external_providers_dir"
    cp -r "$external_providers_dir" "$BUILD_CONTEXT_DIR/providers.d"
    add_to_container << EOF
-COPY --chmod=g+w providers.d /.llama/providers.d
+COPY providers.d /.llama/providers.d
EOF
  fi

@@ -445,7 +445,7 @@ def main(args: argparse.Namespace | None = None):
        logger.info(log_line)

    logger.info("Run configuration:")
-    safe_config = redact_sensitive_fields(config.model_dump())
+    safe_config = redact_sensitive_fields(config.model_dump(mode="json"))
    logger.info(yaml.dump(safe_config, indent=2))

    app = FastAPI(
@@ -6,12 +6,9 @@

 from collections.abc import AsyncGenerator
 from contextvars import ContextVar
-from typing import TypeVar
-
-T = TypeVar("T")


-def preserve_contexts_async_generator(
+def preserve_contexts_async_generator[T](
     gen: AsyncGenerator[T, None], context_vars: list[ContextVar]
 ) -> AsyncGenerator[T, None]:
     """
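The hunk above moves preserve_contexts_async_generator to PEP 695 type-parameter syntax (Python 3.12+), which makes the module-level TypeVar unnecessary. A minimal sketch of the equivalence on a toy function (not the real generator wrapper):

from typing import TypeVar

_T = TypeVar("_T")

def identity_old(x: _T) -> _T:    # pre-PEP 695: explicit TypeVar declaration
    return x

def identity_new[T](x: T) -> T:   # PEP 695: the type parameter is declared inline
    return x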
@@ -18,7 +18,7 @@ from llama_stack.schema_utils import json_schema_type
 @json_schema_type
 class MilvusVectorIOConfig(BaseModel):
     db_path: str
-    kvstore: KVStoreConfig
+    kvstore: KVStoreConfig = Field(description="Config for KV store backend (SQLite only for now)")
     consistency_level: str = Field(description="The consistency level of the Milvus server", default="Strong")

     @classmethod
@@ -6,14 +6,24 @@

 from typing import Any

-from pydantic import BaseModel
+from pydantic import BaseModel, Field

+from llama_stack.providers.utils.kvstore.config import (
+    KVStoreConfig,
+    SqliteKVStoreConfig,
+)
+

 class SQLiteVectorIOConfig(BaseModel):
-    db_path: str
+    db_path: str = Field(description="Path to the SQLite database file")
+    kvstore: KVStoreConfig = Field(description="Config for KV store backend (SQLite only for now)")

     @classmethod
     def sample_run_config(cls, __distro_dir__: str) -> dict[str, Any]:
         return {
             "db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + "sqlite_vec.db",
+            "kvstore": SqliteKVStoreConfig.sample_run_config(
+                __distro_dir__=__distro_dir__,
+                db_name="sqlite_vec_registry.db",
+            ),
         }
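As a hedged illustration of how the sample config above is consumed (the concrete paths are made up, and the ${env...} substitution is assumed to have happened upstream), the dict produced by sample_run_config hydrates straight into the model:

sample = {
    "db_path": "/home/user/.llama/distributions/starter/sqlite_vec.db",
    "kvstore": {
        "type": "sqlite",
        "db_path": "/home/user/.llama/distributions/starter/sqlite_vec_registry.db",
    },
}
config = SQLiteVectorIOConfig(**sample)  # kvstore validates into a KVStoreConfig instance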
@@ -24,6 +24,8 @@ from llama_stack.apis.vector_io import (
     VectorIO,
 )
 from llama_stack.providers.datatypes import VectorDBsProtocolPrivate
+from llama_stack.providers.utils.kvstore import kvstore_impl
+from llama_stack.providers.utils.kvstore.api import KVStore
 from llama_stack.providers.utils.memory.openai_vector_store_mixin import OpenAIVectorStoreMixin
 from llama_stack.providers.utils.memory.vector_store import (
     RERANKER_TYPE_RRF,
@@ -40,6 +42,13 @@ KEYWORD_SEARCH = "keyword"
 HYBRID_SEARCH = "hybrid"
 SEARCH_MODES = {VECTOR_SEARCH, KEYWORD_SEARCH, HYBRID_SEARCH}

+VERSION = "v3"
+VECTOR_DBS_PREFIX = f"vector_dbs:sqlite_vec:{VERSION}::"
+VECTOR_INDEX_PREFIX = f"vector_index:sqlite_vec:{VERSION}::"
+OPENAI_VECTOR_STORES_PREFIX = f"openai_vector_stores:sqlite_vec:{VERSION}::"
+OPENAI_VECTOR_STORES_FILES_PREFIX = f"openai_vector_stores_files:sqlite_vec:{VERSION}::"
+OPENAI_VECTOR_STORES_FILES_CONTENTS_PREFIX = f"openai_vector_stores_files_contents:sqlite_vec:{VERSION}::"
+

 def serialize_vector(vector: list[float]) -> bytes:
     """Serialize a list of floats into a compact binary representation."""
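The VERSION-prefixed keys added above are scanned with an exclusive upper bound built by appending "\xff", so every key that starts with the prefix falls inside the range. A short sketch of that lookup, assuming only the KVStore methods already used elsewhere in this diff:

async def load_registered_vector_dbs(kvstore) -> list[str]:
    start_key = VECTOR_DBS_PREFIX            # "vector_dbs:sqlite_vec:v3::"
    end_key = f"{VECTOR_DBS_PREFIX}\xff"     # "\xff" sorts after any realistic key suffix
    return await kvstore.values_in_range(start_key, end_key)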
@@ -117,13 +126,14 @@ class SQLiteVecIndex(EmbeddingIndex):
     - An FTS5 table (fts_chunks_{bank_id}) for full-text keyword search.
     """

-    def __init__(self, dimension: int, db_path: str, bank_id: str):
+    def __init__(self, dimension: int, db_path: str, bank_id: str, kvstore: KVStore | None = None):
         self.dimension = dimension
         self.db_path = db_path
         self.bank_id = bank_id
         self.metadata_table = f"chunks_{bank_id}".replace("-", "_")
         self.vector_table = f"vec_chunks_{bank_id}".replace("-", "_")
         self.fts_table = f"fts_chunks_{bank_id}".replace("-", "_")
+        self.kvstore = kvstore

     @classmethod
     async def create(cls, dimension: int, db_path: str, bank_id: str):
@@ -425,27 +435,116 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
         self.files_api = files_api
         self.cache: dict[str, VectorDBWithIndex] = {}
         self.openai_vector_stores: dict[str, dict[str, Any]] = {}
+        self.kvstore: KVStore | None = None

     async def initialize(self) -> None:
-        def _setup_connection():
-            # Open a connection to the SQLite database (the file is specified in the config).
+        self.kvstore = await kvstore_impl(self.config.kvstore)
+
+        start_key = VECTOR_DBS_PREFIX
+        end_key = f"{VECTOR_DBS_PREFIX}\xff"
+        stored_vector_dbs = await self.kvstore.values_in_range(start_key, end_key)
+        for db_json in stored_vector_dbs:
+            vector_db = VectorDB.model_validate_json(db_json)
+            index = await SQLiteVecIndex.create(
+                vector_db.embedding_dimension,
+                self.config.db_path,
+                vector_db.identifier,
+            )
+            self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
+
+        # load any existing OpenAI vector stores
+        self.openai_vector_stores = await self._load_openai_vector_stores()
+
+    async def shutdown(self) -> None:
+        # nothing to do since we don't maintain a persistent connection
+        pass
+
+    async def list_vector_dbs(self) -> list[VectorDB]:
+        return [v.vector_db for v in self.cache.values()]
+
+    async def register_vector_db(self, vector_db: VectorDB) -> None:
+        index = await SQLiteVecIndex.create(
+            vector_db.embedding_dimension,
+            self.config.db_path,
+            vector_db.identifier,
+        )
+        self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
+
+    async def _get_and_cache_vector_db_index(self, vector_db_id: str) -> VectorDBWithIndex | None:
+        if vector_db_id in self.cache:
+            return self.cache[vector_db_id]
+
+        if self.vector_db_store is None:
+            raise ValueError(f"Vector DB {vector_db_id} not found")
+
+        vector_db = self.vector_db_store.get_vector_db(vector_db_id)
+        if not vector_db:
+            raise ValueError(f"Vector DB {vector_db_id} not found")
+
+        index = VectorDBWithIndex(
+            vector_db=vector_db,
+            index=SQLiteVecIndex(
+                dimension=vector_db.embedding_dimension,
+                db_path=self.config.db_path,
+                bank_id=vector_db.identifier,
+                kvstore=self.kvstore,
+            ),
+            inference_api=self.inference_api,
+        )
+        self.cache[vector_db_id] = index
+        return index
+
+    async def unregister_vector_db(self, vector_db_id: str) -> None:
+        if vector_db_id not in self.cache:
+            logger.warning(f"Vector DB {vector_db_id} not found")
+            return
+        await self.cache[vector_db_id].index.delete()
+        del self.cache[vector_db_id]
+
+    # OpenAI Vector Store Mixin abstract method implementations
+    async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
+        """Save vector store metadata to SQLite database."""
+        assert self.kvstore is not None
+        key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+        await self.kvstore.set(key=key, value=json.dumps(store_info))
+        self.openai_vector_stores[store_id] = store_info
+
+    async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
+        """Load all vector store metadata from SQLite database."""
+        assert self.kvstore is not None
+        start_key = OPENAI_VECTOR_STORES_PREFIX
+        end_key = f"{OPENAI_VECTOR_STORES_PREFIX}\xff"
+        stored_openai_stores = await self.kvstore.values_in_range(start_key, end_key)
+        stores = {}
+        for store_data in stored_openai_stores:
+            store_info = json.loads(store_data)
+            stores[store_info["id"]] = store_info
+        return stores
+
+    async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
+        """Update vector store metadata in SQLite database."""
+        assert self.kvstore is not None
+        key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+        await self.kvstore.set(key=key, value=json.dumps(store_info))
+        self.openai_vector_stores[store_id] = store_info
+
+    async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
+        """Delete vector store metadata from SQLite database."""
+        assert self.kvstore is not None
+        key = f"{OPENAI_VECTOR_STORES_PREFIX}{store_id}"
+        await self.kvstore.delete(key)
+        if store_id in self.openai_vector_stores:
+            del self.openai_vector_stores[store_id]
+
+    async def _save_openai_vector_store_file(
+        self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
+    ) -> None:
+        """Save vector store file metadata to SQLite database."""
+
+        def _create_or_store():
             connection = _create_sqlite_connection(self.config.db_path)
             cur = connection.cursor()
             try:
                 # Create a table to persist vector DB registrations.
                 cur.execute("""
                     CREATE TABLE IF NOT EXISTS vector_dbs (
                         id TEXT PRIMARY KEY,
                         metadata TEXT
                     );
                 """)
                 # Create a table to persist OpenAI vector stores.
                 cur.execute("""
                     CREATE TABLE IF NOT EXISTS openai_vector_stores (
                         id TEXT PRIMARY KEY,
                         metadata TEXT
                     );
                 """)
                 # Create a table to persist OpenAI vector store files.
                 cur.execute("""
                     CREATE TABLE IF NOT EXISTS openai_vector_store_files (
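A hedged sketch of the registry round-trip the new initialize() depends on; kvstore.set, values_in_range and VectorDB.model_validate_json are the calls used in the hunk, while the wrapper function itself is illustrative:

async def persist_and_reload(kvstore, vector_db):
    key = f"{VECTOR_DBS_PREFIX}{vector_db.identifier}"
    await kvstore.set(key=key, value=vector_db.model_dump_json())  # write the registration

    stored = await kvstore.values_in_range(VECTOR_DBS_PREFIX, f"{VECTOR_DBS_PREFIX}\xff")
    return [VectorDB.model_validate_json(item) for item in stored]  # rebuild the models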
@@ -464,168 +563,6 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
                     );
                 """)
                 connection.commit()
-                # Load any existing vector DB registrations.
-                cur.execute("SELECT metadata FROM vector_dbs")
-                vector_db_rows = cur.fetchall()
-                return vector_db_rows
-            finally:
-                cur.close()
-                connection.close()
-
-        vector_db_rows = await asyncio.to_thread(_setup_connection)
-
-        # Load existing vector DBs
-        for row in vector_db_rows:
-            vector_db_data = row[0]
-            vector_db = VectorDB.model_validate_json(vector_db_data)
-            index = await SQLiteVecIndex.create(
-                vector_db.embedding_dimension,
-                self.config.db_path,
-                vector_db.identifier,
-            )
-            self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
-
-        # Load existing OpenAI vector stores using the mixin method
-        self.openai_vector_stores = await self._load_openai_vector_stores()
-
-    async def shutdown(self) -> None:
-        # nothing to do since we don't maintain a persistent connection
-        pass
-
-    async def register_vector_db(self, vector_db: VectorDB) -> None:
-        def _register_db():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute(
-                    "INSERT OR REPLACE INTO vector_dbs (id, metadata) VALUES (?, ?)",
-                    (vector_db.identifier, vector_db.model_dump_json()),
-                )
-                connection.commit()
-            finally:
-                cur.close()
-                connection.close()
-
-        await asyncio.to_thread(_register_db)
-        index = await SQLiteVecIndex.create(
-            vector_db.embedding_dimension,
-            self.config.db_path,
-            vector_db.identifier,
-        )
-        self.cache[vector_db.identifier] = VectorDBWithIndex(vector_db, index, self.inference_api)
-
-    async def list_vector_dbs(self) -> list[VectorDB]:
-        return [v.vector_db for v in self.cache.values()]
-
-    async def unregister_vector_db(self, vector_db_id: str) -> None:
-        if vector_db_id not in self.cache:
-            logger.warning(f"Vector DB {vector_db_id} not found")
-            return
-        await self.cache[vector_db_id].index.delete()
-        del self.cache[vector_db_id]
-
-        def _delete_vector_db_from_registry():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute("DELETE FROM vector_dbs WHERE id = ?", (vector_db_id,))
-                connection.commit()
-            finally:
-                cur.close()
-                connection.close()
-
-        await asyncio.to_thread(_delete_vector_db_from_registry)
-
-    # OpenAI Vector Store Mixin abstract method implementations
-    async def _save_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
-        """Save vector store metadata to SQLite database."""
-
-        def _store():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute(
-                    "INSERT OR REPLACE INTO openai_vector_stores (id, metadata) VALUES (?, ?)",
-                    (store_id, json.dumps(store_info)),
-                )
-                connection.commit()
-            except Exception as e:
-                logger.error(f"Error saving openai vector store {store_id}: {e}")
-                raise
-            finally:
-                cur.close()
-                connection.close()
-
-        try:
-            await asyncio.to_thread(_store)
-        except Exception as e:
-            logger.error(f"Error saving openai vector store {store_id}: {e}")
-            raise
-
-    async def _load_openai_vector_stores(self) -> dict[str, dict[str, Any]]:
-        """Load all vector store metadata from SQLite database."""
-
-        def _load():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute("SELECT metadata FROM openai_vector_stores")
-                rows = cur.fetchall()
-                return rows
-            finally:
-                cur.close()
-                connection.close()
-
-        rows = await asyncio.to_thread(_load)
-        stores = {}
-        for row in rows:
-            store_data = row[0]
-            store_info = json.loads(store_data)
-            stores[store_info["id"]] = store_info
-        return stores
-
-    async def _update_openai_vector_store(self, store_id: str, store_info: dict[str, Any]) -> None:
-        """Update vector store metadata in SQLite database."""
-
-        def _update():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute(
-                    "UPDATE openai_vector_stores SET metadata = ? WHERE id = ?",
-                    (json.dumps(store_info), store_id),
-                )
-                connection.commit()
-            finally:
-                cur.close()
-                connection.close()
-
-        await asyncio.to_thread(_update)
-
-    async def _delete_openai_vector_store_from_storage(self, store_id: str) -> None:
-        """Delete vector store metadata from SQLite database."""
-
-        def _delete():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
-                cur.execute("DELETE FROM openai_vector_stores WHERE id = ?", (store_id,))
-                connection.commit()
-            finally:
-                cur.close()
-                connection.close()
-
-        await asyncio.to_thread(_delete)
-
-    async def _save_openai_vector_store_file(
-        self, store_id: str, file_id: str, file_info: dict[str, Any], file_contents: list[dict[str, Any]]
-    ) -> None:
-        """Save vector store file metadata to SQLite database."""
-
-        def _store():
-            connection = _create_sqlite_connection(self.config.db_path)
-            cur = connection.cursor()
-            try:
                 cur.execute(
                     "INSERT OR REPLACE INTO openai_vector_store_files (store_id, file_id, metadata) VALUES (?, ?, ?)",
                     (store_id, file_id, json.dumps(file_info)),
@@ -643,7 +580,7 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
                 connection.close()

         try:
-            await asyncio.to_thread(_store)
+            await asyncio.to_thread(_create_or_store)
         except Exception as e:
             logger.error(f"Error saving openai vector store file {store_id} {file_id}: {e}")
             raise
@@ -722,6 +659,10 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
                 cur.execute(
                     "DELETE FROM openai_vector_store_files WHERE store_id = ? AND file_id = ?", (store_id, file_id)
                 )
+                cur.execute(
+                    "DELETE FROM openai_vector_store_files_contents WHERE store_id = ? AND file_id = ?",
+                    (store_id, file_id),
+                )
                 connection.commit()
             finally:
                 cur.close()
@@ -730,15 +671,17 @@ class SQLiteVecVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtoc
         await asyncio.to_thread(_delete)

     async def insert_chunks(self, vector_db_id: str, chunks: list[Chunk], ttl_seconds: int | None = None) -> None:
-        if vector_db_id not in self.cache:
-            raise ValueError(f"Vector DB {vector_db_id} not found. Found: {list(self.cache.keys())}")
+        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        if not index:
+            raise ValueError(f"Vector DB {vector_db_id} not found")
         # The VectorDBWithIndex helper is expected to compute embeddings via the inference_api
         # and then call our index's add_chunks.
-        await self.cache[vector_db_id].insert_chunks(chunks)
+        await index.insert_chunks(chunks)

     async def query_chunks(
         self, vector_db_id: str, query: Any, params: dict[str, Any] | None = None
     ) -> QueryChunksResponse:
-        if vector_db_id not in self.cache:
+        index = await self._get_and_cache_vector_db_index(vector_db_id)
+        if not index:
             raise ValueError(f"Vector DB {vector_db_id} not found")
-        return await self.cache[vector_db_id].query_chunks(query, params)
+        return await index.query_chunks(query, params)
@@ -61,6 +61,11 @@ class MilvusIndex(EmbeddingIndex):
         self.consistency_level = consistency_level
+        self.kvstore = kvstore

+    async def initialize(self):
+        # MilvusIndex does not require explicit initialization
+        # TODO: could move collection creation into initialization but it is not really necessary
+        pass
+
     async def delete(self):
         if await asyncio.to_thread(self.client.has_collection, self.collection_name):
             await asyncio.to_thread(self.client.drop_collection, collection_name=self.collection_name)
@@ -199,6 +204,9 @@ class MilvusVectorIOAdapter(OpenAIVectorStoreMixin, VectorIO, VectorDBsProtocolP
         if vector_db_id in self.cache:
             return self.cache[vector_db_id]

+        if self.vector_db_store is None:
+            raise ValueError(f"Vector DB {vector_db_id} not found")
+
         vector_db = await self.vector_db_store.get_vector_db(vector_db_id)
         if not vector_db:
             raise ValueError(f"Vector DB {vector_db_id} not found")
@@ -39,22 +39,10 @@ SQL_OPTIMIZED_POLICY = [


 class SqlRecord(ProtectedResource):
     """Simple ProtectedResource implementation for SQL records."""

-    def __init__(self, record_id: str, table_name: str, access_attributes: dict[str, list[str]] | None = None):
+    def __init__(self, record_id: str, table_name: str, owner: User):
         self.type = f"sql_record::{table_name}"
         self.identifier = record_id
-
-        if access_attributes:
-            self.owner = User(
-                principal="system",
-                attributes=access_attributes,
-            )
-        else:
-            self.owner = User(
-                principal="system_public",
-                attributes=None,
-            )
+        self.owner = owner


 class AuthorizedSqlStore:
@@ -101,22 +89,27 @@ class AuthorizedSqlStore:

     async def create_table(self, table: str, schema: Mapping[str, ColumnType | ColumnDefinition]) -> None:
         """Create a table with built-in access control support."""
-        await self.sql_store.add_column_if_not_exists(table, "access_attributes", ColumnType.JSON)
-
         enhanced_schema = dict(schema)
         if "access_attributes" not in enhanced_schema:
             enhanced_schema["access_attributes"] = ColumnType.JSON
+        if "owner_principal" not in enhanced_schema:
+            enhanced_schema["owner_principal"] = ColumnType.STRING

         await self.sql_store.create_table(table, enhanced_schema)
+        await self.sql_store.add_column_if_not_exists(table, "access_attributes", ColumnType.JSON)
+        await self.sql_store.add_column_if_not_exists(table, "owner_principal", ColumnType.STRING)

     async def insert(self, table: str, data: Mapping[str, Any]) -> None:
         """Insert a row with automatic access control attribute capture."""
         enhanced_data = dict(data)

         current_user = get_authenticated_user()
-        if current_user and current_user.attributes:
+        if current_user:
+            enhanced_data["owner_principal"] = current_user.principal
             enhanced_data["access_attributes"] = current_user.attributes
         else:
+            enhanced_data["owner_principal"] = None
             enhanced_data["access_attributes"] = None

         await self.sql_store.insert(table, enhanced_data)
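A hedged usage sketch for the two methods above; the table name and columns are invented, and store is assumed to be an AuthorizedSqlStore wrapping a concrete backend:

async def demo(store):
    # access_attributes / owner_principal columns are added to the schema automatically
    await store.create_table("documents", {"id": ColumnType.STRING, "body": ColumnType.STRING})
    # whoever is authenticated at insert time is captured as the row owner
    await store.insert("documents", {"id": "doc-1", "body": "hello"})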
@@ -146,9 +139,12 @@ class AuthorizedSqlStore:

         for row in rows.data:
             stored_access_attrs = row.get("access_attributes")
+            stored_owner_principal = row.get("owner_principal") or ""

             record_id = row.get("id", "unknown")
-            sql_record = SqlRecord(str(record_id), table, stored_access_attrs)
+            sql_record = SqlRecord(
+                str(record_id), table, User(principal=stored_owner_principal, attributes=stored_access_attrs)
+            )

             if is_action_allowed(policy, Action.READ, sql_record, current_user):
                 filtered_rows.append(row)
@@ -186,8 +182,10 @@ class AuthorizedSqlStore:
         Only applies SQL filtering for the default policy to ensure correctness.
         For custom policies, uses conservative filtering to avoid blocking legitimate access.
         """
+        current_user = get_authenticated_user()
+
         if not policy or policy == SQL_OPTIMIZED_POLICY:
-            return self._build_default_policy_where_clause()
+            return self._build_default_policy_where_clause(current_user)
         else:
             return self._build_conservative_where_clause()

@@ -227,29 +225,27 @@ class AuthorizedSqlStore:

     def _get_public_access_conditions(self) -> list[str]:
         """Get the SQL conditions for public access."""
+        # Public records are records that have no owner_principal or access_attributes
+        conditions = ["owner_principal = ''"]
         if self.database_type == SqlStoreType.postgres:
             # Postgres stores JSON null as 'null'
-            return ["access_attributes::text = 'null'"]
+            conditions.append("access_attributes::text = 'null'")
         elif self.database_type == SqlStoreType.sqlite:
-            return ["access_attributes = 'null'"]
+            conditions.append("access_attributes = 'null'")
         else:
             raise ValueError(f"Unsupported database type: {self.database_type}")
+        return conditions

-    def _build_default_policy_where_clause(self) -> str:
+    def _build_default_policy_where_clause(self, current_user: User | None) -> str:
         """Build SQL WHERE clause for the default policy.

         Default policy: permit all actions when user in owners [roles, teams, projects, namespaces]
         This means user must match ALL attribute categories that exist in the resource.
         """
-        current_user = get_authenticated_user()
-
         base_conditions = self._get_public_access_conditions()
         if not current_user or not current_user.attributes:
             # Only allow public records
             return f"({' OR '.join(base_conditions)})"
-        else:
-            user_attr_conditions = []
+        user_attr_conditions = []
+
+        if current_user and current_user.attributes:
             for attr_key, user_values in current_user.attributes.items():
                 if user_values:
                     value_conditions = []
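For intuition, a small illustration of what the public-access portion of the default-policy clause evaluates to (the join expression is the one in the hunk; the per-attribute conditions that get OR'ed in for an authenticated user are built in the unchanged code that follows):

public_conditions = ["owner_principal = ''", "access_attributes = 'null'"]  # SQLite case
where_clause = f"({' OR '.join(public_conditions)})"
print(where_clause)  # (owner_principal = '' OR access_attributes = 'null')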
@@ -269,7 +265,7 @@ class AuthorizedSqlStore:
             all_requirements_met = f"({' AND '.join(user_attr_conditions)})"
             base_conditions.append(all_requirements_met)

-            return f"({' OR '.join(base_conditions)})"
+        return f"({' OR '.join(base_conditions)})"

     def _build_conservative_where_clause(self) -> str:
         """Conservative SQL filtering for custom policies.
@@ -244,35 +244,41 @@ class SqlAlchemySqlStoreImpl(SqlStore):
         engine = create_async_engine(self.config.engine_str)

         try:
-            inspector = inspect(engine)
-
-            table_names = inspector.get_table_names()
-            if table not in table_names:
-                return
-
-            existing_columns = inspector.get_columns(table)
-            column_names = [col["name"] for col in existing_columns]
-
-            if column_name in column_names:
-                return
-
-            sqlalchemy_type = TYPE_MAPPING.get(column_type)
-            if not sqlalchemy_type:
-                raise ValueError(f"Unsupported column type '{column_type}' for column '{column_name}'.")
-
-            # Create the ALTER TABLE statement
-            # Note: We need to get the dialect-specific type name
-            dialect = engine.dialect
-            type_impl = sqlalchemy_type()
-            compiled_type = type_impl.compile(dialect=dialect)
-
-            nullable_clause = "" if nullable else " NOT NULL"
-            add_column_sql = text(f"ALTER TABLE {table} ADD COLUMN {column_name} {compiled_type}{nullable_clause}")
-
             async with engine.begin() as conn:
+
+                def check_column_exists(sync_conn):
+                    inspector = inspect(sync_conn)
+
+                    table_names = inspector.get_table_names()
+                    if table not in table_names:
+                        return False, False  # table doesn't exist, column doesn't exist
+
+                    existing_columns = inspector.get_columns(table)
+                    column_names = [col["name"] for col in existing_columns]
+
+                    return True, column_name in column_names  # table exists, column exists or not
+
+                table_exists, column_exists = await conn.run_sync(check_column_exists)
+                if not table_exists or column_exists:
+                    return
+
+                sqlalchemy_type = TYPE_MAPPING.get(column_type)
+                if not sqlalchemy_type:
+                    raise ValueError(f"Unsupported column type '{column_type}' for column '{column_name}'.")
+
+                # Create the ALTER TABLE statement
+                # Note: We need to get the dialect-specific type name
+                dialect = engine.dialect
+                type_impl = sqlalchemy_type()
+                compiled_type = type_impl.compile(dialect=dialect)
+
+                nullable_clause = "" if nullable else " NOT NULL"
+                add_column_sql = text(f"ALTER TABLE {table} ADD COLUMN {column_name} {compiled_type}{nullable_clause}")
+
                 await conn.execute(add_column_sql)

-        except Exception:
+        except Exception as e:
             # If any error occurs during migration, log it but don't fail
             # The table creation will handle adding the column
+            logger.error(f"Error adding column {column_name} to table {table}: {e}")
             pass
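The rewrite above exists because SQLAlchemy's inspect() cannot operate on an AsyncEngine directly; inspection has to run on the underlying synchronous connection via run_sync(). A minimal standalone sketch of that pattern (the URL and table name are illustrative):

import asyncio

from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine


async def table_exists(url: str, table: str) -> bool:
    engine = create_async_engine(url)
    async with engine.begin() as conn:
        # inspect() needs a sync connection, so run it inside run_sync()
        return await conn.run_sync(lambda sync_conn: table in inspect(sync_conn).get_table_names())


print(asyncio.run(table_exists("sqlite+aiosqlite:///:memory:", "users")))  # False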
@@ -9,14 +9,12 @@ import inspect
 import json
 from collections.abc import AsyncGenerator, Callable
 from functools import wraps
-from typing import Any, TypeVar
+from typing import Any

 from pydantic import BaseModel

 from llama_stack.models.llama.datatypes import Primitive

-T = TypeVar("T")
-

 def serialize_value(value: Any) -> Primitive:
     return str(_prepare_for_json(value))
@@ -44,7 +42,7 @@ def _prepare_for_json(value: Any) -> str:
     return str(value)


-def trace_protocol(cls: type[T]) -> type[T]:
+def trace_protocol[T](cls: type[T]) -> type[T]:
     """
     A class decorator that automatically traces all methods in a protocol/base class
     and its inheriting classes.
@@ -39,6 +39,9 @@ providers:
     provider_type: inline::sqlite-vec
     config:
       db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/sqlite_vec.db
+      kvstore:
+        type: sqlite
+        db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/sqlite_vec_registry.db
   - provider_id: ${env.ENABLE_CHROMADB:+chromadb}
     provider_type: remote::chromadb
     config:
@@ -144,6 +144,9 @@ providers:
     provider_type: inline::sqlite-vec
     config:
       db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/sqlite_vec.db
+      kvstore:
+        type: sqlite
+        db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/sqlite_vec_registry.db
   - provider_id: ${env.ENABLE_MILVUS:=__disabled__}
     provider_type: inline::milvus
     config: