Merge branch 'main' into chroma

Bwook (Byoungwook) Kim 2025-09-11 20:46:53 +09:00 committed by GitHub
commit 11c71c958e
308 changed files with 26415 additions and 11807 deletions


@@ -4,53 +4,55 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+import os
+
 from pydantic import BaseModel, Field


 class BedrockBaseConfig(BaseModel):
     aws_access_key_id: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID"),
         description="The AWS access key to use. Default use environment variable: AWS_ACCESS_KEY_ID",
     )
     aws_secret_access_key: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY"),
         description="The AWS secret access key to use. Default use environment variable: AWS_SECRET_ACCESS_KEY",
     )
     aws_session_token: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_SESSION_TOKEN"),
         description="The AWS session token to use. Default use environment variable: AWS_SESSION_TOKEN",
     )
     region_name: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_DEFAULT_REGION"),
         description="The default AWS Region to use, for example, us-west-1 or us-west-2. "
        "Default use environment variable: AWS_DEFAULT_REGION",
     )
     profile_name: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_PROFILE"),
         description="The profile name that contains credentials to use. Default use environment variable: AWS_PROFILE",
     )
     total_max_attempts: int | None = Field(
-        default=None,
+        default_factory=lambda: int(val) if (val := os.getenv("AWS_MAX_ATTEMPTS")) else None,
         description="An integer representing the maximum number of attempts that will be made for a single request, "
         "including the initial attempt. Default use environment variable: AWS_MAX_ATTEMPTS",
     )
     retry_mode: str | None = Field(
-        default=None,
+        default_factory=lambda: os.getenv("AWS_RETRY_MODE"),
         description="A string representing the type of retries Boto3 will perform. "
         "Default use environment variable: AWS_RETRY_MODE",
     )
     connect_timeout: float | None = Field(
-        default=60,
+        default_factory=lambda: float(os.getenv("AWS_CONNECT_TIMEOUT", "60")),
         description="The time in seconds till a timeout exception is thrown when attempting to make a connection. "
         "The default is 60 seconds.",
     )
     read_timeout: float | None = Field(
-        default=60,
+        default_factory=lambda: float(os.getenv("AWS_READ_TIMEOUT", "60")),
         description="The time in seconds till a timeout exception is thrown when attempting to read from a connection. "
         "The default is 60 seconds.",
     )
     session_ttl: int | None = Field(
-        default=3600,
+        default_factory=lambda: int(os.getenv("AWS_SESSION_TTL", "3600")),
         description="The time in seconds till a session expires. The default is 3600 seconds (1 hour).",
     )
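
For illustration, a minimal sketch (not part of the commit) of why moving from a static default to `default_factory` matters: the environment variable is read each time the model is instantiated, not once at class definition, so changes to the environment are picked up. `EnvConfig` is a hypothetical stand-in for the config class above.

import os

from pydantic import BaseModel, Field


class EnvConfig(BaseModel):
    # read when the model is instantiated, not when the class is defined
    region_name: str | None = Field(default_factory=lambda: os.getenv("AWS_DEFAULT_REGION"))


os.environ["AWS_DEFAULT_REGION"] = "us-west-2"
assert EnvConfig().region_name == "us-west-2"

del os.environ["AWS_DEFAULT_REGION"]
assert EnvConfig().region_name is None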


@@ -4,6 +4,7 @@
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+import asyncio
 import base64
 import struct
 from typing import TYPE_CHECKING
@@ -43,9 +44,11 @@ class SentenceTransformerEmbeddingMixin:
         task_type: EmbeddingTaskType | None = None,
     ) -> EmbeddingsResponse:
         model = await self.model_store.get_model(model_id)
-        embedding_model = self._load_sentence_transformer_model(model.provider_resource_id)
-        embeddings = embedding_model.encode(
-            [interleaved_content_as_str(content) for content in contents], show_progress_bar=False
+        embedding_model = await self._load_sentence_transformer_model(model.provider_resource_id)
+        embeddings = await asyncio.to_thread(
+            embedding_model.encode,
+            [interleaved_content_as_str(content) for content in contents],
+            show_progress_bar=False,
         )
         return EmbeddingsResponse(embeddings=embeddings)
@@ -64,8 +67,8 @@ class SentenceTransformerEmbeddingMixin:
         # Get the model and generate embeddings
         model_obj = await self.model_store.get_model(model)
-        embedding_model = self._load_sentence_transformer_model(model_obj.provider_resource_id)
-        embeddings = embedding_model.encode(input_list, show_progress_bar=False)
+        embedding_model = await self._load_sentence_transformer_model(model_obj.provider_resource_id)
+        embeddings = await asyncio.to_thread(embedding_model.encode, input_list, show_progress_bar=False)

         # Convert embeddings to the requested format
         data = []
@@ -93,7 +96,7 @@ class SentenceTransformerEmbeddingMixin:
             usage=usage,
         )

-    def _load_sentence_transformer_model(self, model: str) -> "SentenceTransformer":
+    async def _load_sentence_transformer_model(self, model: str) -> "SentenceTransformer":
         global EMBEDDING_MODELS

         loaded_model = EMBEDDING_MODELS.get(model)
@@ -101,8 +104,12 @@ class SentenceTransformerEmbeddingMixin:
             return loaded_model

         log.info(f"Loading sentence transformer for {model}...")
-        from sentence_transformers import SentenceTransformer
-        loaded_model = SentenceTransformer(model)
+
+        def _load_model():
+            from sentence_transformers import SentenceTransformer
+
+            return SentenceTransformer(model)
+
+        loaded_model = await asyncio.to_thread(_load_model)
         EMBEDDING_MODELS[model] = loaded_model
         return loaded_model
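
A standalone sketch of the pattern this diff adopts: `asyncio.to_thread` pushes a blocking, CPU-bound call onto a worker thread so the event loop keeps serving other requests. The `encode` stub below is a hypothetical stand-in for `SentenceTransformer.encode`.

import asyncio
import time


def encode(texts: list[str]) -> list[list[float]]:
    # stand-in for SentenceTransformer.encode: blocking and CPU-bound
    time.sleep(0.1)
    return [[0.0] * 8 for _ in texts]


async def main() -> None:
    # the event loop stays free while encode runs on a worker thread
    embeddings = await asyncio.to_thread(encode, ["hello", "world"])
    print(len(embeddings), "embeddings")


asyncio.run(main())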


@@ -3,6 +3,11 @@
 #
 # This source code is licensed under the terms described in the LICENSE file in
 # the root directory of this source tree.
+import asyncio
+from typing import Any
+
+from sqlalchemy.exc import IntegrityError
+
 from llama_stack.apis.inference import (
     ListOpenAIChatCompletionResponse,
     OpenAIChatCompletion,
@@ -10,24 +15,43 @@ from llama_stack.apis.inference import (
     OpenAIMessageParam,
     Order,
 )
-from llama_stack.core.datatypes import AccessRule
-from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
+from llama_stack.core.datatypes import AccessRule, InferenceStoreConfig
 from llama_stack.log import get_logger

 from ..sqlstore.api import ColumnDefinition, ColumnType
 from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
-from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, sqlstore_impl
+from ..sqlstore.sqlstore import SqlStoreConfig, SqlStoreType, sqlstore_impl

 logger = get_logger(name=__name__, category="inference_store")


 class InferenceStore:
-    def __init__(self, sql_store_config: SqlStoreConfig, policy: list[AccessRule]):
-        if not sql_store_config:
-            sql_store_config = SqliteSqlStoreConfig(
-                db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
+    def __init__(
+        self,
+        config: InferenceStoreConfig | SqlStoreConfig,
+        policy: list[AccessRule],
+    ):
+        # Handle backward compatibility
+        if not isinstance(config, InferenceStoreConfig):
+            # Legacy: SqlStoreConfig passed directly as config
+            config = InferenceStoreConfig(
+                sql_store_config=config,
             )
-        self.sql_store_config = sql_store_config
+
+        self.config = config
+        self.sql_store_config = config.sql_store_config
         self.sql_store = None
         self.policy = policy
+
+        # Disable write queue for SQLite to avoid concurrency issues
+        self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
+
+        # Async write queue and worker control
+        self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None
+        self._worker_tasks: list[asyncio.Task[Any]] = []
+        self._max_write_queue_size: int = config.max_write_queue_size
+        self._num_writers: int = max(1, config.num_writers)

     async def initialize(self):
         """Create the necessary tables if they don't exist."""
         self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config))
@@ -42,23 +66,109 @@ class InferenceStore:
             },
         )

+        if self.enable_write_queue:
+            self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
+            for _ in range(self._num_writers):
+                self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
+        else:
+            logger.info("Write queue disabled for SQLite to avoid concurrency issues")
+
+    async def shutdown(self) -> None:
+        if not self._worker_tasks:
+            return
+        if self._queue is not None:
+            await self._queue.join()
+        for t in self._worker_tasks:
+            if not t.done():
+                t.cancel()
+        for t in self._worker_tasks:
+            try:
+                await t
+            except asyncio.CancelledError:
+                pass
+        self._worker_tasks.clear()
+
+    async def flush(self) -> None:
+        """Wait for all queued writes to complete. Useful for testing."""
+        if self.enable_write_queue and self._queue is not None:
+            await self._queue.join()
+
     async def store_chat_completion(
         self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam]
     ) -> None:
-        if not self.sql_store:
-            raise ValueError("Inference store is not initialized")
-
-        data = chat_completion.model_dump()
-
-        await self.sql_store.insert(
-            table="chat_completions",
-            data={
-                "id": data["id"],
-                "created": data["created"],
-                "model": data["model"],
-                "choices": data["choices"],
-                "input_messages": [message.model_dump() for message in input_messages],
-            },
-        )
+        if self.enable_write_queue:
+            if self._queue is None:
+                raise ValueError("Inference store is not initialized")
+            try:
+                self._queue.put_nowait((chat_completion, input_messages))
+            except asyncio.QueueFull:
+                logger.warning(
+                    f"Write queue full; adding chat completion id={getattr(chat_completion, 'id', '<unknown>')}"
+                )
+                await self._queue.put((chat_completion, input_messages))
+        else:
+            await self._write_chat_completion(chat_completion, input_messages)
+
+    async def _worker_loop(self) -> None:
+        assert self._queue is not None
+        while True:
+            try:
+                item = await self._queue.get()
+            except asyncio.CancelledError:
+                break
+            chat_completion, input_messages = item
+            try:
+                await self._write_chat_completion(chat_completion, input_messages)
+            except Exception as e:  # noqa: BLE001
+                logger.error(f"Error writing chat completion: {e}")
+            finally:
+                self._queue.task_done()
+
+    async def _write_chat_completion(
+        self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam]
+    ) -> None:
+        if self.sql_store is None:
+            raise ValueError("Inference store is not initialized")
+
+        data = chat_completion.model_dump()
+        record_data = {
+            "id": data["id"],
+            "created": data["created"],
+            "model": data["model"],
+            "choices": data["choices"],
+            "input_messages": [message.model_dump() for message in input_messages],
+        }
+
+        try:
+            await self.sql_store.insert(
+                table="chat_completions",
+                data=record_data,
+            )
+        except IntegrityError as e:
+            # Duplicate chat completion IDs can be generated during tests especially if they are replaying
+            # recorded responses across different tests. No need to warn or error under those circumstances.
+            # In the wild, this is not likely to happen at all (no evidence) so we aren't really hiding any problem.
+
+            # Check if it's a unique constraint violation
+            error_message = str(e.orig) if e.orig else str(e)
+            if self._is_unique_constraint_error(error_message):
+                # Update the existing record instead
+                await self.sql_store.update(table="chat_completions", data=record_data, where={"id": data["id"]})
+            else:
+                # Re-raise if it's not a unique constraint error
+                raise
+
+    def _is_unique_constraint_error(self, error_message: str) -> bool:
+        """Check if the error is specifically a unique constraint violation."""
+        error_lower = error_message.lower()
+        return any(
+            indicator in error_lower
+            for indicator in [
+                "unique constraint failed",  # SQLite
+                "duplicate key",  # PostgreSQL
+                "unique violation",  # PostgreSQL alternative
+                "duplicate entry",  # MySQL
+            ]
+        )

     async def list_chat_completions(
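
A minimal, self-contained sketch of the write-queue pattern introduced above (names are hypothetical): the producer tries a non-blocking `put_nowait` and falls back to an awaited `put` for backpressure, while the worker pairs every `get` with `task_done` so `join()` can act as a flush.

import asyncio


async def worker(q: asyncio.Queue) -> None:
    while True:
        item = await q.get()
        try:
            print("wrote", item)  # stand-in for the actual DB write
        finally:
            q.task_done()  # lets join() observe completion


async def main() -> None:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=2)
    writer = asyncio.create_task(worker(q))

    for i in range(5):
        try:
            q.put_nowait(i)  # fast path: never blocks the caller
        except asyncio.QueueFull:
            await q.put(i)  # backpressure: wait for a free slot

    await q.join()  # drain, like InferenceStore.flush()
    writer.cancel()


asyncio.run(main())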


@@ -294,12 +294,12 @@ class VectorDBWithIndex:
             _validate_embedding(c.embedding, i, self.vector_db.embedding_dimension)

         if chunks_to_embed:
-            resp = await self.inference_api.embeddings(
+            resp = await self.inference_api.openai_embeddings(
                 self.vector_db.embedding_model,
                 [c.content for c in chunks_to_embed],
             )
-            for c, embedding in zip(chunks_to_embed, resp.embeddings, strict=False):
-                c.embedding = embedding
+            for c, data in zip(chunks_to_embed, resp.data, strict=False):
+                c.embedding = data.embedding

         embeddings = np.array([c.embedding for c in chunks], dtype=np.float32)
         await self.index.add_chunks(chunks, embeddings)
@@ -334,8 +334,8 @@ class VectorDBWithIndex:
         if mode == "keyword":
             return await self.index.query_keyword(query_string, k, score_threshold)

-        embeddings_response = await self.inference_api.embeddings(self.vector_db.embedding_model, [query_string])
-        query_vector = np.array(embeddings_response.embeddings[0], dtype=np.float32)
+        embeddings_response = await self.inference_api.openai_embeddings(self.vector_db.embedding_model, [query_string])
+        query_vector = np.array(embeddings_response.data[0].embedding, dtype=np.float32)
         if mode == "hybrid":
             return await self.index.query_hybrid(
                 query_vector, query_string, k, score_threshold, reranker_type, reranker_params
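
The migration above changes the response shape: the OpenAI-style endpoint nests vectors under `data[i].embedding` instead of a flat `embeddings` list. A small sketch, assuming an OpenAI-compatible `inference_api` like the one used above:

import numpy as np


async def embed_texts(inference_api, model: str, texts: list[str]) -> np.ndarray:
    response = await inference_api.openai_embeddings(model, texts)
    # response.data[i].embedding lines up with texts[i]
    return np.array([item.embedding for item in response.data], dtype=np.float32)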


@@ -172,6 +172,20 @@ class AuthorizedSqlStore:
         return results.data[0] if results.data else None

+    async def update(self, table: str, data: Mapping[str, Any], where: Mapping[str, Any]) -> None:
+        """Update rows with automatic access control attribute capture."""
+        enhanced_data = dict(data)
+
+        current_user = get_authenticated_user()
+        if current_user:
+            enhanced_data["owner_principal"] = current_user.principal
+            enhanced_data["access_attributes"] = current_user.attributes
+        else:
+            enhanced_data["owner_principal"] = None
+            enhanced_data["access_attributes"] = None
+
+        await self.sql_store.update(table, enhanced_data, where)
+
     async def delete(self, table: str, where: Mapping[str, Any]) -> None:
         """Delete rows with automatic access control filtering."""
         await self.sql_store.delete(table, where)
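
A hypothetical call site for the new `update` (names are illustrative): callers pass only their payload and row filter; the ownership columns are stamped inside `AuthorizedSqlStore.update`, mirroring `insert`.

from typing import Any


async def relabel_row(store, table: str, row_id: str, new_data: dict[str, Any]) -> None:
    # owner_principal / access_attributes are captured from the current
    # authenticated user inside AuthorizedSqlStore.update, as with insert
    await store.update(table=table, data=new_data, where={"id": row_id})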


@@ -23,6 +23,7 @@ from sqlalchemy import (
 )
 from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
 from sqlalchemy.ext.asyncio.engine import AsyncEngine
+from sqlalchemy.sql.elements import ColumnElement

 from llama_stack.apis.common.responses import PaginatedResponse
 from llama_stack.log import get_logger
@@ -43,6 +44,30 @@ TYPE_MAPPING: dict[ColumnType, Any] = {
 }


+def _build_where_expr(column: ColumnElement, value: Any) -> ColumnElement:
+    """Return a SQLAlchemy expression for a where condition.
+
+    `value` may be a simple scalar (equality) or a mapping like {">": 123}.
+    The returned expression is a SQLAlchemy ColumnElement usable in query.where(...).
+    """
+    if isinstance(value, Mapping):
+        if len(value) != 1:
+            raise ValueError(f"Operator mapping must have a single operator, got: {value}")
+        op, operand = next(iter(value.items()))
+        if op == "==" or op == "=":
+            return column == operand
+        if op == ">":
+            return column > operand
+        if op == "<":
+            return column < operand
+        if op == ">=":
+            return column >= operand
+        if op == "<=":
+            return column <= operand
+        raise ValueError(f"Unsupported operator '{op}' in where mapping")
+    return column == value
+
+
 class SqlAlchemySqlStoreImpl(SqlStore):
     def __init__(self, config: SqlAlchemySqlStoreConfig):
         self.config = config
@@ -111,7 +136,7 @@ class SqlAlchemySqlStoreImpl(SqlStore):
         if where:
             for key, value in where.items():
-                query = query.where(table_obj.c[key] == value)
+                query = query.where(_build_where_expr(table_obj.c[key], value))

         if where_sql:
             query = query.where(text(where_sql))
@@ -222,7 +247,7 @@ class SqlAlchemySqlStoreImpl(SqlStore):
         async with self.async_session() as session:
             stmt = self.metadata.tables[table].update()
             for key, value in where.items():
-                stmt = stmt.where(self.metadata.tables[table].c[key] == value)
+                stmt = stmt.where(_build_where_expr(self.metadata.tables[table].c[key], value))
             await session.execute(stmt, data)
             await session.commit()
@@ -233,7 +258,7 @@ class SqlAlchemySqlStoreImpl(SqlStore):
         async with self.async_session() as session:
             stmt = self.metadata.tables[table].delete()
             for key, value in where.items():
-                stmt = stmt.where(self.metadata.tables[table].c[key] == value)
+                stmt = stmt.where(_build_where_expr(self.metadata.tables[table].c[key], value))
             await session.execute(stmt)
             await session.commit()
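
A quick check of the helper above (assuming `_build_where_expr` is in scope): scalar values still compile to an equality condition, while a one-entry mapping selects the comparison operator.

import sqlalchemy as sa

created = sa.column("created")
print(_build_where_expr(created, 123))          # created = :created_1
print(_build_where_expr(created, {">=": 123}))  # created >= :created_1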


@@ -18,6 +18,7 @@ from functools import wraps
 from typing import Any

 from llama_stack.apis.telemetry import (
     Event,
+    LogSeverity,
     Span,
     SpanEndPayload,
@@ -98,7 +99,7 @@ class BackgroundLogger:
     def __init__(self, api: Telemetry, capacity: int = 100000):
         self.api = api
         self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
-        self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
+        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
         self.worker_thread.start()
         self._last_queue_full_log_time: float = 0.0
         self._dropped_since_last_notice: int = 0
@@ -118,12 +119,16 @@ class BackgroundLogger:
                 self._last_queue_full_log_time = current_time
                 self._dropped_since_last_notice = 0

-    def _process_logs(self):
+    def _worker(self):
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        loop.run_until_complete(self._process_logs())
+
+    async def _process_logs(self):
         while True:
             try:
                 event = self.log_queue.get()
-                # figure out how to use a thread's native loop
-                asyncio.run(self.api.log_event(event))
+                await self.api.log_event(event)
             except Exception:
                 import traceback
@@ -136,6 +141,19 @@ class BackgroundLogger:
         self.log_queue.join()


+def enqueue_event(event: Event) -> None:
+    """Enqueue a telemetry event to the background logger if available.
+
+    This provides a non-blocking path for routers and other hot paths to
+    submit telemetry without awaiting the Telemetry API, reducing contention
+    with the main event loop.
+    """
+    global BACKGROUND_LOGGER
+    if BACKGROUND_LOGGER is None:
+        raise RuntimeError("Telemetry API not initialized")
+    BACKGROUND_LOGGER.log_event(event)
+
+
 class TraceContext:
     spans: list[Span] = []
@@ -256,11 +274,7 @@ class TelemetryHandler(logging.Handler):
         if record.module in ("asyncio", "selector_events"):
             return

-        global CURRENT_TRACE_CONTEXT, BACKGROUND_LOGGER
-        if BACKGROUND_LOGGER is None:
-            raise RuntimeError("Telemetry API not initialized")
-
+        global CURRENT_TRACE_CONTEXT
         context = CURRENT_TRACE_CONTEXT.get()
         if context is None:
             return
@@ -269,7 +283,7 @@ class TelemetryHandler(logging.Handler):
         if span is None:
             return

-        BACKGROUND_LOGGER.log_event(
+        enqueue_event(
             UnstructuredLogEvent(
                 trace_id=span.trace_id,
                 span_id=span.span_id,

@@ -67,6 +67,38 @@ async def client_wrapper(endpoint: str, headers: dict[str, str]) -> AsyncGenerat
                     raise AuthenticationRequiredError(exc) from exc
                 if i == len(connection_strategies) - 1:
                     raise
+        except* httpx.ConnectError as eg:
+            # Connection refused, server down, network unreachable
+            if i == len(connection_strategies) - 1:
+                error_msg = f"Failed to connect to MCP server at {endpoint}: Connection refused"
+                logger.error(f"MCP connection error: {error_msg}")
+                raise ConnectionError(error_msg) from eg
+            else:
+                logger.warning(
+                    f"failed to connect to MCP server at {endpoint} via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
+                )
+        except* httpx.TimeoutException as eg:
+            # Request timeout, server too slow
+            if i == len(connection_strategies) - 1:
+                error_msg = f"MCP server at {endpoint} timed out"
+                logger.error(f"MCP timeout error: {error_msg}")
+                raise TimeoutError(error_msg) from eg
+            else:
+                logger.warning(
+                    f"MCP server at {endpoint} timed out via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
+                )
+        except* httpx.RequestError as eg:
+            # DNS resolution failures, network errors, invalid URLs
+            if i == len(connection_strategies) - 1:
+                # Get the first exception's message for the error string
+                exc_msg = str(eg.exceptions[0]) if eg.exceptions else "Unknown error"
+                error_msg = f"Network error connecting to MCP server at {endpoint}: {exc_msg}"
+                logger.error(f"MCP network error: {error_msg}")
+                raise ConnectionError(error_msg) from eg
+            else:
+                logger.warning(
+                    f"network error connecting to MCP server at {endpoint} via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
+                )
         except* McpError:
             if i < len(connection_strategies) - 1:
                 logger.warning(
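
A minimal sketch of the `except*` handling used above (Python 3.11+; `probe` and the URL are illustrative): errors raised inside a task group surface as an `ExceptionGroup`, so `except*` is the matching syntax, and the underlying exceptions live in `eg.exceptions`.

import asyncio

import httpx


async def probe(url: str) -> None:
    async with httpx.AsyncClient() as client:
        await client.get(url)


async def main() -> None:
    try:
        async with asyncio.TaskGroup() as tg:
            # TaskGroup failures propagate as an ExceptionGroup
            tg.create_task(probe("http://localhost:9"))  # nothing listens here
    except* httpx.ConnectError as eg:
        print(f"{len(eg.exceptions)} connection error(s): {eg.exceptions[0]}")


asyncio.run(main())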


@@ -39,13 +39,16 @@ def sanitize_collection_name(name: str, weaviate_format=False) -> str:
         return s


-class Reranker:
+class WeightedInMemoryAggregator:
     @staticmethod
     def _normalize_scores(scores: dict[str, float]) -> dict[str, float]:
         """
         Normalize scores to 0-1 range using min-max normalization.

         Args:
             scores: dictionary of scores with document IDs as keys and scores as values
+
         Returns:
             Normalized scores with document IDs as keys and normalized scores as values
         """
@@ -65,17 +68,20 @@
     ) -> dict[str, float]:
         """
         Rerank via weighted average of scores.

         Args:
             vector_scores: scores from vector search
             keyword_scores: scores from keyword search
             alpha: weight factor between 0 and 1 (default: 0.5)
                    0 = keyword only, 1 = vector only, 0.5 = equal weight
+
         Returns:
             All unique document IDs with weighted combined scores
         """
         all_ids = set(vector_scores.keys()) | set(keyword_scores.keys())
-        normalized_vector_scores = Reranker._normalize_scores(vector_scores)
-        normalized_keyword_scores = Reranker._normalize_scores(keyword_scores)
+        normalized_vector_scores = WeightedInMemoryAggregator._normalize_scores(vector_scores)
+        normalized_keyword_scores = WeightedInMemoryAggregator._normalize_scores(keyword_scores)

         # Weighted formula: score = (1-alpha) * keyword_score + alpha * vector_score
         # alpha=0 means keyword only, alpha=1 means vector only
@@ -93,10 +99,12 @@
     ) -> dict[str, float]:
         """
         Rerank via Reciprocal Rank Fusion.

         Args:
             vector_scores: scores from vector search
             keyword_scores: scores from keyword search
             impact_factor: impact factor for RRF (default: 60.0)
+
         Returns:
             All unique document IDs with RRF combined scores
         """
@@ -130,11 +138,13 @@
     ) -> dict[str, float]:
         """
         Combine vector and keyword search results using specified reranking strategy.

         Args:
             vector_scores: scores from vector search
             keyword_scores: scores from keyword search
             reranker_type: type of reranker to use (default: RERANKER_TYPE_RRF)
             reranker_params: parameters for the reranker
+
         Returns:
             All unique document IDs with combined scores
         """
@@ -143,8 +153,9 @@
         if reranker_type == "weighted":
             alpha = reranker_params.get("alpha", 0.5)
-            return Reranker.weighted_rerank(vector_scores, keyword_scores, alpha)
+            return WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha)
         else:
+            # Default to RRF for None, RRF, or any unknown types
             impact_factor = reranker_params.get("impact_factor", 60.0)
-            return Reranker.rrf_rerank(vector_scores, keyword_scores, impact_factor)
+            return WeightedInMemoryAggregator.rrf_rerank(vector_scores, keyword_scores, impact_factor)
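
A hypothetical usage of the renamed aggregator, calling the `weighted_rerank` staticmethod shown above: min-max normalization first puts both score sets on a common 0-1 scale, so `alpha` directly trades off vector against keyword evidence.

vector_scores = {"doc-a": 0.92, "doc-b": 0.55}
keyword_scores = {"doc-b": 12.0, "doc-c": 7.5}

# score = (1 - alpha) * keyword_score + alpha * vector_score, per the diff
combined = WeightedInMemoryAggregator.weighted_rerank(vector_scores, keyword_scores, alpha=0.5)
print(combined)  # every doc ID from either list gets a combined 0-1 score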