mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-10-04 04:04:14 +00:00
Merge branch 'main' into hide-non-openai-inference-apis
This commit is contained in:
commit
cb534281c8
714 changed files with 123149 additions and 54618 deletions
|
@ -4,53 +4,55 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import os
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class BedrockBaseConfig(BaseModel):
|
||||
aws_access_key_id: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID"),
|
||||
description="The AWS access key to use. Default use environment variable: AWS_ACCESS_KEY_ID",
|
||||
)
|
||||
aws_secret_access_key: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY"),
|
||||
description="The AWS secret access key to use. Default use environment variable: AWS_SECRET_ACCESS_KEY",
|
||||
)
|
||||
aws_session_token: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_SESSION_TOKEN"),
|
||||
description="The AWS session token to use. Default use environment variable: AWS_SESSION_TOKEN",
|
||||
)
|
||||
region_name: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_DEFAULT_REGION"),
|
||||
description="The default AWS Region to use, for example, us-west-1 or us-west-2."
|
||||
"Default use environment variable: AWS_DEFAULT_REGION",
|
||||
)
|
||||
profile_name: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_PROFILE"),
|
||||
description="The profile name that contains credentials to use.Default use environment variable: AWS_PROFILE",
|
||||
)
|
||||
total_max_attempts: int | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: int(val) if (val := os.getenv("AWS_MAX_ATTEMPTS")) else None,
|
||||
description="An integer representing the maximum number of attempts that will be made for a single request, "
|
||||
"including the initial attempt. Default use environment variable: AWS_MAX_ATTEMPTS",
|
||||
)
|
||||
retry_mode: str | None = Field(
|
||||
default=None,
|
||||
default_factory=lambda: os.getenv("AWS_RETRY_MODE"),
|
||||
description="A string representing the type of retries Boto3 will perform."
|
||||
"Default use environment variable: AWS_RETRY_MODE",
|
||||
)
|
||||
connect_timeout: float | None = Field(
|
||||
default=60,
|
||||
default_factory=lambda: float(os.getenv("AWS_CONNECT_TIMEOUT", "60")),
|
||||
description="The time in seconds till a timeout exception is thrown when attempting to make a connection. "
|
||||
"The default is 60 seconds.",
|
||||
)
|
||||
read_timeout: float | None = Field(
|
||||
default=60,
|
||||
default_factory=lambda: float(os.getenv("AWS_READ_TIMEOUT", "60")),
|
||||
description="The time in seconds till a timeout exception is thrown when attempting to read from a connection."
|
||||
"The default is 60 seconds.",
|
||||
)
|
||||
session_ttl: int | None = Field(
|
||||
default=3600,
|
||||
default_factory=lambda: int(os.getenv("AWS_SESSION_TTL", "3600")),
|
||||
description="The time in seconds till a session expires. The default is 3600 seconds (1 hour).",
|
||||
)
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import asyncio
|
||||
import base64
|
||||
import struct
|
||||
from typing import TYPE_CHECKING
|
||||
|
@ -43,9 +44,11 @@ class SentenceTransformerEmbeddingMixin:
|
|||
task_type: EmbeddingTaskType | None = None,
|
||||
) -> EmbeddingsResponse:
|
||||
model = await self.model_store.get_model(model_id)
|
||||
embedding_model = self._load_sentence_transformer_model(model.provider_resource_id)
|
||||
embeddings = embedding_model.encode(
|
||||
[interleaved_content_as_str(content) for content in contents], show_progress_bar=False
|
||||
embedding_model = await self._load_sentence_transformer_model(model.provider_resource_id)
|
||||
embeddings = await asyncio.to_thread(
|
||||
embedding_model.encode,
|
||||
[interleaved_content_as_str(content) for content in contents],
|
||||
show_progress_bar=False,
|
||||
)
|
||||
return EmbeddingsResponse(embeddings=embeddings)
|
||||
|
||||
|
@ -64,8 +67,8 @@ class SentenceTransformerEmbeddingMixin:
|
|||
|
||||
# Get the model and generate embeddings
|
||||
model_obj = await self.model_store.get_model(model)
|
||||
embedding_model = self._load_sentence_transformer_model(model_obj.provider_resource_id)
|
||||
embeddings = embedding_model.encode(input_list, show_progress_bar=False)
|
||||
embedding_model = await self._load_sentence_transformer_model(model_obj.provider_resource_id)
|
||||
embeddings = await asyncio.to_thread(embedding_model.encode, input_list, show_progress_bar=False)
|
||||
|
||||
# Convert embeddings to the requested format
|
||||
data = []
|
||||
|
@ -93,7 +96,7 @@ class SentenceTransformerEmbeddingMixin:
|
|||
usage=usage,
|
||||
)
|
||||
|
||||
def _load_sentence_transformer_model(self, model: str) -> "SentenceTransformer":
|
||||
async def _load_sentence_transformer_model(self, model: str) -> "SentenceTransformer":
|
||||
global EMBEDDING_MODELS
|
||||
|
||||
loaded_model = EMBEDDING_MODELS.get(model)
|
||||
|
@ -101,8 +104,12 @@ class SentenceTransformerEmbeddingMixin:
|
|||
return loaded_model
|
||||
|
||||
log.info(f"Loading sentence transformer for {model}...")
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
loaded_model = SentenceTransformer(model)
|
||||
def _load_model():
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
return SentenceTransformer(model)
|
||||
|
||||
loaded_model = await asyncio.to_thread(_load_model)
|
||||
EMBEDDING_MODELS[model] = loaded_model
|
||||
return loaded_model
|
||||
|
|
|
@ -3,6 +3,11 @@
|
|||
#
|
||||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from llama_stack.apis.inference import (
|
||||
ListOpenAIChatCompletionResponse,
|
||||
OpenAIChatCompletion,
|
||||
|
@ -10,27 +15,46 @@ from llama_stack.apis.inference import (
|
|||
OpenAIMessageParam,
|
||||
Order,
|
||||
)
|
||||
from llama_stack.core.datatypes import AccessRule
|
||||
from llama_stack.core.utils.config_dirs import RUNTIME_BASE_DIR
|
||||
from llama_stack.core.datatypes import AccessRule, InferenceStoreConfig
|
||||
from llama_stack.log import get_logger
|
||||
|
||||
from ..sqlstore.api import ColumnDefinition, ColumnType
|
||||
from ..sqlstore.authorized_sqlstore import AuthorizedSqlStore
|
||||
from ..sqlstore.sqlstore import SqliteSqlStoreConfig, SqlStoreConfig, sqlstore_impl
|
||||
from ..sqlstore.sqlstore import SqlStoreConfig, SqlStoreType, sqlstore_impl
|
||||
|
||||
logger = get_logger(name=__name__, category="inference_store")
|
||||
|
||||
|
||||
class InferenceStore:
|
||||
def __init__(self, sql_store_config: SqlStoreConfig, policy: list[AccessRule]):
|
||||
if not sql_store_config:
|
||||
sql_store_config = SqliteSqlStoreConfig(
|
||||
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
|
||||
def __init__(
|
||||
self,
|
||||
config: InferenceStoreConfig | SqlStoreConfig,
|
||||
policy: list[AccessRule],
|
||||
):
|
||||
# Handle backward compatibility
|
||||
if not isinstance(config, InferenceStoreConfig):
|
||||
# Legacy: SqlStoreConfig passed directly as config
|
||||
config = InferenceStoreConfig(
|
||||
sql_store_config=config,
|
||||
)
|
||||
self.sql_store_config = sql_store_config
|
||||
|
||||
self.config = config
|
||||
self.sql_store_config = config.sql_store_config
|
||||
self.sql_store = None
|
||||
self.policy = policy
|
||||
|
||||
# Disable write queue for SQLite to avoid concurrency issues
|
||||
self.enable_write_queue = self.sql_store_config.type != SqlStoreType.sqlite
|
||||
|
||||
# Async write queue and worker control
|
||||
self._queue: asyncio.Queue[tuple[OpenAIChatCompletion, list[OpenAIMessageParam]]] | None = None
|
||||
self._worker_tasks: list[asyncio.Task[Any]] = []
|
||||
self._max_write_queue_size: int = config.max_write_queue_size
|
||||
self._num_writers: int = max(1, config.num_writers)
|
||||
|
||||
async def initialize(self):
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config))
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(self.sql_store_config), self.policy)
|
||||
await self.sql_store.create_table(
|
||||
"chat_completions",
|
||||
{
|
||||
|
@ -42,23 +66,109 @@ class InferenceStore:
|
|||
},
|
||||
)
|
||||
|
||||
if self.enable_write_queue:
|
||||
self._queue = asyncio.Queue(maxsize=self._max_write_queue_size)
|
||||
for _ in range(self._num_writers):
|
||||
self._worker_tasks.append(asyncio.create_task(self._worker_loop()))
|
||||
else:
|
||||
logger.info("Write queue disabled for SQLite to avoid concurrency issues")
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
if not self._worker_tasks:
|
||||
return
|
||||
if self._queue is not None:
|
||||
await self._queue.join()
|
||||
for t in self._worker_tasks:
|
||||
if not t.done():
|
||||
t.cancel()
|
||||
for t in self._worker_tasks:
|
||||
try:
|
||||
await t
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._worker_tasks.clear()
|
||||
|
||||
async def flush(self) -> None:
|
||||
"""Wait for all queued writes to complete. Useful for testing."""
|
||||
if self.enable_write_queue and self._queue is not None:
|
||||
await self._queue.join()
|
||||
|
||||
async def store_chat_completion(
|
||||
self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam]
|
||||
) -> None:
|
||||
if not self.sql_store:
|
||||
if self.enable_write_queue:
|
||||
if self._queue is None:
|
||||
raise ValueError("Inference store is not initialized")
|
||||
try:
|
||||
self._queue.put_nowait((chat_completion, input_messages))
|
||||
except asyncio.QueueFull:
|
||||
logger.warning(
|
||||
f"Write queue full; adding chat completion id={getattr(chat_completion, 'id', '<unknown>')}"
|
||||
)
|
||||
await self._queue.put((chat_completion, input_messages))
|
||||
else:
|
||||
await self._write_chat_completion(chat_completion, input_messages)
|
||||
|
||||
async def _worker_loop(self) -> None:
|
||||
assert self._queue is not None
|
||||
while True:
|
||||
try:
|
||||
item = await self._queue.get()
|
||||
except asyncio.CancelledError:
|
||||
break
|
||||
chat_completion, input_messages = item
|
||||
try:
|
||||
await self._write_chat_completion(chat_completion, input_messages)
|
||||
except Exception as e: # noqa: BLE001
|
||||
logger.error(f"Error writing chat completion: {e}")
|
||||
finally:
|
||||
self._queue.task_done()
|
||||
|
||||
async def _write_chat_completion(
|
||||
self, chat_completion: OpenAIChatCompletion, input_messages: list[OpenAIMessageParam]
|
||||
) -> None:
|
||||
if self.sql_store is None:
|
||||
raise ValueError("Inference store is not initialized")
|
||||
|
||||
data = chat_completion.model_dump()
|
||||
record_data = {
|
||||
"id": data["id"],
|
||||
"created": data["created"],
|
||||
"model": data["model"],
|
||||
"choices": data["choices"],
|
||||
"input_messages": [message.model_dump() for message in input_messages],
|
||||
}
|
||||
|
||||
await self.sql_store.insert(
|
||||
table="chat_completions",
|
||||
data={
|
||||
"id": data["id"],
|
||||
"created": data["created"],
|
||||
"model": data["model"],
|
||||
"choices": data["choices"],
|
||||
"input_messages": [message.model_dump() for message in input_messages],
|
||||
},
|
||||
try:
|
||||
await self.sql_store.insert(
|
||||
table="chat_completions",
|
||||
data=record_data,
|
||||
)
|
||||
except IntegrityError as e:
|
||||
# Duplicate chat completion IDs can be generated during tests especially if they are replaying
|
||||
# recorded responses across different tests. No need to warn or error under those circumstances.
|
||||
# In the wild, this is not likely to happen at all (no evidence) so we aren't really hiding any problem.
|
||||
|
||||
# Check if it's a unique constraint violation
|
||||
error_message = str(e.orig) if e.orig else str(e)
|
||||
if self._is_unique_constraint_error(error_message):
|
||||
# Update the existing record instead
|
||||
await self.sql_store.update(table="chat_completions", data=record_data, where={"id": data["id"]})
|
||||
else:
|
||||
# Re-raise if it's not a unique constraint error
|
||||
raise
|
||||
|
||||
def _is_unique_constraint_error(self, error_message: str) -> bool:
|
||||
"""Check if the error is specifically a unique constraint violation."""
|
||||
error_lower = error_message.lower()
|
||||
return any(
|
||||
indicator in error_lower
|
||||
for indicator in [
|
||||
"unique constraint failed", # SQLite
|
||||
"duplicate key", # PostgreSQL
|
||||
"unique violation", # PostgreSQL alternative
|
||||
"duplicate entry", # MySQL
|
||||
]
|
||||
)
|
||||
|
||||
async def list_chat_completions(
|
||||
|
@ -92,7 +202,6 @@ class InferenceStore:
|
|||
order_by=[("created", order.value)],
|
||||
cursor=("id", after) if after else None,
|
||||
limit=limit,
|
||||
policy=self.policy,
|
||||
)
|
||||
|
||||
data = [
|
||||
|
@ -119,7 +228,6 @@ class InferenceStore:
|
|||
row = await self.sql_store.fetch_one(
|
||||
table="chat_completions",
|
||||
where={"id": completion_id},
|
||||
policy=self.policy,
|
||||
)
|
||||
|
||||
if not row:
|
||||
|
|
|
@ -40,7 +40,7 @@ from llama_stack.apis.inference import (
|
|||
)
|
||||
from llama_stack.core.request_headers import NeedsRequestProviderData
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
|
||||
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper, ProviderModelEntry
|
||||
from llama_stack.providers.utils.inference.openai_compat import (
|
||||
b64_encode_openai_embeddings_response,
|
||||
convert_message_to_openai_dict_new,
|
||||
|
@ -67,10 +67,10 @@ class LiteLLMOpenAIMixin(
|
|||
# when calling litellm.
|
||||
def __init__(
|
||||
self,
|
||||
model_entries,
|
||||
litellm_provider_name: str,
|
||||
api_key_from_config: str | None,
|
||||
provider_data_api_key_field: str,
|
||||
model_entries: list[ProviderModelEntry] | None = None,
|
||||
openai_compat_api_base: str | None = None,
|
||||
download_images: bool = False,
|
||||
json_schema_strict: bool = True,
|
||||
|
@ -86,7 +86,7 @@ class LiteLLMOpenAIMixin(
|
|||
:param download_images: Whether to download images and convert to base64 for message conversion.
|
||||
:param json_schema_strict: Whether to use strict mode for JSON schema validation.
|
||||
"""
|
||||
ModelRegistryHelper.__init__(self, model_entries)
|
||||
ModelRegistryHelper.__init__(self, model_entries=model_entries)
|
||||
|
||||
self.litellm_provider_name = litellm_provider_name
|
||||
self.api_key_from_config = api_key_from_config
|
||||
|
|
|
@ -11,7 +11,6 @@ from pydantic import BaseModel, Field
|
|||
from llama_stack.apis.common.errors import UnsupportedModelError
|
||||
from llama_stack.apis.models import ModelType
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.models.llama.sku_list import all_registered_models
|
||||
from llama_stack.providers.datatypes import Model, ModelsProtocolPrivate
|
||||
from llama_stack.providers.utils.inference import (
|
||||
ALL_HUGGINGFACE_REPOS_TO_MODEL_DESCRIPTOR,
|
||||
|
@ -37,13 +36,6 @@ class ProviderModelEntry(BaseModel):
|
|||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
def get_huggingface_repo(model_descriptor: str) -> str | None:
|
||||
for model in all_registered_models():
|
||||
if model.descriptor() == model_descriptor:
|
||||
return model.huggingface_repo
|
||||
return None
|
||||
|
||||
|
||||
def build_hf_repo_model_entry(
|
||||
provider_model_id: str,
|
||||
model_descriptor: str,
|
||||
|
@ -63,25 +55,20 @@ def build_hf_repo_model_entry(
|
|||
)
|
||||
|
||||
|
||||
def build_model_entry(provider_model_id: str, model_descriptor: str) -> ProviderModelEntry:
|
||||
return ProviderModelEntry(
|
||||
provider_model_id=provider_model_id,
|
||||
aliases=[],
|
||||
llama_model=model_descriptor,
|
||||
model_type=ModelType.llm,
|
||||
)
|
||||
|
||||
|
||||
class ModelRegistryHelper(ModelsProtocolPrivate):
|
||||
__provider_id__: str
|
||||
|
||||
def __init__(self, model_entries: list[ProviderModelEntry], allowed_models: list[str] | None = None):
|
||||
self.model_entries = model_entries
|
||||
def __init__(
|
||||
self,
|
||||
model_entries: list[ProviderModelEntry] | None = None,
|
||||
allowed_models: list[str] | None = None,
|
||||
):
|
||||
self.allowed_models = allowed_models
|
||||
|
||||
self.alias_to_provider_id_map = {}
|
||||
self.provider_id_to_llama_model_map = {}
|
||||
for entry in model_entries:
|
||||
self.model_entries = model_entries or []
|
||||
for entry in self.model_entries:
|
||||
for alias in entry.aliases:
|
||||
self.alias_to_provider_id_map[alias] = entry.provider_model_id
|
||||
|
||||
|
@ -103,7 +90,7 @@ class ModelRegistryHelper(ModelsProtocolPrivate):
|
|||
Model(
|
||||
identifier=id,
|
||||
provider_resource_id=entry.provider_model_id,
|
||||
model_type=ModelType.llm,
|
||||
model_type=entry.model_type,
|
||||
metadata=entry.metadata,
|
||||
provider_id=self.__provider_id__,
|
||||
)
|
||||
|
|
|
@ -4,11 +4,11 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Any
|
||||
|
||||
import openai
|
||||
from openai import NOT_GIVEN, AsyncOpenAI
|
||||
|
||||
from llama_stack.apis.inference import (
|
||||
|
@ -22,13 +22,15 @@ from llama_stack.apis.inference import (
|
|||
OpenAIMessageParam,
|
||||
OpenAIResponseFormatParam,
|
||||
)
|
||||
from llama_stack.apis.models import ModelType
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
|
||||
from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params
|
||||
|
||||
logger = get_logger(name=__name__, category="providers::utils")
|
||||
|
||||
|
||||
class OpenAIMixin(ABC):
|
||||
class OpenAIMixin(ModelRegistryHelper, ABC):
|
||||
"""
|
||||
Mixin class that provides OpenAI-specific functionality for inference providers.
|
||||
This class handles direct OpenAI API calls using the AsyncOpenAI client.
|
||||
|
@ -43,6 +45,24 @@ class OpenAIMixin(ABC):
|
|||
The model_store is set in routing_tables/common.py during provider initialization.
|
||||
"""
|
||||
|
||||
# Allow subclasses to control whether to overwrite the 'id' field in OpenAI responses
|
||||
# is overwritten with a client-side generated id.
|
||||
#
|
||||
# This is useful for providers that do not return a unique id in the response.
|
||||
overwrite_completion_id: bool = False
|
||||
|
||||
# Embedding model metadata for this provider
|
||||
# Can be set by subclasses or instances to provide embedding models
|
||||
# Format: {"model_id": {"embedding_dimension": 1536, "context_length": 8192}}
|
||||
embedding_model_metadata: dict[str, dict[str, int]] = {}
|
||||
|
||||
# Cache of available models keyed by model ID
|
||||
# This is set in list_models() and used in check_model_availability()
|
||||
_model_cache: dict[str, Model] = {}
|
||||
|
||||
# List of allowed models for this provider, if empty all models allowed
|
||||
allowed_models: list[str] = []
|
||||
|
||||
@abstractmethod
|
||||
def get_api_key(self) -> str:
|
||||
"""
|
||||
|
@ -67,6 +87,17 @@ class OpenAIMixin(ABC):
|
|||
"""
|
||||
pass
|
||||
|
||||
def get_extra_client_params(self) -> dict[str, Any]:
|
||||
"""
|
||||
Get any extra parameters to pass to the AsyncOpenAI client.
|
||||
|
||||
Child classes can override this method to provide additional parameters
|
||||
such as timeout settings, proxies, etc.
|
||||
|
||||
:return: A dictionary of extra parameters
|
||||
"""
|
||||
return {}
|
||||
|
||||
@property
|
||||
def client(self) -> AsyncOpenAI:
|
||||
"""
|
||||
|
@ -78,6 +109,7 @@ class OpenAIMixin(ABC):
|
|||
return AsyncOpenAI(
|
||||
api_key=self.get_api_key(),
|
||||
base_url=self.get_base_url(),
|
||||
**self.get_extra_client_params(),
|
||||
)
|
||||
|
||||
async def _get_provider_model_id(self, model: str) -> str:
|
||||
|
@ -98,6 +130,23 @@ class OpenAIMixin(ABC):
|
|||
raise ValueError(f"Model {model} has no provider_resource_id")
|
||||
return model_obj.provider_resource_id
|
||||
|
||||
async def _maybe_overwrite_id(self, resp: Any, stream: bool | None) -> Any:
|
||||
if not self.overwrite_completion_id:
|
||||
return resp
|
||||
|
||||
new_id = f"cltsd-{uuid.uuid4()}"
|
||||
if stream:
|
||||
|
||||
async def _gen():
|
||||
async for chunk in resp:
|
||||
chunk.id = new_id
|
||||
yield chunk
|
||||
|
||||
return _gen()
|
||||
else:
|
||||
resp.id = new_id
|
||||
return resp
|
||||
|
||||
async def openai_completion(
|
||||
self,
|
||||
model: str,
|
||||
|
@ -124,13 +173,18 @@ class OpenAIMixin(ABC):
|
|||
"""
|
||||
Direct OpenAI completion API call.
|
||||
"""
|
||||
if guided_choice is not None:
|
||||
logger.warning("guided_choice is not supported by the OpenAI API. Ignoring.")
|
||||
if prompt_logprobs is not None:
|
||||
logger.warning("prompt_logprobs is not supported by the OpenAI API. Ignoring.")
|
||||
# Handle parameters that are not supported by OpenAI API, but may be by the provider
|
||||
# prompt_logprobs is supported by vLLM
|
||||
# guided_choice is supported by vLLM
|
||||
# TODO: test coverage
|
||||
extra_body: dict[str, Any] = {}
|
||||
if prompt_logprobs is not None and prompt_logprobs >= 0:
|
||||
extra_body["prompt_logprobs"] = prompt_logprobs
|
||||
if guided_choice:
|
||||
extra_body["guided_choice"] = guided_choice
|
||||
|
||||
# TODO: fix openai_completion to return type compatible with OpenAI's API response
|
||||
return await self.client.completions.create( # type: ignore[no-any-return]
|
||||
resp = await self.client.completions.create(
|
||||
**await prepare_openai_completion_params(
|
||||
model=await self._get_provider_model_id(model),
|
||||
prompt=prompt,
|
||||
|
@ -150,9 +204,12 @@ class OpenAIMixin(ABC):
|
|||
top_p=top_p,
|
||||
user=user,
|
||||
suffix=suffix,
|
||||
)
|
||||
),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
|
||||
return await self._maybe_overwrite_id(resp, stream) # type: ignore[no-any-return]
|
||||
|
||||
async def openai_chat_completion(
|
||||
self,
|
||||
model: str,
|
||||
|
@ -182,8 +239,7 @@ class OpenAIMixin(ABC):
|
|||
"""
|
||||
Direct OpenAI chat completion API call.
|
||||
"""
|
||||
# Type ignore because return types are compatible
|
||||
return await self.client.chat.completions.create( # type: ignore[no-any-return]
|
||||
resp = await self.client.chat.completions.create(
|
||||
**await prepare_openai_completion_params(
|
||||
model=await self._get_provider_model_id(model),
|
||||
messages=messages,
|
||||
|
@ -211,6 +267,8 @@ class OpenAIMixin(ABC):
|
|||
)
|
||||
)
|
||||
|
||||
return await self._maybe_overwrite_id(resp, stream) # type: ignore[no-any-return]
|
||||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
|
@ -247,26 +305,53 @@ class OpenAIMixin(ABC):
|
|||
|
||||
return OpenAIEmbeddingsResponse(
|
||||
data=data,
|
||||
model=response.model,
|
||||
model=model,
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
async def list_models(self) -> list[Model] | None:
|
||||
"""
|
||||
List available models from the provider's /v1/models endpoint augmented with static embedding model metadata.
|
||||
|
||||
Also, caches the models in self._model_cache for use in check_model_availability().
|
||||
|
||||
:return: A list of Model instances representing available models.
|
||||
"""
|
||||
self._model_cache = {}
|
||||
|
||||
async for m in self.client.models.list():
|
||||
if self.allowed_models and m.id not in self.allowed_models:
|
||||
logger.info(f"Skipping model {m.id} as it is not in the allowed models list")
|
||||
continue
|
||||
if metadata := self.embedding_model_metadata.get(m.id):
|
||||
# This is an embedding model - augment with metadata
|
||||
model = Model(
|
||||
provider_id=self.__provider_id__, # type: ignore[attr-defined]
|
||||
provider_resource_id=m.id,
|
||||
identifier=m.id,
|
||||
model_type=ModelType.embedding,
|
||||
metadata=metadata,
|
||||
)
|
||||
else:
|
||||
# This is an LLM
|
||||
model = Model(
|
||||
provider_id=self.__provider_id__, # type: ignore[attr-defined]
|
||||
provider_resource_id=m.id,
|
||||
identifier=m.id,
|
||||
model_type=ModelType.llm,
|
||||
)
|
||||
self._model_cache[m.id] = model
|
||||
|
||||
return list(self._model_cache.values())
|
||||
|
||||
async def check_model_availability(self, model: str) -> bool:
|
||||
"""
|
||||
Check if a specific model is available from OpenAI.
|
||||
Check if a specific model is available from the provider's /v1/models.
|
||||
|
||||
:param model: The model identifier to check.
|
||||
:return: True if the model is available dynamically, False otherwise.
|
||||
"""
|
||||
try:
|
||||
# Direct model lookup - returns model or raises NotFoundError
|
||||
await self.client.models.retrieve(model)
|
||||
return True
|
||||
except openai.NotFoundError:
|
||||
# Model doesn't exist - this is expected for unavailable models
|
||||
pass
|
||||
except Exception as e:
|
||||
# All other errors (auth, rate limit, network, etc.)
|
||||
logger.warning(f"Failed to check model availability for {model}: {e}")
|
||||
if not self._model_cache:
|
||||
await self.list_models()
|
||||
|
||||
return False
|
||||
return model in self._model_cache
|
||||
|
|
|
@ -28,8 +28,7 @@ class ResponsesStore:
|
|||
sql_store_config = SqliteSqlStoreConfig(
|
||||
db_path=(RUNTIME_BASE_DIR / "sqlstore.db").as_posix(),
|
||||
)
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(sql_store_config))
|
||||
self.policy = policy
|
||||
self.sql_store = AuthorizedSqlStore(sqlstore_impl(sql_store_config), policy)
|
||||
|
||||
async def initialize(self):
|
||||
"""Create the necessary tables if they don't exist."""
|
||||
|
@ -87,7 +86,6 @@ class ResponsesStore:
|
|||
order_by=[("created_at", order.value)],
|
||||
cursor=("id", after) if after else None,
|
||||
limit=limit,
|
||||
policy=self.policy,
|
||||
)
|
||||
|
||||
data = [OpenAIResponseObjectWithInput(**row["response_object"]) for row in paginated_result.data]
|
||||
|
@ -105,7 +103,6 @@ class ResponsesStore:
|
|||
row = await self.sql_store.fetch_one(
|
||||
"openai_responses",
|
||||
where={"id": response_id},
|
||||
policy=self.policy,
|
||||
)
|
||||
|
||||
if not row:
|
||||
|
@ -116,7 +113,7 @@ class ResponsesStore:
|
|||
return OpenAIResponseObjectWithInput(**row["response_object"])
|
||||
|
||||
async def delete_response_object(self, response_id: str) -> OpenAIDeleteResponseObject:
|
||||
row = await self.sql_store.fetch_one("openai_responses", where={"id": response_id}, policy=self.policy)
|
||||
row = await self.sql_store.fetch_one("openai_responses", where={"id": response_id})
|
||||
if not row:
|
||||
raise ValueError(f"Response with id {response_id} not found")
|
||||
await self.sql_store.delete("openai_responses", where={"id": response_id})
|
||||
|
|
|
@ -53,13 +53,15 @@ class AuthorizedSqlStore:
|
|||
access control policies, user attribute capture, and SQL filtering optimization.
|
||||
"""
|
||||
|
||||
def __init__(self, sql_store: SqlStore):
|
||||
def __init__(self, sql_store: SqlStore, policy: list[AccessRule]):
|
||||
"""
|
||||
Initialize the authorization layer.
|
||||
|
||||
:param sql_store: Base SqlStore implementation to wrap
|
||||
:param policy: Access control policy to use for authorization
|
||||
"""
|
||||
self.sql_store = sql_store
|
||||
self.policy = policy
|
||||
self._detect_database_type()
|
||||
self._validate_sql_optimized_policy()
|
||||
|
||||
|
@ -117,14 +119,13 @@ class AuthorizedSqlStore:
|
|||
async def fetch_all(
|
||||
self,
|
||||
table: str,
|
||||
policy: list[AccessRule],
|
||||
where: Mapping[str, Any] | None = None,
|
||||
limit: int | None = None,
|
||||
order_by: list[tuple[str, Literal["asc", "desc"]]] | None = None,
|
||||
cursor: tuple[str, str] | None = None,
|
||||
) -> PaginatedResponse:
|
||||
"""Fetch all rows with automatic access control filtering."""
|
||||
access_where = self._build_access_control_where_clause(policy)
|
||||
access_where = self._build_access_control_where_clause(self.policy)
|
||||
rows = await self.sql_store.fetch_all(
|
||||
table=table,
|
||||
where=where,
|
||||
|
@ -146,7 +147,7 @@ class AuthorizedSqlStore:
|
|||
str(record_id), table, User(principal=stored_owner_principal, attributes=stored_access_attrs)
|
||||
)
|
||||
|
||||
if is_action_allowed(policy, Action.READ, sql_record, current_user):
|
||||
if is_action_allowed(self.policy, Action.READ, sql_record, current_user):
|
||||
filtered_rows.append(row)
|
||||
|
||||
return PaginatedResponse(
|
||||
|
@ -157,14 +158,12 @@ class AuthorizedSqlStore:
|
|||
async def fetch_one(
|
||||
self,
|
||||
table: str,
|
||||
policy: list[AccessRule],
|
||||
where: Mapping[str, Any] | None = None,
|
||||
order_by: list[tuple[str, Literal["asc", "desc"]]] | None = None,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Fetch one row with automatic access control checking."""
|
||||
results = await self.fetch_all(
|
||||
table=table,
|
||||
policy=policy,
|
||||
where=where,
|
||||
limit=1,
|
||||
order_by=order_by,
|
||||
|
@ -172,6 +171,20 @@ class AuthorizedSqlStore:
|
|||
|
||||
return results.data[0] if results.data else None
|
||||
|
||||
async def update(self, table: str, data: Mapping[str, Any], where: Mapping[str, Any]) -> None:
|
||||
"""Update rows with automatic access control attribute capture."""
|
||||
enhanced_data = dict(data)
|
||||
|
||||
current_user = get_authenticated_user()
|
||||
if current_user:
|
||||
enhanced_data["owner_principal"] = current_user.principal
|
||||
enhanced_data["access_attributes"] = current_user.attributes
|
||||
else:
|
||||
enhanced_data["owner_principal"] = None
|
||||
enhanced_data["access_attributes"] = None
|
||||
|
||||
await self.sql_store.update(table, enhanced_data, where)
|
||||
|
||||
async def delete(self, table: str, where: Mapping[str, Any]) -> None:
|
||||
"""Delete rows with automatic access control filtering."""
|
||||
await self.sql_store.delete(table, where)
|
||||
|
|
|
@ -8,7 +8,7 @@ import asyncio
|
|||
import contextvars
|
||||
import logging # allow-direct-logging
|
||||
import queue
|
||||
import random
|
||||
import secrets
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
@ -18,6 +18,7 @@ from functools import wraps
|
|||
from typing import Any
|
||||
|
||||
from llama_stack.apis.telemetry import (
|
||||
Event,
|
||||
LogSeverity,
|
||||
Span,
|
||||
SpanEndPayload,
|
||||
|
@ -75,16 +76,16 @@ def span_id_to_str(span_id: int) -> str:
|
|||
|
||||
|
||||
def generate_span_id() -> str:
|
||||
span_id = random.getrandbits(64)
|
||||
span_id = secrets.randbits(64)
|
||||
while span_id == INVALID_SPAN_ID:
|
||||
span_id = random.getrandbits(64)
|
||||
span_id = secrets.randbits(64)
|
||||
return span_id_to_str(span_id)
|
||||
|
||||
|
||||
def generate_trace_id() -> str:
|
||||
trace_id = random.getrandbits(128)
|
||||
trace_id = secrets.randbits(128)
|
||||
while trace_id == INVALID_TRACE_ID:
|
||||
trace_id = random.getrandbits(128)
|
||||
trace_id = secrets.randbits(128)
|
||||
return trace_id_to_str(trace_id)
|
||||
|
||||
|
||||
|
@ -98,7 +99,7 @@ class BackgroundLogger:
|
|||
def __init__(self, api: Telemetry, capacity: int = 100000):
|
||||
self.api = api
|
||||
self.log_queue: queue.Queue[Any] = queue.Queue(maxsize=capacity)
|
||||
self.worker_thread = threading.Thread(target=self._process_logs, daemon=True)
|
||||
self.worker_thread = threading.Thread(target=self._worker, daemon=True)
|
||||
self.worker_thread.start()
|
||||
self._last_queue_full_log_time: float = 0.0
|
||||
self._dropped_since_last_notice: int = 0
|
||||
|
@ -118,12 +119,16 @@ class BackgroundLogger:
|
|||
self._last_queue_full_log_time = current_time
|
||||
self._dropped_since_last_notice = 0
|
||||
|
||||
def _process_logs(self):
|
||||
def _worker(self):
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self._process_logs())
|
||||
|
||||
async def _process_logs(self):
|
||||
while True:
|
||||
try:
|
||||
event = self.log_queue.get()
|
||||
# figure out how to use a thread's native loop
|
||||
asyncio.run(self.api.log_event(event))
|
||||
await self.api.log_event(event)
|
||||
except Exception:
|
||||
import traceback
|
||||
|
||||
|
@ -136,6 +141,19 @@ class BackgroundLogger:
|
|||
self.log_queue.join()
|
||||
|
||||
|
||||
def enqueue_event(event: Event) -> None:
|
||||
"""Enqueue a telemetry event to the background logger if available.
|
||||
|
||||
This provides a non-blocking path for routers and other hot paths to
|
||||
submit telemetry without awaiting the Telemetry API, reducing contention
|
||||
with the main event loop.
|
||||
"""
|
||||
global BACKGROUND_LOGGER
|
||||
if BACKGROUND_LOGGER is None:
|
||||
raise RuntimeError("Telemetry API not initialized")
|
||||
BACKGROUND_LOGGER.log_event(event)
|
||||
|
||||
|
||||
class TraceContext:
|
||||
spans: list[Span] = []
|
||||
|
||||
|
@ -256,11 +274,7 @@ class TelemetryHandler(logging.Handler):
|
|||
if record.module in ("asyncio", "selector_events"):
|
||||
return
|
||||
|
||||
global CURRENT_TRACE_CONTEXT, BACKGROUND_LOGGER
|
||||
|
||||
if BACKGROUND_LOGGER is None:
|
||||
raise RuntimeError("Telemetry API not initialized")
|
||||
|
||||
global CURRENT_TRACE_CONTEXT
|
||||
context = CURRENT_TRACE_CONTEXT.get()
|
||||
if context is None:
|
||||
return
|
||||
|
@ -269,7 +283,7 @@ class TelemetryHandler(logging.Handler):
|
|||
if span is None:
|
||||
return
|
||||
|
||||
BACKGROUND_LOGGER.log_event(
|
||||
enqueue_event(
|
||||
UnstructuredLogEvent(
|
||||
trace_id=span.trace_id,
|
||||
span_id=span.span_id,
|
||||
|
|
|
@ -67,6 +67,38 @@ async def client_wrapper(endpoint: str, headers: dict[str, str]) -> AsyncGenerat
|
|||
raise AuthenticationRequiredError(exc) from exc
|
||||
if i == len(connection_strategies) - 1:
|
||||
raise
|
||||
except* httpx.ConnectError as eg:
|
||||
# Connection refused, server down, network unreachable
|
||||
if i == len(connection_strategies) - 1:
|
||||
error_msg = f"Failed to connect to MCP server at {endpoint}: Connection refused"
|
||||
logger.error(f"MCP connection error: {error_msg}")
|
||||
raise ConnectionError(error_msg) from eg
|
||||
else:
|
||||
logger.warning(
|
||||
f"failed to connect to MCP server at {endpoint} via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
|
||||
)
|
||||
except* httpx.TimeoutException as eg:
|
||||
# Request timeout, server too slow
|
||||
if i == len(connection_strategies) - 1:
|
||||
error_msg = f"MCP server at {endpoint} timed out"
|
||||
logger.error(f"MCP timeout error: {error_msg}")
|
||||
raise TimeoutError(error_msg) from eg
|
||||
else:
|
||||
logger.warning(
|
||||
f"MCP server at {endpoint} timed out via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
|
||||
)
|
||||
except* httpx.RequestError as eg:
|
||||
# DNS resolution failures, network errors, invalid URLs
|
||||
if i == len(connection_strategies) - 1:
|
||||
# Get the first exception's message for the error string
|
||||
exc_msg = str(eg.exceptions[0]) if eg.exceptions else "Unknown error"
|
||||
error_msg = f"Network error connecting to MCP server at {endpoint}: {exc_msg}"
|
||||
logger.error(f"MCP network error: {error_msg}")
|
||||
raise ConnectionError(error_msg) from eg
|
||||
else:
|
||||
logger.warning(
|
||||
f"network error connecting to MCP server at {endpoint} via {strategy.name}, falling back to {connection_strategies[i + 1].name}"
|
||||
)
|
||||
except* McpError:
|
||||
if i < len(connection_strategies) - 1:
|
||||
logger.warning(
|
||||
|
|
|
@ -12,14 +12,12 @@ import uuid
|
|||
def generate_chunk_id(document_id: str, chunk_text: str, chunk_window: str | None = None) -> str:
|
||||
"""
|
||||
Generate a unique chunk ID using a hash of the document ID and chunk text.
|
||||
|
||||
Note: MD5 is used only to calculate an identifier, not for security purposes.
|
||||
Adding usedforsecurity=False for compatibility with FIPS environments.
|
||||
Then use the first 32 characters of the hash to create a UUID.
|
||||
"""
|
||||
hash_input = f"{document_id}:{chunk_text}".encode()
|
||||
if chunk_window:
|
||||
hash_input += f":{chunk_window}".encode()
|
||||
return str(uuid.UUID(hashlib.md5(hash_input, usedforsecurity=False).hexdigest()))
|
||||
return str(uuid.UUID(hashlib.sha256(hash_input).hexdigest()[:32]))
|
||||
|
||||
|
||||
def proper_case(s: str) -> str:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue