Merge remote-tracking branch 'origin/main' into stack-config-default-embed

This commit is contained in:
Ashwin Bharambe 2025-10-20 13:29:19 -07:00
commit 31249a1a75
237 changed files with 30895 additions and 15441 deletions

View file

@ -15,12 +15,13 @@ from llama_stack.apis.inference import (
OpenAIMessageParam,
Order,
)
from llama_stack.core.datatypes import AccessRule, InferenceStoreConfig
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.storage.datatypes import InferenceStoreReference, StorageBackendType
from llama_stack.log import get_logger
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
from ..sqlstore.sqlstore import SqlStoreConfig, SqlStoreType, sqlstore_impl
from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
logger = get_logger(name=__name__, category="inference")
@ -28,33 +29,32 @@ logger = get_logger(name=__name__, category="inference")
class InferenceStore:
def __init__(
self,
config: InferenceStoreConfig | SqlStoreConfig,
reference: InferenceStoreReference,
policy: list[AccessRule],
):
# Handle backward compatibility
if not isinstance(config, InferenceStoreConfig):
# Legacy: SqlStoreConfig passed directly as config
config = InferenceStoreConfig(
sql_store_config=config,
)
self.config = config
self.sql_store_config = config.sql_store_config
self.reference = reference
self.sql_store = None
self.policy = policy
# Disable write queue for SQLite to avoid concurrency issues
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
# Async write queue and worker control
self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None
self._worker_tasks: list[asyncio.Task[Any]] = []
self._max_write_queue_size: int = config.max_write_queue_size
self._num_writers: int = max(1, config.num_writers)
self._max_write_queue_size: int = reference.max_write_queue_size
self._num_writers: int = max(1, reference.num_writers)
async def initialize(self):
"""Create the necessary tables if they don't exist."""
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy)
# Disable write queue for SQLite to avoid concurrency issues
backend_name = self.reference.backend
backend_config = _SQLSTORE_BACKENDS.get(backend_name)
if backend_config is None:
raise ValueError(
f"Unregistered SQL backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
self.enable_write_queue = backend_config.type != StorageBackendType.SQL_SQLITE
await self.sql_store.create_table(
"chat_completions",
{

View file

@ -4,143 +4,20 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import re
from enum import Enum
from typing import Annotated, Literal
from typing import Annotated
from pydantic import BaseModel, Field, field_validator
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
class KVStoreType(Enum):
redis = "redis"
sqlite = "sqlite"
postgres = "postgres"
mongodb = "mongodb"
class CommonConfig(BaseModel):
namespace: str | None = Field(
default=None,
description="All keys will be prefixed with this namespace",
)
class RedisKVStoreConfig(CommonConfig):
type: Literal["redis"] = KVStoreType.redis.value
host: str = "localhost"
port: int = 6379
@property
def url(self) -> str:
return f"redis://{self.host}:{self.port}"
@classmethod
def pip_packages(cls) -> list[str]:
return ["redis"]
@classmethod
def sample_run_config(cls):
return {
"type": "redis",
"host": "${env.REDIS_HOST:=localhost}",
"port": "${env.REDIS_PORT:=6379}",
}
class SqliteKVStoreConfig(CommonConfig):
type: Literal["sqlite"] = KVStoreType.sqlite.value
db_path: str = Field(
default=(RUNTIME_BASE_DIR / "kvstore.db").as_posix(),
description="File path for the sqlite database",
)
@classmethod
def pip_packages(cls) -> list[str]:
return ["aiosqlite"]
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "kvstore.db"):
return {
"type": "sqlite",
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
}
class PostgresKVStoreConfig(CommonConfig):
type: Literal["postgres"] = KVStoreType.postgres.value
host: str = "localhost"
port: int = 5432
db: str = "llamastack"
user: str
password: str | None = None
ssl_mode: str | None = None
ca_cert_path: str | None = None
table_name: str = "llamastack_kvstore"
@classmethod
def sample_run_config(cls, table_name: str = "llamastack_kvstore", **kwargs):
return {
"type": "postgres",
"host": "${env.POSTGRES_HOST:=localhost}",
"port": "${env.POSTGRES_PORT:=5432}",
"db": "${env.POSTGRES_DB:=llamastack}",
"user": "${env.POSTGRES_USER:=llamastack}",
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
"table_name": "${env.POSTGRES_TABLE_NAME:=" + table_name + "}",
}
@classmethod
@field_validator("table_name")
def validate_table_name(cls, v: str) -> str:
# PostgreSQL identifiers rules:
# - Must start with a letter or underscore
# - Can contain letters, numbers, and underscores
# - Maximum length is 63 bytes
pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$"
if not re.match(pattern, v):
raise ValueError(
"Invalid table name. Must start with letter or underscore and contain only letters, numbers, and underscores"
)
if len(v) > 63:
raise ValueError("Table name must be less than 63 characters")
return v
@classmethod
def pip_packages(cls) -> list[str]:
return ["psycopg2-binary"]
class MongoDBKVStoreConfig(CommonConfig):
type: Literal["mongodb"] = KVStoreType.mongodb.value
host: str = "localhost"
port: int = 27017
db: str = "llamastack"
user: str | None = None
password: str | None = None
collection_name: str = "llamastack_kvstore"
@classmethod
def pip_packages(cls) -> list[str]:
return ["pymongo"]
@classmethod
def sample_run_config(cls, collection_name: str = "llamastack_kvstore"):
return {
"type": "mongodb",
"host": "${env.MONGODB_HOST:=localhost}",
"port": "${env.MONGODB_PORT:=5432}",
"db": "${env.MONGODB_DB}",
"user": "${env.MONGODB_USER}",
"password": "${env.MONGODB_PASSWORD}",
"collection_name": "${env.MONGODB_COLLECTION_NAME:=" + collection_name + "}",
}
from pydantic import Field
from llama_stack.core.storage.datatypes import (
MongoDBKVStoreConfig,
PostgresKVStoreConfig,
RedisKVStoreConfig,
SqliteKVStoreConfig,
StorageBackendType,
)
KVStoreConfig = Annotated[
RedisKVStoreConfig | SqliteKVStoreConfig | PostgresKVStoreConfig | MongoDBKVStoreConfig,
Field(discriminator="type", default=KVStoreType.sqlite.value),
RedisKVStoreConfig | SqliteKVStoreConfig | PostgresKVStoreConfig | MongoDBKVStoreConfig, Field(discriminator="type")
]
@ -148,13 +25,13 @@ def get_pip_packages(store_config: dict | KVStoreConfig) -> list[str]:
"""Get pip packages for KV store config, handling both dict and object cases."""
if isinstance(store_config, dict):
store_type = store_config.get("type")
if store_type == "sqlite":
if store_type == StorageBackendType.KV_SQLITE.value:
return SqliteKVStoreConfig.pip_packages()
elif store_type == "postgres":
elif store_type == StorageBackendType.KV_POSTGRES.value:
return PostgresKVStoreConfig.pip_packages()
elif store_type == "redis":
elif store_type == StorageBackendType.KV_REDIS.value:
return RedisKVStoreConfig.pip_packages()
elif store_type == "mongodb":
elif store_type == StorageBackendType.KV_MONGODB.value:
return MongoDBKVStoreConfig.pip_packages()
else:
raise ValueError(f"Unknown KV store type: {store_type}")

View file

@ -4,9 +4,17 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from __future__ import annotations
from llama_stack.core.storage.datatypes import KVStoreReference, StorageBackendConfig, StorageBackendType
from .api import KVStore
from .config import KVStoreConfig, KVStoreType
from .config import KVStoreConfig
def kvstore_dependencies():
@ -44,20 +52,41 @@ class InmemoryKVStoreImpl(KVStore):
del self._store[key]
async def kvstore_impl(config: KVStoreConfig) -> KVStore:
if config.type == KVStoreType.redis.value:
_KVSTORE_BACKENDS: dict[str, KVStoreConfig] = {}
def register_kvstore_backends(backends: dict[str, StorageBackendConfig]) -> None:
"""Register the set of available KV store backends for reference resolution."""
global _KVSTORE_BACKENDS
_KVSTORE_BACKENDS.clear()
for name, cfg in backends.items():
_KVSTORE_BACKENDS[name] = cfg
async def kvstore_impl(reference: KVStoreReference) -> KVStore:
backend_name = reference.backend
backend_config = _KVSTORE_BACKENDS.get(backend_name)
if backend_config is None:
raise ValueError(f"Unknown KVStore backend '{backend_name}'. Registered backends: {sorted(_KVSTORE_BACKENDS)}")
config = backend_config.model_copy()
config.namespace = reference.namespace
if config.type == StorageBackendType.KV_REDIS.value:
from .redis import RedisKVStoreImpl
impl = RedisKVStoreImpl(config)
elif config.type == KVStoreType.sqlite.value:
elif config.type == StorageBackendType.KV_SQLITE.value:
from .sqlite import SqliteKVStoreImpl
impl = SqliteKVStoreImpl(config)
elif config.type == KVStoreType.postgres.value:
elif config.type == StorageBackendType.KV_POSTGRES.value:
from .postgres import PostgresKVStoreImpl
impl = PostgresKVStoreImpl(config)
elif config.type == KVStoreType.mongodb.value:
elif config.type == StorageBackendType.KV_MONGODB.value:
from .mongodb import MongoDBKVStoreImpl
impl = MongoDBKVStoreImpl(config)

View file

@ -18,13 +18,13 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseObjectWithInput,
)
from llama_stack.apis.inference import OpenAIMessageParam
from llama_stack.core.datatypes import AccessRule, ResponsesStoreConfig
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.datatypes import AccessRule
from llama_stack.core.storage.datatypes import ResponsesStoreReference, SqlStoreReference, StorageBackendType
from llama_stack.log import get_logger
from ..sqlstore.api import ColumnDefinition, ColumnType
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, SqlStoreType, sqlstore_impl
from ..sqlstore.sqlstore import _SQLSTORE_BACKENDS, sqlstore_impl
logger = get_logger(name=__name__, category="openai_responses")
@ -45,39 +45,38 @@ class _OpenAIResponseObjectWithInputAndMessages(OpenAIResponseObjectWithInput):
class ResponsesStore:
def __init__(
self,
config: ResponsesStoreConfig | SqlStoreConfig,
reference: ResponsesStoreReference | SqlStoreReference,
policy: list[AccessRule],
):
# Handle backward compatibility
if not isinstance(config, ResponsesStoreConfig):
# Legacy: SqlStoreConfig passed directly as config
config = ResponsesStoreConfig(
sql_store_config=config,
)
if isinstance(reference, ResponsesStoreReference):
self.reference = reference
else:
self.reference = ResponsesStoreReference(**reference.model_dump())
self.config = config
self.sql_store_config = config.sql_store_config
if not self.sql_store_config:
self.sql_store_config = SqliteSqlStoreConfig(
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
)
self.sql_store = None
self.policy = policy
# Disable write queue for SQLite to avoid concurrency issues
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
self.sql_store = None
self.enable_write_queue = True
# Async write queue and worker control
self._queue: (
asyncio.Queue[tuple[OpenAIResponseObject, list[OpenAIResponseInput], list[OpenAIMessageParam]]] | None
) = None
self._worker_tasks: list[asyncio.Task[Any]] = []
self._max_write_queue_size: int = config.max_write_queue_size
self._num_writers: int = max(1, config.num_writers)
self._max_write_queue_size: int = self.reference.max_write_queue_size
self._num_writers: int = max(1, self.reference.num_writers)
async def initialize(self):
"""Create the necessary tables if they don't exist."""
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
base_store = sqlstore_impl(self.reference)
self.sql_store = AuthorizedSqlStore(base_store, self.policy)
backend_config = _SQLSTORE_BACKENDS.get(self.reference.backend)
if backend_config is None:
raise ValueError(
f"Unregistered SQL backend '{self.reference.backend}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
if backend_config.type == StorageBackendType.SQL_SQLITE:
self.enable_write_queue = False
await self.sql_store.create_table(
"openai_responses",
{

View file

@ -12,10 +12,10 @@ from llama_stack.core.access_control.conditions import ProtectedResource
from llama_stack.core.access_control.datatypes import AccessRule, Action, Scope
from llama_stack.core.datatypes import User
from llama_stack.core.request_headers import get_authenticated_user
from llama_stack.core.storage.datatypes import StorageBackendType
from llama_stack.log import get_logger
from .api import ColumnDefinition, ColumnType, PaginatedResponse, SqlStore
from .sqlstore import SqlStoreType
logger = get_logger(name=__name__, category="providers::utils")
@ -82,8 +82,8 @@ class AuthorizedSqlStore:
if not hasattr(self.sql_store, "config"):
raise ValueError("SqlStore must have a config attribute to be used with AuthorizedSqlStore")
self.database_type = self.sql_store.config.type
if self.database_type not in [SqlStoreType.postgres, SqlStoreType.sqlite]:
self.database_type = self.sql_store.config.type.value
if self.database_type not in [StorageBackendType.SQL_POSTGRES.value, StorageBackendType.SQL_SQLITE.value]:
raise ValueError(f"Unsupported database type: {self.database_type}")
def _validate_sql_optimized_policy(self) -> None:
@ -220,9 +220,9 @@ class AuthorizedSqlStore:
Returns:
SQL expression to extract JSON value
"""
if self.database_type == SqlStoreType.postgres:
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
return f"{column}->'{path}'"
elif self.database_type == SqlStoreType.sqlite:
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
return f"JSON_EXTRACT({column}, '$.{path}')"
else:
raise ValueError(f"Unsupported database type: {self.database_type}")
@ -237,9 +237,9 @@ class AuthorizedSqlStore:
Returns:
SQL expression to extract JSON value as text
"""
if self.database_type == SqlStoreType.postgres:
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
return f"{column}->>'{path}'"
elif self.database_type == SqlStoreType.sqlite:
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
return f"JSON_EXTRACT({column}, '$.{path}')"
else:
raise ValueError(f"Unsupported database type: {self.database_type}")
@ -248,10 +248,10 @@ class AuthorizedSqlStore:
"""Get the SQL conditions for public access."""
# Public records are records that have no owner_principal or access_attributes
conditions = ["owner_principal = ''"]
if self.database_type == SqlStoreType.postgres:
if self.database_type == StorageBackendType.SQL_POSTGRES.value:
# Postgres stores JSON null as 'null'
conditions.append("access_attributes::text = 'null'")
elif self.database_type == SqlStoreType.sqlite:
elif self.database_type == StorageBackendType.SQL_SQLITE.value:
conditions.append("access_attributes = 'null'")
else:
raise ValueError(f"Unsupported database type: {self.database_type}")

View file

@ -26,10 +26,10 @@ from sqlalchemy.ext.asyncio.engine import AsyncEngine
from sqlalchemy.sql.elements import ColumnElement
from llama_stack.apis.common.responses import PaginatedResponse
from llama_stack.core.storage.datatypes import SqlAlchemySqlStoreConfig
from llama_stack.log import get_logger
from .api import ColumnDefinition, ColumnType, SqlStore
from .sqlstore import SqlAlchemySqlStoreConfig
logger = get_logger(name=__name__, category="providers::utils")

View file

@ -4,90 +4,28 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from abc import abstractmethod
from enum import StrEnum
from pathlib import Path
from typing import Annotated, Literal
from typing import Annotated, cast
from pydantic import BaseModel, Field
from pydantic import Field
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
from llama_stack.core.storage.datatypes import (
PostgresSqlStoreConfig,
SqliteSqlStoreConfig,
SqlStoreReference,
StorageBackendConfig,
StorageBackendType,
)
from .api import SqlStore
sql_store_pip_packages = ["sqlalchemy[asyncio]", "aiosqlite", "asyncpg"]
class SqlStoreType(StrEnum):
sqlite = "sqlite"
postgres = "postgres"
class SqlAlchemySqlStoreConfig(BaseModel):
@property
@abstractmethod
def engine_str(self) -> str: ...
# TODO: move this when we have a better way to specify dependencies with internal APIs
@classmethod
def pip_packages(cls) -> list[str]:
return ["sqlalchemy[asyncio]"]
class SqliteSqlStoreConfig(SqlAlchemySqlStoreConfig):
type: Literal[SqlStoreType.sqlite] = SqlStoreType.sqlite
db_path: str = Field(
default=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
description="Database path, e.g. ~/.llama/distributions/ollama/sqlstore.db",
)
@property
def engine_str(self) -> str:
return "sqlite+aiosqlite:///" + Path(self.db_path).expanduser().as_posix()
@classmethod
def sample_run_config(cls, __distro_dir__: str, db_name: str = "sqlstore.db"):
return {
"type": "sqlite",
"db_path": "${env.SQLITE_STORE_DIR:=" + __distro_dir__ + "}/" + db_name,
}
@classmethod
def pip_packages(cls) -> list[str]:
return super().pip_packages() + ["aiosqlite"]
class PostgresSqlStoreConfig(SqlAlchemySqlStoreConfig):
type: Literal[SqlStoreType.postgres] = SqlStoreType.postgres
host: str = "localhost"
port: int = 5432
db: str = "llamastack"
user: str
password: str | None = None
@property
def engine_str(self) -> str:
return f"postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}"
@classmethod
def pip_packages(cls) -> list[str]:
return super().pip_packages() + ["asyncpg"]
@classmethod
def sample_run_config(cls, **kwargs):
return {
"type": "postgres",
"host": "${env.POSTGRES_HOST:=localhost}",
"port": "${env.POSTGRES_PORT:=5432}",
"db": "${env.POSTGRES_DB:=llamastack}",
"user": "${env.POSTGRES_USER:=llamastack}",
"password": "${env.POSTGRES_PASSWORD:=llamastack}",
}
_SQLSTORE_BACKENDS: dict[str, StorageBackendConfig] = {}
SqlStoreConfig = Annotated[
SqliteSqlStoreConfig | PostgresSqlStoreConfig,
Field(discriminator="type", default=SqlStoreType.sqlite.value),
Field(discriminator="type"),
]
@ -95,9 +33,9 @@ def get_pip_packages(store_config: dict | SqlStoreConfig) -> list[str]:
"""Get pip packages for SQL store config, handling both dict and object cases."""
if isinstance(store_config, dict):
store_type = store_config.get("type")
if store_type == "sqlite":
if store_type == StorageBackendType.SQL_SQLITE.value:
return SqliteSqlStoreConfig.pip_packages()
elif store_type == "postgres":
elif store_type == StorageBackendType.SQL_POSTGRES.value:
return PostgresSqlStoreConfig.pip_packages()
else:
raise ValueError(f"Unknown SQL store type: {store_type}")
@ -105,12 +43,28 @@ def get_pip_packages(store_config: dict | SqlStoreConfig) -> list[str]:
return store_config.pip_packages()
def sqlstore_impl(config: SqlStoreConfig) -> SqlStore:
if config.type in [SqlStoreType.sqlite, SqlStoreType.postgres]:
def sqlstore_impl(reference: SqlStoreReference) -> SqlStore:
backend_name = reference.backend
backend_config = _SQLSTORE_BACKENDS.get(backend_name)
if backend_config is None:
raise ValueError(
f"Unknown SQL store backend '{backend_name}'. Registered backends: {sorted(_SQLSTORE_BACKENDS)}"
)
if isinstance(backend_config, SqliteSqlStoreConfig | PostgresSqlStoreConfig):
from .sqlalchemy_sqlstore import SqlAlchemySqlStoreImpl
impl = SqlAlchemySqlStoreImpl(config)
config = cast(SqliteSqlStoreConfig | PostgresSqlStoreConfig, backend_config).model_copy()
return SqlAlchemySqlStoreImpl(config)
else:
raise ValueError(f"Unknown sqlstore type {config.type}")
raise ValueError(f"Unknown sqlstore type {backend_config.type}")
return impl
def register_sqlstore_backends(backends: dict[str, StorageBackendConfig]) -> None:
"""Register the set of available SQL store backends for reference resolution."""
global _SQLSTORE_BACKENDS
_SQLSTORE_BACKENDS.clear()
for name, cfg in backends.items():
_SQLSTORE_BACKENDS[name] = cfg

View file

@ -70,7 +70,7 @@ def trace_protocol[T](cls: type[T]) -> type[T]:
"__class__": class_name,
"__method__": method_name,
"__type__": span_type,
"__args__": str(combined_args),
"__args__": json.dumps(combined_args),
}
return class_name, method_name, span_attributes
@ -82,8 +82,8 @@ def trace_protocol[T](cls: type[T]) -> type[T]:
class_name, method_name, span_attributes = create_span_context(self, *args, **kwargs)
with tracing.span(f"{class_name}.{method_name}", span_attributes) as span:
count = 0
try:
count = 0
async for item in method(self, *args, **kwargs):
yield item
count += 1