feat(stores)!: use backend storage references instead of configs

This commit is contained in:
Ashwin Bharambe 2025-10-16 16:24:31 -07:00
parent b3099d40e2
commit ea9664874d
47 changed files with 893 additions and 696 deletions

View file

@ -4,7 +4,6 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import os
import secrets
import time
from typing import Any
@ -21,16 +20,11 @@ from llama_stack.apis.conversations.conversations import (
Conversations,
Metadata,
)
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.core.datatypes import AccessRule, StackRunConfig
from llama_stack.log import get_logger
from llama_stack.providers.utils.sqlstore.api import ColumnDefinition, ColumnType
from llama_stack.providers.utils.sqlstore.authorized_sqlstore import AuthorizedSqlStore
from llama_stack.providers.utils.sqlstore.sqlstore import (
SqliteSqlStoreConfig,
SqlStoreConfig,
sqlstore_impl,
)
from llama_stack.providers.utils.sqlstore.sqlstore import sqlstore_impl
logger = get_logger(name=__name__, category="openai_conversations")
@ -38,13 +32,11 @@ logger = get_logger(name=__name__, category="openai_conversations")
class ConversationServiceConfig(BaseModel):
"""Configuration for the built-in conversation service.
:param conversations_store: SQL store configuration for conversations (defaults to SQLite)
:param run_config: Stack run configuration for resolving persistence
:param policy: Access control rules
"""
conversations_store: SqlStoreConfig = SqliteSqlStoreConfig(
db_path=(DISTRIBS_BASE_DIR / "conversations.db").as_posix()
)
run_config: StackRunConfig
policy: list[AccessRule] = []
@ -63,14 +55,16 @@ class ConversationServiceImpl(Conversations):
self.deps = deps
self.policy = config.policy
base_sql_store = sqlstore_impl(config.conversations_store)
# Use conversations store reference from storage config
conversations_ref = config.run_config.storage.conversations
if not conversations_ref:
raise ValueError("storage.conversations must be configured in run config")
base_sql_store = sqlstore_impl(conversations_ref)
self.sql_store = AuthorizedSqlStore(base_sql_store, self.policy)
async def initialize(self) -> None:
"""Initialize the store and create tables."""
if isinstance(self.config.conversations_store, SqliteSqlStoreConfig):
os.makedirs(os.path.dirname(self.config.conversations_store.db_path), exist_ok=True)
await self.sql_store.create_table(
"openai_conversations",
{

View file

@ -26,9 +26,8 @@ from llama_stack.apis.tools import ToolGroup, ToolGroupInput, ToolRuntime
from llama_stack.apis.vector_dbs import VectorDB, VectorDBInput
from llama_stack.apis.vector_io import VectorIO
from llama_stack.core.access_control.datatypes import AccessRule
from llama_stack.core.storage.datatypes import KVStoreReference, StorageConfig
from llama_stack.providers.datatypes import Api, ProviderSpec
from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
from llama_stack.providers.utils.sqlstore.sqlstore import SqlStoreConfig
LLAMA_STACK_BUILD_CONFIG_VERSION = 2
LLAMA_STACK_RUN_CONFIG_VERSION = 2
@ -356,7 +355,7 @@ class QuotaPeriod(StrEnum):
class QuotaConfig(BaseModel):
kvstore: SqliteKVStoreConfig = Field(description="Config for KV store backend (SQLite only for now)")
kvstore: KVStoreReference = Field(description="Config for KV store backend (SQLite only for now)")
anonymous_max_requests: int = Field(default=100, description="Max requests for unauthenticated clients per period")
authenticated_max_requests: int = Field(
default=1000, description="Max requests for authenticated clients per period"
@ -438,18 +437,6 @@ class ServerConfig(BaseModel):
)
class InferenceStoreConfig(BaseModel):
sql_store_config: SqlStoreConfig
max_write_queue_size: int = Field(default=10000, description="Max queued writes for inference store")
num_writers: int = Field(default=4, description="Number of concurrent background writers")
class ResponsesStoreConfig(BaseModel):
sql_store_config: SqlStoreConfig
max_write_queue_size: int = Field(default=10000, description="Max queued writes for responses store")
num_writers: int = Field(default=4, description="Number of concurrent background writers")
class StackRunConfig(BaseModel):
version: int = LLAMA_STACK_RUN_CONFIG_VERSION
@ -476,26 +463,11 @@ One or more providers to use for each API. The same provider_type (e.g., meta-re
can be instantiated multiple times (with different configs) if necessary.
""",
)
metadata_store: KVStoreConfig | None = Field(
default=None,
storage: StorageConfig = Field(
description="""
Configuration for the persistence store used by the distribution registry. If not specified,
a default SQLite store will be used.""",
)
inference_store: InferenceStoreConfig | SqlStoreConfig | None = Field(
default=None,
description="""
Configuration for the persistence store used by the inference API. Can be either a
InferenceStoreConfig (with queue tuning parameters) or a SqlStoreConfig (deprecated).
If not specified, a default SQLite store will be used.""",
)
conversations_store: SqlStoreConfig | None = Field(
default=None,
description="""
Configuration for the persistence store used by the conversations API.
If not specified, a default SQLite store will be used.""",
Storage backend configurations. Each backend is named, and can be referenced by various components
throughout the Stack (both by its core as well as providers).
""",
)
# registry of "resources" in the distribution

View file

@ -11,9 +11,8 @@ from pydantic import BaseModel
from llama_stack.apis.prompts import ListPromptsResponse, Prompt, Prompts
from llama_stack.core.datatypes import StackRunConfig
from llama_stack.core.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.core.storage.datatypes import KVStoreReference
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.kvstore.config import SqliteKVStoreConfig
class PromptServiceConfig(BaseModel):
@ -41,10 +40,13 @@ class PromptServiceImpl(Prompts):
self.kvstore: KVStore
async def initialize(self) -> None:
kvstore_config = SqliteKVStoreConfig(
db_path=(DISTRIBS_BASE_DIR / self.config.run_config.image_name / "prompts.db").as_posix()
# Use metadata store backend with prompts-specific namespace
metadata_ref = self.config.run_config.storage.metadata
prompts_ref = KVStoreReference(
namespace="prompts",
backend=metadata_ref.backend if metadata_ref else None,
)
self.kvstore = await kvstore_impl(kvstore_config)
self.kvstore = await kvstore_impl(prompts_ref)
def _get_default_key(self, prompt_id: str) -> str:
"""Get the KVStore key that stores the default version number."""

View file

@ -6,7 +6,10 @@
from typing import Any
from llama_stack.core.datatypes import AccessRule, RoutedProtocol
from llama_stack.core.datatypes import (
AccessRule,
RoutedProtocol,
)
from llama_stack.core.stack import StackRunConfig
from llama_stack.core.store import DistributionRegistry
from llama_stack.providers.datatypes import Api, RoutingTable
@ -76,9 +79,13 @@ async def get_auto_router_impl(
api_to_dep_impl[dep_name] = deps[dep_api]
# TODO: move pass configs to routers instead
if api == Api.inference and run_config.inference_store:
if api == Api.inference:
inference_ref = run_config.storage.inference
if not inference_ref:
raise ValueError("storage.inference must be configured in run config")
inference_store = InferenceStore(
config=run_config.inference_store,
reference=inference_ref,
policy=policy,
)
await inference_store.initialize()

View file

@ -42,6 +42,7 @@ from llama_stack.core.prompts.prompts import PromptServiceConfig, PromptServiceI
from llama_stack.core.providers import ProviderImpl, ProviderImplConfig
from llama_stack.core.resolver import ProviderRegistry, resolve_impls
from llama_stack.core.routing_tables.common import CommonRoutingTableImpl
from llama_stack.core.storage.datatypes import StorageBackendConfig
from llama_stack.core.store.registry import create_dist_registry
from llama_stack.core.utils.dynamic import instantiate_class_type
from llama_stack.log import get_logger
@ -329,6 +330,25 @@ def add_internal_implementations(impls: dict[Api, Any], run_config: StackRunConf
impls[Api.conversations] = conversations_impl
def _initialize_storage(run_config: StackRunConfig):
kv_backends: dict[str, StorageBackendConfig] = {}
sql_backends: dict[str, StorageBackendConfig] = {}
for backend_name, backend_config in run_config.storage.backends.items():
type = backend_config.type.value
if type.startswith("kv_"):
kv_backends[backend_name] = backend_config
elif type.startswith("sql_"):
sql_backends[backend_name] = backend_config
else:
raise ValueError(f"Unknown storage backend type: {type}")
from llama_stack.providers.utils.kvstore.kvstore import register_kvstore_backends
from llama_stack.providers.utils.sqlstore.sqlstore import register_sqlstore_backends
register_kvstore_backends(kv_backends)
register_sqlstore_backends(sql_backends)
class Stack:
def __init__(self, run_config: StackRunConfig, provider_registry: ProviderRegistry | None = None):
self.run_config = run_config
@ -347,7 +367,8 @@ class Stack:
TEST_RECORDING_CONTEXT.__enter__()
logger.info(f"API recording enabled: mode={os.environ.get('LLAMA_STACK_TEST_INFERENCE_MODE')}")
dist_registry, _ = await create_dist_registry(self.run_config.metadata_store, self.run_config.image_name)
_initialize_storage(self.run_config)
dist_registry, _ = await create_dist_registry(self.run_config.storage, self.run_config.image_name)
policy = self.run_config.server.auth.access_policy if self.run_config.server.auth else []
internal_impls = {}

View file

@ -0,0 +1,5 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.

View file

@ -0,0 +1,283 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import re
from abc import abstractmethod
from enum import StrEnum
from pathlib import Path
from typing import Annotated, Literal
from pydantic import BaseModel, Field, field_validator
class StorageBackendType(StrEnum):
KV_REDIS = "kv_redis"
KV_SQLITE = "kv_sqlite"
KV_POSTGRES = "kv_postgres"
KV_MONGODB = "kv_mongodb"
SQL_SQLITE = "sql_sqlite"
SQL_POSTGRES = "sql_postgres"
class CommonConfig(BaseModel):
namespace: str | None = Field(
default=None,
description="All keys will be prefixed with this namespace",
)
default: bool = Field(
default=False,
description="Mark this KV store as the default choice when a reference omits the backend name",
)
class RedisKVStoreConfig(CommonConfig):
type: Literal[StorageBackendType.KV_REDIS] = StorageBackendType.KV_REDIS
host: str = "localhost"
port: int = 6379
@property
def url(self) -> str:
return f"redis://{self.host}:{self.port}"
@classmethod
def pip_packages(cls) -> list[str]:
return ["redis"]
@classmethod
def sample_run_config(cls):
return {
"type": StorageBackendType.KV_REDIS.value,
"host": "${env.REDIS_HOST:=localhost}",
"port": "${env.REDIS_PORT:=6379}",
}
class SqliteKVStoreConfig(CommonConfig):
type: Literal[StorageBackendType.KV_SQLITE] = StorageBackendType.KV_SQLITE
db_path: str = Field(
description="File path for the sqlite database",
)
@classmethod
def pip_packages(cls) -> list[str]:
return ["aiosqlite"]
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "kvstore.db"):
return {
"type": StorageBackendType.KV_SQLITE.value,
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
}
class PostgresKVStoreConfig(CommonConfig):
type: Literal[StorageBackendType.KV_POSTGRES] = StorageBackendType.KV_POSTGRES
host: str = "localhost"
port: int = 5432
db: str = "llamastack"
user: str
password: str | None = None
ssl_mode: str | None = None
ca_cert_path: str | None = None
table_name: str = "llamastack_kvstore"
@classmethod
def sample_run_config(cls, table_name: str = "llamastack_kvstore", **kwargs):
return {
"type": StorageBackendType.KV_POSTGRES.value,
"host": "${env.POSTGRES_HOST:=localhost}",
"port": "${env.POSTGRES_PORT:=5432}",
"db": "${env.POSTGRES_DB:=llamastack}",
"user": "${env.POSTGRES_USER:=llamastack}",
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
"table_name": "${env.POSTGRES_TABLE_NAME:=" + table_name + "}",
}
@classmethod
@field_validator("table_name")
def validate_table_name(cls, v: str) -> str:
# PostgreSQL identifiers rules:
# - Must start with a letter or underscore
# - Can contain letters, numbers, and underscores
# - Maximum length is 63 bytes
pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$"
if not re.match(pattern, v):
raise ValueError(
"Invalid table name. Must start with letter or underscore and contain only letters, numbers, and underscores"
)
if len(v) > 63:
raise ValueError("Table name must be less than 63 characters")
return v
@classmethod
def pip_packages(cls) -> list[str]:
return ["psycopg2-binary"]
class MongoDBKVStoreConfig(CommonConfig):
type: Literal[StorageBackendType.KV_MONGODB] = StorageBackendType.KV_MONGODB
host: str = "localhost"
port: int = 27017
db: str = "llamastack"
user: str | None = None
password: str | None = None
collection_name: str = "llamastack_kvstore"
@classmethod
def pip_packages(cls) -> list[str]:
return ["pymongo"]
@classmethod
def sample_run_config(cls, collection_name: str = "llamastack_kvstore"):
return {
"type": StorageBackendType.KV_MONGODB.value,
"host": "${env.MONGODB_HOST:=localhost}",
"port": "${env.MONGODB_PORT:=5432}",
"db": "${env.MONGODB_DB}",
"user": "${env.MONGODB_USER}",
"password": "${env.MONGODB_PASSWORD}",
"collection_name": "${env.MONGODB_COLLECTION_NAME:=" + collection_name + "}",
}
class CommonSqlStoreConfig(BaseModel):
default: bool = Field(
default=False,
description="Mark this SQL store as the default choice when a reference omits the backend name",
)
class SqlAlchemySqlStoreConfig(BaseModel):
@property
@abstractmethod
def engine_str(self) -> str: ...
# TODO: move this when we have a better way to specify dependencies with internal APIs
@classmethod
def pip_packages(cls) -> list[str]:
return ["sqlalchemy[asyncio]"]
class SqliteSqlStoreConfig(SqlAlchemySqlStoreConfig, CommonSqlStoreConfig):
type: Literal[StorageBackendType.SQL_SQLITE] = StorageBackendType.SQL_SQLITE
db_path: str = Field(
description="Database path, e.g. ~/.llama/distributions/ollama/sqlstore.db",
)
@property
def engine_str(self) -> str:
return "sqlite+aiosqlite:///" + Path(self.db_path).expanduser().as_posix()
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "sqlstore.db"):
return {
"type": StorageBackendType.SQL_SQLITE.value,
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
}
@classmethod
def pip_packages(cls) -> list[str]:
return super().pip_packages() + ["aiosqlite"]
class PostgresSqlStoreConfig(SqlAlchemySqlStoreConfig):
type: Literal[StorageBackendType.SQL_POSTGRES] = StorageBackendType.SQL_POSTGRES
host: str = "localhost"
port: int = 5432
db: str = "llamastack"
user: str
password: str | None = None
@property
def engine_str(self) -> str:
return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}"
@classmethod
def pip_packages(cls) -> list[str]:
return super().pip_packages() + ["asyncpg"]
@classmethod
def sample_run_config(cls, **kwargs):
return {
"type": StorageBackendType.SQL_POSTGRES.value,
"host": "${env.POSTGRES_HOST:=localhost}",
"port": "${env.POSTGRES_PORT:=5432}",
"db": "${env.POSTGRES_DB:=llamastack}",
"user": "${env.POSTGRES_USER:=llamastack}",
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
}
# reference = (backend_name, table_name)
class SqlStoreReference(BaseModel):
"""A reference to a 'SQL-like' persistent store. A table name must be provided."""
table_name: str = Field(
description="Name of the table to use for the SqlStore",
)
backend: str | None = Field(
description="Name of backend from persistence.backends, a default will be used if not specified",
default=None,
)
# reference = (backend_name, namespace)
class KVStoreReference(BaseModel):
"""A reference to a 'key-value' persistent store. A namespace must be provided."""
namespace: str = Field(
description="Key prefix for KVStore backends",
)
backend: str | None = Field(
description="Name of backend from persistence.backends, a default will be used if not specified",
default=None,
)
StorageBackendConfig = Annotated[
RedisKVStoreConfig
| SqliteKVStoreConfig
| PostgresKVStoreConfig
| MongoDBKVStoreConfig
| SqliteSqlStoreConfig
| PostgresSqlStoreConfig,
Field(discriminator="type"),
]
class InferenceStoreReference(SqlStoreReference):
"""Inference store configuration with queue tuning."""
max_write_queue_size: int = Field(
default=10000,
description="Max queued writes for inference store",
)
num_writers: int = Field(
default=4,
description="Number of concurrent background writers",
)
class StorageConfig(BaseModel):
backends: dict[str, StorageBackendConfig] = Field(
description="Named backend configurations (e.g., 'default', 'cache')",
)
# these are stores used natively by the Stack
metadata: KVStoreReference | None = Field(
default=None,
description="Metadata store configuration (uses KVStore backend)",
)
inference: InferenceStoreReference | None = Field(
default=None,
description="Inference store configuration (uses SqlStore backend)",
)
conversations: SqlStoreReference | None = Field(
default=None,
description="Conversations store configuration (uses SqlStore backend)",
)

View file

@ -11,10 +11,9 @@ from typing import Protocol
import pydantic
from llama_stack.core.datatypes import RoutableObjectWithProvider
from llama_stack.core.utils.config_dirs import DISTRIBS_BASE_DIR
from llama_stack.core.storage.datatypes import KVStoreReference, StorageConfig
from llama_stack.log import get_logger
from llama_stack.providers.utils.kvstore import KVStore, kvstore_impl
from llama_stack.providers.utils.kvstore.config import KVStoreConfig, SqliteKVStoreConfig
logger = get_logger(__name__, category="core::registry")
@ -191,16 +190,17 @@ class CachedDiskDistributionRegistry(DiskDistributionRegistry):
async def create_dist_registry(
metadata_store: KVStoreConfig | None,
storage: StorageConfig,
image_name: str,
) -> tuple[CachedDiskDistributionRegistry, KVStore]:
# instantiate kvstore for storing and retrieving distribution metadata
if metadata_store:
dist_kvstore = await kvstore_impl(metadata_store)
else:
dist_kvstore = await kvstore_impl(
SqliteKVStoreConfig(db_path=(DISTRIBS_BASE_DIR / image_name / "kvstore.db").as_posix())
)
# Use metadata store backend with registry-specific namespace
metadata_ref = storage.metadata
registry_ref = KVStoreReference(
namespace="registry",
backend=metadata_ref.backend if metadata_ref else None,
)
dist_kvstore = await kvstore_impl(registry_ref)
dist_registry = CachedDiskDistributionRegistry(dist_kvstore)
await dist_registry.initialize()
return dist_registry, dist_kvstore