mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-22 16:23:08 +00:00
feat(stores)!: use backend storage references instead of configs (#3697)
**This PR changes configurations in a backward incompatible way.** Run configs today repeat full SQLite/Postgres snippets everywhere a store is needed, which means duplicated credentials, extra connection pools, and lots of drift between files. This PR introduces named storage backends so the stack and providers can share a single catalog and reference those backends by name. ## Key Changes - Add `storage.backends` to `StackRunConfig`, register each KV/SQL backend once at startup, and validate that references point to the right family. - Move server stores under `storage.stores` with lightweight references (backend + namespace/table) instead of full configs. - Update every provider/config/doc to use the new reference style; docs/codegen now surface the simplified YAML. ## Migration Before: ```yaml metadata_store: type: sqlite db_path: ~/.llama/distributions/foo/registry.db inference_store: type: postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} conversations_store: type: postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} ``` After: ```yaml storage: backends: kv_default: type: kv_sqlite db_path: ~/.llama/distributions/foo/kvstore.db sql_default: type: sql_postgres host: ${env.POSTGRES_HOST} port: ${env.POSTGRES_PORT} db: ${env.POSTGRES_DB} user: ${env.POSTGRES_USER} password: ${env.POSTGRES_PASSWORD} stores: metadata: backend: kv_default namespace: registry inference: backend: sql_default table_name: inference_store max_write_queue_size: 10000 num_writers: 4 conversations: backend: sql_default table_name: openai_conversations ``` Provider configs follow the same pattern—for example, a Chroma vector adapter switches from: ```yaml providers: vector_io: - provider_id: chromadb provider_type: remote::chromadb config: url: ${env.CHROMADB_URL} kvstore: type: sqlite db_path: ~/.llama/distributions/foo/chroma.db ``` to: ```yaml providers: vector_io: - provider_id: chromadb provider_type: remote::chromadb config: url: ${env.CHROMADB_URL} persistence: backend: kv_default namespace: vector_io::chroma_remote ``` Once the backends are declared, everything else just points at them, so rotating credentials or swapping to Postgres happens in one place and the stack reuses a single connection pool.
This commit is contained in:
parent
add64e8e2a
commit
2c43285e22
105 changed files with 2290 additions and 1292 deletions
|
@ -15,12 +15,13 @@ from llama_stack.apis.inference import (
|
|||
OpenAIMessageParam,
|
||||
Order,
|
||||
)
|
||||
from llama_stack.core.datatypes import AccessRule, InferenceStoreConfig
|
||||
from llama_stack.core.datatypes import AccessRule
|
||||
from llama_stack.core.storage.datatypes import InferenceStoreReference, StorageBackendType
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from ..sqlstore.api import ColumnDefinition, ColumnType
|
||||
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
|
||||
from ..sqlstore.sqlstore import SqlStoreConfig, SqlStoreType, sqlstore_impl
|
||||
from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
|
||||
|
||||
logger = get_logger(name=__name__, category="inference")
|
||||
|
||||
|
@ -28,33 +29,32 @@ logger = get_logger(name=__name__, category="inference")
|
|||
class InferenceStore:
|
||||
def __init__(
|
||||
self,
|
||||
config: InferenceStoreConfig | SqlStoreConfig,
|
||||
reference: InferenceStoreReference,
|
||||
policy: list[AccessRule],
|
||||
):
|
||||
# Handle backward compatibility
|
||||
if not isinstance(config, InferenceStoreConfig):
|
||||
# Legacy: SqlStoreConfig passed directly as config
|
||||
config = InferenceStoreConfig(
|
||||
sql_store_config=config,
|
||||
)
|
||||
|
||||
self.config = config
|
||||
self.sql_store_config = config.sql_store_config
|
||||
self.reference = reference
|
||||
self.sql_store = None
|
||||
self.policy = policy
|
||||
|
||||
# Disable write queue for SQLite to avoid concurrency issues
|
||||
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
|
||||
|
||||
# Async write queue and worker control
|
||||
self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None
|
||||
self._worker_tasks: list[asyncio.Task[Any]] = []
|
||||
self._max_write_queue_size: int = config.max_write_queue_size
|
||||
self._num_writers: int = max(1, config.num_writers)
|
||||
self._max_write_queue_size: int = reference.max_write_queue_size
|
||||
self._num_writers: int = max(1, reference.num_writers)
|
||||
|
||||
async def initialize(self):
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
|
||||
base_store = sqlstore_impl(self.reference)
|
||||
self.sql_store = AuthorizedSqlStore(base_store, self.policy)
|
||||
|
||||
# Disable write queue for SQLite to avoid concurrency issues
|
||||
backend_name = self.reference.backend
|
||||
backend_config = _SQLSTORE_BACKENDS.get(backend_name)
|
||||
if backend_config is None:
|
||||
raise ValueError(
|
||||
f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
|
||||
)
|
||||
self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE
|
||||
await self.sql_store.create_table(
|
||||
"chat_completions",
|
||||
{
|
||||
|
|
|
@ -4,143 +4,20 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import re
|
||||
from enum import Enum
|
||||
from typing import Annotated, Literal
|
||||
from typing import Annotated
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
|
||||
|
||||
class KVStoreType(Enum):
|
||||
redis = "redis"
|
||||
sqlite = "sqlite"
|
||||
postgres = "postgres"
|
||||
mongodb = "mongodb"
|
||||
|
||||
|
||||
class CommonConfig(BaseModel):
|
||||
namespace: str | None = Field(
|
||||
default=None,
|
||||
description="All keys will be prefixed with this namespace",
|
||||
)
|
||||
|
||||
|
||||
class RedisKVStoreConfig(CommonConfig):
|
||||
type: Literal["redis"] = KVStoreType.redis.value
|
||||
host: str = "localhost"
|
||||
port: int = 6379
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
return f"redis://{self.host}:{self.port}"
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return ["redis"]
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls):
|
||||
return {
|
||||
"type": "redis",
|
||||
"host": "${env.REDIS_HOST:=localhost}",
|
||||
"port": "${env.REDIS_PORT:=6379}",
|
||||
}
|
||||
|
||||
|
||||
class SqliteKVStoreConfig(CommonConfig):
|
||||
type: Literal["sqlite"] = KVStoreType.sqlite.value
|
||||
db_path: str = Field(
|
||||
default=(RUNTIME_BASE_DIR / "kvstore.db").as_posix(),
|
||||
description="File path for the sqlite database",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return ["aiosqlite"]
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, __distro_dir__: str, db_name: str = "kvstore.db"):
|
||||
return {
|
||||
"type": "sqlite",
|
||||
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
|
||||
}
|
||||
|
||||
|
||||
class PostgresKVStoreConfig(CommonConfig):
|
||||
type: Literal["postgres"] = KVStoreType.postgres.value
|
||||
host: str = "localhost"
|
||||
port: int = 5432
|
||||
db: str = "llamastack"
|
||||
user: str
|
||||
password: str | None = None
|
||||
ssl_mode: str | None = None
|
||||
ca_cert_path: str | None = None
|
||||
table_name: str = "llamastack_kvstore"
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, table_name: str = "llamastack_kvstore", **kwargs):
|
||||
return {
|
||||
"type": "postgres",
|
||||
"host": "${env.POSTGRES_HOST:=localhost}",
|
||||
"port": "${env.POSTGRES_PORT:=5432}",
|
||||
"db": "${env.POSTGRES_DB:=llamastack}",
|
||||
"user": "${env.POSTGRES_USER:=llamastack}",
|
||||
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
|
||||
"table_name": "${env.POSTGRES_TABLE_NAME:=" + table_name + "}",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
@field_validator("table_name")
|
||||
def validate_table_name(cls, v: str) -> str:
|
||||
# PostgreSQL identifiers rules:
|
||||
# - Must start with a letter or underscore
|
||||
# - Can contain letters, numbers, and underscores
|
||||
# - Maximum length is 63 bytes
|
||||
pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$"
|
||||
if not re.match(pattern, v):
|
||||
raise ValueError(
|
||||
"Invalid table name. Must start with letter or underscore and contain only letters, numbers, and underscores"
|
||||
)
|
||||
if len(v) > 63:
|
||||
raise ValueError("Table name must be less than 63 characters")
|
||||
return v
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return ["psycopg2-binary"]
|
||||
|
||||
|
||||
class MongoDBKVStoreConfig(CommonConfig):
|
||||
type: Literal["mongodb"] = KVStoreType.mongodb.value
|
||||
host: str = "localhost"
|
||||
port: int = 27017
|
||||
db: str = "llamastack"
|
||||
user: str | None = None
|
||||
password: str | None = None
|
||||
collection_name: str = "llamastack_kvstore"
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return ["pymongo"]
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, collection_name: str = "llamastack_kvstore"):
|
||||
return {
|
||||
"type": "mongodb",
|
||||
"host": "${env.MONGODB_HOST:=localhost}",
|
||||
"port": "${env.MONGODB_PORT:=5432}",
|
||||
"db": "${env.MONGODB_DB}",
|
||||
"user": "${env.MONGODB_USER}",
|
||||
"password": "${env.MONGODB_PASSWORD}",
|
||||
"collection_name": "${env.MONGODB_COLLECTION_NAME:=" + collection_name + "}",
|
||||
}
|
||||
from pydantic import Field
|
||||
|
||||
from llama_stack.core.storage.datatypes import (
|
||||
MongoDBKVStoreConfig,
|
||||
PostgresKVStoreConfig,
|
||||
RedisKVStoreConfig,
|
||||
SqliteKVStoreConfig,
|
||||
StorageBackendType,
|
||||
)
|
||||
|
||||
KVStoreConfig = Annotated[
|
||||
RedisKVStoreConfig | SqliteKVStoreConfig | PostgresKVStoreConfig | MongoDBKVStoreConfig,
|
||||
Field(discriminator="type", default=KVStoreType.sqlite.value),
|
||||
RedisKVStoreConfig | SqliteKVStoreConfig | PostgresKVStoreConfig | MongoDBKVStoreConfig, Field(discriminator="type")
|
||||
]
|
||||
|
||||
|
||||
|
@ -148,13 +25,13 @@ def get_pip_packages(store_config: dict | KVStoreConfig) -> list[str]:
|
|||
"""Get pip packages for KV store config, handling both dict and object cases."""
|
||||
if isinstance(store_config, dict):
|
||||
store_type = store_config.get("type")
|
||||
if store_type == "sqlite":
|
||||
if store_type == StorageBackendType.KV_SQLITE.value:
|
||||
return SqliteKVStoreConfig.pip_packages()
|
||||
elif store_type == "postgres":
|
||||
elif store_type == StorageBackendType.KV_POSTGRES.value:
|
||||
return PostgresKVStoreConfig.pip_packages()
|
||||
elif store_type == "redis":
|
||||
elif store_type == StorageBackendType.KV_REDIS.value:
|
||||
return RedisKVStoreConfig.pip_packages()
|
||||
elif store_type == "mongodb":
|
||||
elif store_type == StorageBackendType.KV_MONGODB.value:
|
||||
return MongoDBKVStoreConfig.pip_packages()
|
||||
else:
|
||||
raise ValueError(f"Unknown KV store type: {store_type}")
|
||||
|
|
|
@ -4,9 +4,17 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
# Copyright (c) Meta Platforms, Inc. and affiliates.
|
||||
# All rights reserved.
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from llama_stack.core.storage.datatypes import KVStoreReference, StorageBackendConfig, StorageBackendType
|
||||
|
||||
from .api import KVStore
|
||||
from .config import KVStoreConfig, KVStoreType
|
||||
from .config import KVStoreConfig
|
||||
|
||||
|
||||
def kvstore_dependencies():
|
||||
|
@ -44,20 +52,41 @@ class InmemoryKVStoreImpl(KVStore):
|
|||
del self._store[key]
|
||||
|
||||
|
||||
async def kvstore_impl(config: KVStoreConfig) -> KVStore:
|
||||
if config.type == KVStoreType.redis.value:
|
||||
_KVSTORE_BACKENDS: dict[str, KVStoreConfig] = {}
|
||||
|
||||
|
||||
def register_kvstore_backends(backends: dict[str, StorageBackendConfig]) -> None:
|
||||
"""Register the set of available KV store backends for reference resolution."""
|
||||
global _KVSTORE_BACKENDS
|
||||
|
||||
_KVSTORE_BACKENDS.clear()
|
||||
for name, cfg in backends.items():
|
||||
_KVSTORE_BACKENDS[name] = cfg
|
||||
|
||||
|
||||
async def kvstore_impl(reference: KVStoreReference) -> KVStore:
|
||||
backend_name = reference.backend
|
||||
|
||||
backend_config = _KVSTORE_BACKENDS.get(backend_name)
|
||||
if backend_config is None:
|
||||
raise ValueError(f"Unknown KVStore backend '{backend_name}'. Registered backends: {sorted(_KVSTORE_BACKENDS)}")
|
||||
|
||||
config = backend_config.model_copy()
|
||||
config.namespace = reference.namespace
|
||||
|
||||
if config.type == StorageBackendType.KV_REDIS.value:
|
||||
from .redis import RedisKVStoreImpl
|
||||
|
||||
impl = RedisKVStoreImpl(config)
|
||||
elif config.type == KVStoreType.sqlite.value:
|
||||
elif config.type == StorageBackendType.KV_SQLITE.value:
|
||||
from .sqlite import SqliteKVStoreImpl
|
||||
|
||||
impl = SqliteKVStoreImpl(config)
|
||||
elif config.type == KVStoreType.postgres.value:
|
||||
elif config.type == StorageBackendType.KV_POSTGRES.value:
|
||||
from .postgres import PostgresKVStoreImpl
|
||||
|
||||
impl = PostgresKVStoreImpl(config)
|
||||
elif config.type == KVStoreType.mongodb.value:
|
||||
elif config.type == StorageBackendType.KV_MONGODB.value:
|
||||
from .mongodb import MongoDBKVStoreImpl
|
||||
|
||||
impl = MongoDBKVStoreImpl(config)
|
||||
|
|
|
@ -18,13 +18,13 @@ from llama_stack.apis.agents.openai_responses import (
|
|||
OpenAIResponseObjectWithInput,
|
||||
)
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.core.datatypes import AccessRule, ResponsesStoreConfig
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
from llama_stack.core.datatypes import AccessRule
|
||||
from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from ..sqlstore.api import ColumnDefinition, ColumnType
|
||||
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
|
||||
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, SqlStoreType, sqlstore_impl
|
||||
from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
|
||||
|
||||
logger = get_logger(name=__name__, category="openai_responses")
|
||||
|
||||
|
@ -45,39 +45,38 @@ class _OpenAIResponseObjectWithInputAndMessages(OpenAIResponseObjectWithInput):
|
|||
class ResponsesStore:
|
||||
def __init__(
|
||||
self,
|
||||
config: ResponsesStoreConfig | SqlStoreConfig,
|
||||
reference: ResponsesStoreReference | SqlStoreReference,
|
||||
policy: list[AccessRule],
|
||||
):
|
||||
# Handle backward compatibility
|
||||
if not isinstance(config, ResponsesStoreConfig):
|
||||
# Legacy: SqlStoreConfig passed directly as config
|
||||
config = ResponsesStoreConfig(
|
||||
sql_store_config=config,
|
||||
)
|
||||
if isinstance(reference, ResponsesStoreReference):
|
||||
self.reference = reference
|
||||
else:
|
||||
self.reference = ResponsesStoreReference(**reference.model_dump())
|
||||
|
||||
self.config = config
|
||||
self.sql_store_config = config.sql_store_config
|
||||
if not self.sql_store_config:
|
||||
self.sql_store_config = SqliteSqlStoreConfig(
|
||||
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
|
||||
)
|
||||
self.sql_store = None
|
||||
self.policy = policy
|
||||
|
||||
# Disable write queue for SQLite to avoid concurrency issues
|
||||
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
|
||||
self.sql_store = None
|
||||
self.enable_write_queue = True
|
||||
|
||||
# Async write queue and worker control
|
||||
self._queue: (
|
||||
asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
|
||||
) = None
|
||||
self._worker_tasks: list[asyncio.Task[Any]] = []
|
||||
self._max_write_queue_size: int = config.max_write_queue_size
|
||||
self._num_writers: int = max(1, config.num_writers)
|
||||
self._max_write_queue_size: int = self.reference.max_write_queue_size
|
||||
self._num_writers: int = max(1, self.reference.num_writers)
|
||||
|
||||
async def initialize(self):
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
|
||||
base_store = sqlstore_impl(self.reference)
|
||||
self.sql_store = AuthorizedSqlStore(base_store, self.policy)
|
||||
|
||||
backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
|
||||
if backend_config is None:
|
||||
raise ValueError(
|
||||
f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
|
||||
)
|
||||
if backend_config.type == StorageBackendType.SQL_SQLITE:
|
||||
self.enable_write_queue = False
|
||||
await self.sql_store.create_table(
|
||||
"openai_responses",
|
||||
{
|
||||
|
|
|
@ -12,10 +12,10 @@ from llama_stack.core.access_control.conditions import ProtectedResource
|
|||
from llama_stack.core.access_control.datatypes import AccessRule, Action, Scope
|
||||
from llama_stack.core.datatypes import User
|
||||
from llama_stack.core.request_headers import get_authenticated_user
|
||||
from llama_stack.core.storage.datatypes import StorageBackendType
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from .api import ColumnDefinition, ColumnType, PaginatedResponse, SqlStore
|
||||
from .sqlstore import SqlStoreType
|
||||
|
||||
logger = get_logger(name=__name__, category="providers::utils")
|
||||
|
||||
|
@ -82,8 +82,8 @@ class AuthorizedSqlStore:
|
|||
if not hasattr(self.sql_store, "config"):
|
||||
raise ValueError("SqlStore must have a config attribute to be used with AuthorizedSqlStore")
|
||||
|
||||
self.database_type = self.sql_store.config.type
|
||||
if self.database_type not in [SqlStoreType.postgres, SqlStoreType.sqlite]:
|
||||
self.database_type = self.sql_store.config.type.value
|
||||
if self.database_type not in [StorageBackendType.SQL_POSTGRES.value, StorageBackendType.SQL_SQLITE.value]:
|
||||
raise ValueError(f"Unsupported database type: {self.database_type}")
|
||||
|
||||
def _validate_sql_optimized_policy(self) -> None:
|
||||
|
@ -220,9 +220,9 @@ class AuthorizedSqlStore:
|
|||
Returns:
|
||||
SQL expression to extract JSON value
|
||||
"""
|
||||
if self.database_type == SqlStoreType.postgres:
|
||||
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
|
||||
return f"{column}->'{path}'"
|
||||
elif self.database_type == SqlStoreType.sqlite:
|
||||
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
|
||||
return f"JSON_EXTRACT({column}, '$.{path}')"
|
||||
else:
|
||||
raise ValueError(f"Unsupported database type: {self.database_type}")
|
||||
|
@ -237,9 +237,9 @@ class AuthorizedSqlStore:
|
|||
Returns:
|
||||
SQL expression to extract JSON value as text
|
||||
"""
|
||||
if self.database_type == SqlStoreType.postgres:
|
||||
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
|
||||
return f"{column}->>'{path}'"
|
||||
elif self.database_type == SqlStoreType.sqlite:
|
||||
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
|
||||
return f"JSON_EXTRACT({column}, '$.{path}')"
|
||||
else:
|
||||
raise ValueError(f"Unsupported database type: {self.database_type}")
|
||||
|
@ -248,10 +248,10 @@ class AuthorizedSqlStore:
|
|||
"""Get the SQL conditions for public access."""
|
||||
# Public records are records that have no owner_principal or access_attributes
|
||||
conditions = ["owner_principal = ''"]
|
||||
if self.database_type == SqlStoreType.postgres:
|
||||
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
|
||||
# Postgres stores JSON null as 'null'
|
||||
conditions.append("access_attributes::text = 'null'")
|
||||
elif self.database_type == SqlStoreType.sqlite:
|
||||
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
|
||||
conditions.append("access_attributes = 'null'")
|
||||
else:
|
||||
raise ValueError(f"Unsupported database type: {self.database_type}")
|
||||
|
|
|
@ -26,10 +26,10 @@ from sqlalchemy.ext.asyncio.engine import AsyncEngine
|
|||
from sqlalchemy.sql.elements import ColumnElement
|
||||
|
||||
from llama_stack.apis.common.responses import PaginatedResponse
|
||||
from llama_stack.core.storage.datatypes import SqlAlchemySqlStoreConfig
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from .api import ColumnDefinition, ColumnType, SqlStore
|
||||
from .sqlstore import SqlAlchemySqlStoreConfig
|
||||
|
||||
logger = get_logger(name=__name__, category="providers::utils")
|
||||
|
||||
|
|
|
@ -4,90 +4,28 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
from abc import abstractmethod
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
from typing import Annotated, Literal
|
||||
from typing import Annotated, cast
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import Field
|
||||
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
from llama_stack.core.storage.datatypes import (
|
||||
PostgresSqlStoreConfig,
|
||||
SqliteSqlStoreConfig,
|
||||
SqlStoreReference,
|
||||
StorageBackendConfig,
|
||||
StorageBackendType,
|
||||
)
|
||||
|
||||
from .api import SqlStore
|
||||
|
||||
sql_store_pip_packages = ["sqlalchemy[asyncio]", "aiosqlite", "asyncpg"]
|
||||
|
||||
|
||||
class SqlStoreType(StrEnum):
|
||||
sqlite = "sqlite"
|
||||
postgres = "postgres"
|
||||
|
||||
|
||||
class SqlAlchemySqlStoreConfig(BaseModel):
|
||||
@property
|
||||
@abstractmethod
|
||||
def engine_str(self) -> str: ...
|
||||
|
||||
# TODO: move this when we have a better way to specify dependencies with internal APIs
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return ["sqlalchemy[asyncio]"]
|
||||
|
||||
|
||||
class SqliteSqlStoreConfig(SqlAlchemySqlStoreConfig):
|
||||
type: Literal[SqlStoreType.sqlite] = SqlStoreType.sqlite
|
||||
db_path: str = Field(
|
||||
default=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
|
||||
description="Database path, e.g. ~/.llama/distributions/ollama/sqlstore.db",
|
||||
)
|
||||
|
||||
@property
|
||||
def engine_str(self) -> str:
|
||||
return "sqlite+aiosqlite:///" + Path(self.db_path).expanduser().as_posix()
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, __distro_dir__: str, db_name: str = "sqlstore.db"):
|
||||
return {
|
||||
"type": "sqlite",
|
||||
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return super().pip_packages() + ["aiosqlite"]
|
||||
|
||||
|
||||
class PostgresSqlStoreConfig(SqlAlchemySqlStoreConfig):
|
||||
type: Literal[SqlStoreType.postgres] = SqlStoreType.postgres
|
||||
host: str = "localhost"
|
||||
port: int = 5432
|
||||
db: str = "llamastack"
|
||||
user: str
|
||||
password: str | None = None
|
||||
|
||||
@property
|
||||
def engine_str(self) -> str:
|
||||
return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}"
|
||||
|
||||
@classmethod
|
||||
def pip_packages(cls) -> list[str]:
|
||||
return super().pip_packages() + ["asyncpg"]
|
||||
|
||||
@classmethod
|
||||
def sample_run_config(cls, **kwargs):
|
||||
return {
|
||||
"type": "postgres",
|
||||
"host": "${env.POSTGRES_HOST:=localhost}",
|
||||
"port": "${env.POSTGRES_PORT:=5432}",
|
||||
"db": "${env.POSTGRES_DB:=llamastack}",
|
||||
"user": "${env.POSTGRES_USER:=llamastack}",
|
||||
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
|
||||
}
|
||||
_SQLSTORE_BACKENDS: dict[str, StorageBackendConfig] = {}
|
||||
|
||||
|
||||
SqlStoreConfig = Annotated[
|
||||
SqliteSqlStoreConfig | PostgresSqlStoreConfig,
|
||||
Field(discriminator="type", default=SqlStoreType.sqlite.value),
|
||||
Field(discriminator="type"),
|
||||
]
|
||||
|
||||
|
||||
|
@ -95,9 +33,9 @@ def get_pip_packages(store_config: dict | SqlStoreConfig) -> list[str]:
|
|||
"""Get pip packages for SQL store config, handling both dict and object cases."""
|
||||
if isinstance(store_config, dict):
|
||||
store_type = store_config.get("type")
|
||||
if store_type == "sqlite":
|
||||
if store_type == StorageBackendType.SQL_SQLITE.value:
|
||||
return SqliteSqlStoreConfig.pip_packages()
|
||||
elif store_type == "postgres":
|
||||
elif store_type == StorageBackendType.SQL_POSTGRES.value:
|
||||
return PostgresSqlStoreConfig.pip_packages()
|
||||
else:
|
||||
raise ValueError(f"Unknown SQL store type: {store_type}")
|
||||
|
@ -105,12 +43,28 @@ def get_pip_packages(store_config: dict | SqlStoreConfig) -> list[str]:
|
|||
return store_config.pip_packages()
|
||||
|
||||
|
||||
def sqlstore_impl(config: SqlStoreConfig) -> SqlStore:
|
||||
if config.type in [SqlStoreType.sqlite, SqlStoreType.postgres]:
|
||||
def sqlstore_impl(reference: SqlStoreReference) -> SqlStore:
|
||||
backend_name = reference.backend
|
||||
|
||||
backend_config = _SQLSTORE_BACKENDS.get(backend_name)
|
||||
if backend_config is None:
|
||||
raise ValueError(
|
||||
f"Unknown SQL store backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
|
||||
)
|
||||
|
||||
if isinstance(backend_config, SqliteSqlStoreConfig | PostgresSqlStoreConfig):
|
||||
from .sqlalchemy_sqlstore import SqlAlchemySqlStoreImpl
|
||||
|
||||
impl = SqlAlchemySqlStoreImpl(config)
|
||||
config = cast(SqliteSqlStoreConfig | PostgresSqlStoreConfig, backend_config).model_copy()
|
||||
return SqlAlchemySqlStoreImpl(config)
|
||||
else:
|
||||
raise ValueError(f"Unknown sqlstore type {config.type}")
|
||||
raise ValueError(f"Unknown sqlstore type {backend_config.type}")
|
||||
|
||||
return impl
|
||||
|
||||
def register_sqlstore_backends(backends: dict[str, StorageBackendConfig]) -> None:
|
||||
"""Register the set of available SQL store backends for reference resolution."""
|
||||
global _SQLSTORE_BACKENDS
|
||||
|
||||
_SQLSTORE_BACKENDS.clear()
|
||||
for name, cfg in backends.items():
|
||||
_SQLSTORE_BACKENDS[name] = cfg
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue