Merge branch 'main' into hide-non-openai-inference-apis

This commit is contained in:
Matthew Farrellee 2025-09-26 17:48:30 -04:00
commit 0e78cd5383
33 changed files with 2394 additions and 1723 deletions

View file

@ -1,7 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from .batch_inference import *

View file

@ -1,79 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from typing import Protocol, runtime_checkable
from llama_stack.apis.common.job_types import Job
from llama_stack.apis.inference import (
InterleavedContent,
LogProbConfig,
Message,
ResponseFormat,
SamplingParams,
ToolChoice,
ToolDefinition,
ToolPromptFormat,
)
from llama_stack.apis.version import LLAMA_STACK_API_V1
from llama_stack.schema_utils import webmethod
@runtime_checkable
class BatchInference(Protocol):
"""Batch inference API for generating completions and chat completions.
This is an asynchronous API. If the request is successful, the response will be a job which can be polled for completion.
NOTE: This API is not yet implemented and is subject to change in concert with other asynchronous APIs
including (post-training, evals, etc).
"""
@webmethod(route="/batch-inference/completion", method="POST", level=LLAMA_STACK_API_V1)
async def completion(
self,
model: str,
content_batch: list[InterleavedContent],
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> Job:
"""Generate completions for a batch of content.
:param model: The model to use for the completion.
:param content_batch: The content to complete.
:param sampling_params: The sampling parameters to use for the completion.
:param response_format: The response format to use for the completion.
:param logprobs: The logprobs to use for the completion.
:returns: A job for the completion.
"""
...
@webmethod(route="/batch-inference/chat-completion", method="POST", level=LLAMA_STACK_API_V1)
async def chat_completion(
self,
model: str,
messages_batch: list[list[Message]],
sampling_params: SamplingParams | None = None,
# zero-shot tool definitions as input to the model
tools: list[ToolDefinition] | None = None,
tool_choice: ToolChoice | None = ToolChoice.auto,
tool_prompt_format: ToolPromptFormat | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> Job:
"""Generate chat completions for a batch of messages.
:param model: The model to use for the chat completion.
:param messages_batch: The messages to complete.
:param sampling_params: The sampling parameters to use for the completion.
:param tools: The tools to use for the chat completion.
:param tool_choice: The tool choice to use for the chat completion.
:param tool_prompt_format: The tool prompt format to use for the chat completion.
:param response_format: The response format to use for the chat completion.
:param logprobs: The logprobs to use for the chat completion.
:returns: A job for the chat completion.
"""
...

View file

@ -914,6 +914,7 @@ class OpenAIEmbeddingData(BaseModel):
"""
object: Literal["embedding"] = "embedding"
# TODO: consider dropping str and using openai.types.embeddings.Embedding instead of OpenAIEmbeddingData
embedding: list[float] | str
index: int
@ -974,26 +975,6 @@ class EmbeddingTaskType(Enum):
document = "document"
@json_schema_type
class BatchCompletionResponse(BaseModel):
"""Response from a batch completion request.
:param batch: List of completion responses, one for each input in the batch
"""
batch: list[CompletionResponse]
@json_schema_type
class BatchChatCompletionResponse(BaseModel):
"""Response from a batch chat completion request.
:param batch: List of chat completion responses, one for each conversation in the batch
"""
batch: list[ChatCompletionResponse]
class OpenAICompletionWithInputMessages(OpenAIChatCompletion):
input_messages: list[OpenAIMessageParam]
@ -1049,26 +1030,7 @@ class InferenceProvider(Protocol):
"""
...
async def batch_completion(
self,
model_id: str,
content_batch: list[InterleavedContent],
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> BatchCompletionResponse:
"""Generate completions for a batch of content using the specified model.
:param model_id: The identifier of the model to use. The model must be registered with Llama Stack and available via the /models endpoint.
:param content_batch: The content to generate completions for.
:param sampling_params: (Optional) Parameters to control the sampling strategy.
:param response_format: (Optional) Grammar specification for guided (structured) decoding.
:param logprobs: (Optional) If specified, log probabilities for each token position will be returned.
:returns: A BatchCompletionResponse with the full completions.
"""
raise NotImplementedError("Batch completion is not implemented")
return # this is so mypy's safe-super rule will consider the method concrete
@webmethod(route="/inference/chat-completion", method="POST", level=LLAMA_STACK_API_V1)
async def chat_completion(
self,
model_id: str,
@ -1108,30 +1070,7 @@ class InferenceProvider(Protocol):
"""
...
async def batch_chat_completion(
self,
model_id: str,
messages_batch: list[list[Message]],
sampling_params: SamplingParams | None = None,
tools: list[ToolDefinition] | None = None,
tool_config: ToolConfig | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> BatchChatCompletionResponse:
"""Generate chat completions for a batch of messages using the specified model.
:param model_id: The identifier of the model to use. The model must be registered with Llama Stack and available via the /models endpoint.
:param messages_batch: The messages to generate completions for.
:param sampling_params: (Optional) Parameters to control the sampling strategy.
:param tools: (Optional) List of tool definitions available to the model.
:param tool_config: (Optional) Configuration for tool use.
:param response_format: (Optional) Grammar specification for guided (structured) decoding.
:param logprobs: (Optional) If specified, log probabilities for each token position will be returned.
:returns: A BatchChatCompletionResponse with the full completions.
"""
raise NotImplementedError("Batch chat completion is not implemented")
return # this is so mypy's safe-super rule will consider the method concrete
@webmethod(route="/inference/embeddings", method="POST", level=LLAMA_STACK_API_V1)
async def embeddings(
self,
model_id: str,

View file

@ -20,8 +20,6 @@ from llama_stack.apis.common.content_types import (
)
from llama_stack.apis.common.errors import ModelNotFoundError, ModelTypeError
from llama_stack.apis.inference import (
BatchChatCompletionResponse,
BatchCompletionResponse,
ChatCompletionResponse,
ChatCompletionResponseEventType,
ChatCompletionResponseStreamChunk,
@ -273,30 +271,6 @@ class InferenceRouter(Inference):
)
return response
async def batch_chat_completion(
self,
model_id: str,
messages_batch: list[list[Message]],
tools: list[ToolDefinition] | None = None,
tool_config: ToolConfig | None = None,
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> BatchChatCompletionResponse:
logger.debug(
f"InferenceRouter.batch_chat_completion: {model_id=}, {len(messages_batch)=}, {sampling_params=}, {response_format=}, {logprobs=}",
)
provider = await self.routing_table.get_provider_impl(model_id)
return await provider.batch_chat_completion(
model_id=model_id,
messages_batch=messages_batch,
tools=tools,
tool_config=tool_config,
sampling_params=sampling_params,
response_format=response_format,
logprobs=logprobs,
)
async def completion(
self,
model_id: str,
@ -338,20 +312,6 @@ class InferenceRouter(Inference):
return response
async def batch_completion(
self,
model_id: str,
content_batch: list[InterleavedContent],
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
logprobs: LogProbConfig | None = None,
) -> BatchCompletionResponse:
logger.debug(
f"InferenceRouter.batch_completion: {model_id=}, {len(content_batch)=}, {sampling_params=}, {response_format=}, {logprobs=}",
)
provider = await self.routing_table.get_provider_impl(model_id)
return await provider.batch_completion(model_id, content_batch, sampling_params, response_format, logprobs)
async def embeddings(
self,
model_id: str,

View file

@ -9,7 +9,7 @@ from typing import Any
from llama_stack.apis.common.content_types import URL
from llama_stack.apis.common.errors import ToolGroupNotFoundError
from llama_stack.apis.tools import ListToolGroupsResponse, ListToolsResponse, Tool, ToolGroup, ToolGroups
from llama_stack.core.datatypes import ToolGroupWithOwner
from llama_stack.core.datatypes import AuthenticationRequiredError, ToolGroupWithOwner
from llama_stack.log import get_logger
from .common import CommonRoutingTableImpl
@ -54,7 +54,18 @@ class ToolGroupsRoutingTable(CommonRoutingTableImpl, ToolGroups):
all_tools = []
for toolgroup in toolgroups:
if toolgroup.identifier not in self.toolgroups_to_tools:
await self._index_tools(toolgroup)
try:
await self._index_tools(toolgroup)
except AuthenticationRequiredError:
# Send authentication errors back to the client so it knows
# that it needs to supply credentials for remote MCP servers.
raise
except Exception as e:
# Other errors that the client cannot fix are logged and
# those specific toolgroups are skipped.
logger.warning(f"Error listing tools for toolgroup {toolgroup.identifier}: {e}")
logger.debug(e, exc_info=True)
continue
all_tools.extend(self.toolgroups_to_tools[toolgroup.identifier])
return ListToolsResponse(data=all_tools)

View file

@ -14,7 +14,6 @@ from typing import Any
import yaml
from llama_stack.apis.agents import Agents
from llama_stack.apis.batch_inference import BatchInference
from llama_stack.apis.benchmarks import Benchmarks
from llama_stack.apis.datasetio import DatasetIO
from llama_stack.apis.datasets import Datasets
@ -54,7 +53,6 @@ class LlamaStack(
Providers,
VectorDBs,
Inference,
BatchInference,
Agents,
Safety,
SyntheticDataGeneration,

View file

@ -96,11 +96,9 @@ class DiskDistributionRegistry(DistributionRegistry):
async def register(self, obj: RoutableObjectWithProvider) -> bool:
existing_obj = await self.get(obj.type, obj.identifier)
# warn if the object's providerid is different but proceed with registration
if existing_obj and existing_obj.provider_id != obj.provider_id:
logger.warning(
f"Object {existing_obj.type}:{existing_obj.identifier}'s {existing_obj.provider_id} provider is being replaced with {obj.provider_id}"
)
# dont register if the object's providerid already exists
if existing_obj and existing_obj.provider_id == obj.provider_id:
return False
await self.kvstore.set(
KEY_FORMAT.format(type=obj.type, identifier=obj.identifier),

View file

@ -18,8 +18,6 @@ from llama_stack.apis.common.content_types import (
ToolCallParseStatus,
)
from llama_stack.apis.inference import (
BatchChatCompletionResponse,
BatchCompletionResponse,
ChatCompletionRequest,
ChatCompletionResponse,
ChatCompletionResponseEvent,
@ -219,41 +217,6 @@ class MetaReferenceInferenceImpl(
results = await self._nonstream_completion([request])
return results[0]
async def batch_completion(
self,
model_id: str,
content_batch: list[InterleavedContent],
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
stream: bool | None = False,
logprobs: LogProbConfig | None = None,
) -> BatchCompletionResponse:
if sampling_params is None:
sampling_params = SamplingParams()
if logprobs:
assert logprobs.top_k == 1, f"Unexpected top_k={logprobs.top_k}"
content_batch = [
augment_content_with_response_format_prompt(response_format, content) for content in content_batch
]
request_batch = []
for content in content_batch:
request = CompletionRequest(
model=model_id,
content=content,
sampling_params=sampling_params,
response_format=response_format,
stream=stream,
logprobs=logprobs,
)
self.check_model(request)
request = await convert_request_to_raw(request)
request_batch.append(request)
results = await self._nonstream_completion(request_batch)
return BatchCompletionResponse(batch=results)
async def _stream_completion(self, request: CompletionRequest) -> AsyncGenerator:
tokenizer = self.generator.formatter.tokenizer
@ -399,49 +362,6 @@ class MetaReferenceInferenceImpl(
results = await self._nonstream_chat_completion([request])
return results[0]
async def batch_chat_completion(
self,
model_id: str,
messages_batch: list[list[Message]],
sampling_params: SamplingParams | None = None,
response_format: ResponseFormat | None = None,
tools: list[ToolDefinition] | None = None,
stream: bool | None = False,
logprobs: LogProbConfig | None = None,
tool_config: ToolConfig | None = None,
) -> BatchChatCompletionResponse:
if sampling_params is None:
sampling_params = SamplingParams()
if logprobs:
assert logprobs.top_k == 1, f"Unexpected top_k={logprobs.top_k}"
# wrapper request to make it easier to pass around (internal only, not exposed to API)
request_batch = []
for messages in messages_batch:
request = ChatCompletionRequest(
model=model_id,
messages=messages,
sampling_params=sampling_params,
tools=tools or [],
response_format=response_format,
logprobs=logprobs,
tool_config=tool_config or ToolConfig(),
)
self.check_model(request)
# augment and rewrite messages depending on the model
request.messages = chat_completion_request_to_messages(request, self.llama_model.core_model_id.value)
# download media and convert to raw content so we can send it to the model
request = await convert_request_to_raw(request)
request_batch.append(request)
if self.config.create_distributed_process_group:
if SEMAPHORE.locked():
raise RuntimeError("Only one concurrent request is supported")
results = await self._nonstream_chat_completion(request_batch)
return BatchChatCompletionResponse(batch=results)
async def _nonstream_chat_completion(
self, request_batch: list[ChatCompletionRequest]
) -> list[ChatCompletionResponse]:

View file

@ -24,7 +24,6 @@ from llama_stack.apis.inference import (
LogProbConfig,
Message,
Model,
ModelType,
OpenAICompletion,
ResponseFormat,
SamplingParams,
@ -34,6 +33,7 @@ from llama_stack.apis.inference import (
ToolDefinition,
ToolPromptFormat,
)
from llama_stack.apis.models import ModelType
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin

View file

@ -64,6 +64,7 @@ class FireworksInferenceAdapter(OpenAIMixin, ModelRegistryHelper, Inference, Nee
}
def __init__(self, config: FireworksImplConfig) -> None:
ModelRegistryHelper.__init__(self)
self.config = config
self.allowed_models = config.allowed_models

View file

@ -4,12 +4,10 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.apis.inference import Inference
from .config import GroqConfig
async def get_adapter_impl(config: GroqConfig, _deps) -> Inference:
async def get_adapter_impl(config: GroqConfig, _deps):
# import dynamically so the import is used only when it is needed
from .groq import GroqInferenceAdapter

View file

@ -6,8 +6,7 @@
import asyncio
import base64
from collections.abc import AsyncGenerator, AsyncIterator
from collections.abc import AsyncGenerator
from typing import Any
from ollama import AsyncClient as AsyncOllamaClient
@ -33,10 +32,6 @@ from llama_stack.apis.inference import (
JsonSchemaResponseFormat,
LogProbConfig,
Message,
OpenAIChatCompletion,
OpenAIChatCompletionChunk,
OpenAIMessageParam,
OpenAIResponseFormatParam,
ResponseFormat,
SamplingParams,
TextTruncation,
@ -62,7 +57,6 @@ from llama_stack.providers.utils.inference.openai_compat import (
OpenAICompatCompletionChoice,
OpenAICompatCompletionResponse,
get_sampling_options,
prepare_openai_completion_params,
process_chat_completion_response,
process_chat_completion_stream_response,
process_completion_response,
@ -75,7 +69,6 @@ from llama_stack.providers.utils.inference.prompt_adapter import (
content_has_media,
convert_image_content_to_url,
interleaved_content_as_str,
localize_image_content,
request_has_media,
)
@ -84,6 +77,7 @@ logger = get_logger(name=__name__, category="inference::ollama")
class OllamaInferenceAdapter(
OpenAIMixin,
ModelRegistryHelper,
InferenceProvider,
ModelsProtocolPrivate,
):
@ -129,6 +123,8 @@ class OllamaInferenceAdapter(
],
)
self.config = config
# Ollama does not support image urls, so we need to download the image and convert it to base64
self.download_images = True
self._clients: dict[asyncio.AbstractEventLoop, AsyncOllamaClient] = {}
@property
@ -173,9 +169,6 @@ class OllamaInferenceAdapter(
async def shutdown(self) -> None:
self._clients.clear()
async def unregister_model(self, model_id: str) -> None:
pass
async def _get_model(self, model_id: str) -> Model:
if not self.model_store:
raise ValueError("Model store not set")
@ -403,75 +396,6 @@ class OllamaInferenceAdapter(
raise UnsupportedModelError(model.provider_model_id, list(self._model_cache.keys()))
async def openai_chat_completion(
self,
model: str,
messages: list[OpenAIMessageParam],
frequency_penalty: float | None = None,
function_call: str | dict[str, Any] | None = None,
functions: list[dict[str, Any]] | None = None,
logit_bias: dict[str, float] | None = None,
logprobs: bool | None = None,
max_completion_tokens: int | None = None,
max_tokens: int | None = None,
n: int | None = None,
parallel_tool_calls: bool | None = None,
presence_penalty: float | None = None,
response_format: OpenAIResponseFormatParam | None = None,
seed: int | None = None,
stop: str | list[str] | None = None,
stream: bool | None = None,
stream_options: dict[str, Any] | None = None,
temperature: float | None = None,
tool_choice: str | dict[str, Any] | None = None,
tools: list[dict[str, Any]] | None = None,
top_logprobs: int | None = None,
top_p: float | None = None,
user: str | None = None,
) -> OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]:
model_obj = await self._get_model(model)
# Ollama does not support image urls, so we need to download the image and convert it to base64
async def _convert_message(m: OpenAIMessageParam) -> OpenAIMessageParam:
if isinstance(m.content, list):
for c in m.content:
if c.type == "image_url" and c.image_url and c.image_url.url:
localize_result = await localize_image_content(c.image_url.url)
if localize_result is None:
raise ValueError(f"Failed to localize image content from {c.image_url.url}")
content, format = localize_result
c.image_url.url = f"data:image/{format};base64,{base64.b64encode(content).decode('utf-8')}"
return m
messages = [await _convert_message(m) for m in messages]
params = await prepare_openai_completion_params(
model=model_obj.provider_resource_id,
messages=messages,
frequency_penalty=frequency_penalty,
function_call=function_call,
functions=functions,
logit_bias=logit_bias,
logprobs=logprobs,
max_completion_tokens=max_completion_tokens,
max_tokens=max_tokens,
n=n,
parallel_tool_calls=parallel_tool_calls,
presence_penalty=presence_penalty,
response_format=response_format,
seed=seed,
stop=stop,
stream=stream,
stream_options=stream_options,
temperature=temperature,
tool_choice=tool_choice,
tools=tools,
top_logprobs=top_logprobs,
top_p=top_p,
user=user,
)
return await OpenAIMixin.openai_chat_completion(self, **params)
async def convert_message_to_openai_dict_for_ollama(message: Message) -> list[dict]:
async def _convert_content(content) -> dict:

View file

@ -21,8 +21,6 @@ logger = get_logger(name=__name__, category="inference::openai")
# | completion | LiteLLMOpenAIMixin |
# | chat_completion | LiteLLMOpenAIMixin |
# | embedding | LiteLLMOpenAIMixin |
# | batch_completion | LiteLLMOpenAIMixin |
# | batch_chat_completion | LiteLLMOpenAIMixin |
# | openai_completion | OpenAIMixin |
# | openai_chat_completion | OpenAIMixin |
# | openai_embeddings | OpenAIMixin |

View file

@ -4,12 +4,10 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from llama_stack.apis.inference import Inference
from .config import SambaNovaImplConfig
async def get_adapter_impl(config: SambaNovaImplConfig, _deps) -> Inference:
async def get_adapter_impl(config: SambaNovaImplConfig, _deps):
from .sambanova import SambaNovaInferenceAdapter
assert isinstance(config, SambaNovaImplConfig), f"Unexpected config type: {type(config)}"

View file

@ -25,7 +25,7 @@ class SambaNovaInferenceAdapter(OpenAIMixin, LiteLLMOpenAIMixin):
def __init__(self, config: SambaNovaImplConfig):
self.config = config
self.environment_available_models = []
self.environment_available_models: list[str] = []
LiteLLMOpenAIMixin.__init__(
self,
litellm_provider_name="sambanova",

View file

@ -70,6 +70,7 @@ class TogetherInferenceAdapter(OpenAIMixin, ModelRegistryHelper, Inference, Need
}
def __init__(self, config: TogetherImplConfig) -> None:
ModelRegistryHelper.__init__(self)
self.config = config
self.allowed_models = config.allowed_models
self._model_cache: dict[str, Model] = {}

View file

@ -20,7 +20,7 @@ logger = get_logger(name=__name__, category="providers::utils")
class RemoteInferenceProviderConfig(BaseModel):
allowed_models: list[str] | None = Field(
allowed_models: list[str] | None = Field( # TODO: make this non-optional and give a list() default
default=None,
description="List of models that should be registered with the model registry. If None, all models are allowed.",
)

View file

@ -4,6 +4,7 @@
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import base64
import uuid
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
@ -26,6 +27,7 @@ from llama_stack.apis.models import ModelType
from llama_stack.log import get_logger
from llama_stack.providers.utils.inference.model_registry import ModelRegistryHelper
from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params
from llama_stack.providers.utils.inference.prompt_adapter import localize_image_content
logger = get_logger(name=__name__, category="providers::utils")
@ -51,6 +53,10 @@ class OpenAIMixin(ModelRegistryHelper, ABC):
# This is useful for providers that do not return a unique id in the response.
overwrite_completion_id: bool = False
# Allow subclasses to control whether to download images and convert to base64
# for providers that require base64 encoded images instead of URLs.
download_images: bool = False
# Embedding model metadata for this provider
# Can be set by subclasses or instances to provide embedding models
# Format: {"model_id": {"embedding_dimension": 1536, "context_length": 8192}}
@ -239,6 +245,24 @@ class OpenAIMixin(ModelRegistryHelper, ABC):
"""
Direct OpenAI chat completion API call.
"""
if self.download_images:
async def _localize_image_url(m: OpenAIMessageParam) -> OpenAIMessageParam:
if isinstance(m.content, list):
for c in m.content:
if c.type == "image_url" and c.image_url and c.image_url.url and "http" in c.image_url.url:
localize_result = await localize_image_content(c.image_url.url)
if localize_result is None:
raise ValueError(
f"Failed to localize image content from {c.image_url.url[:42]}{'...' if len(c.image_url.url) > 42 else ''}"
)
content, format = localize_result
c.image_url.url = f"data:image/{format};base64,{base64.b64encode(content).decode('utf-8')}"
# else it's a string and we don't need to modify it
return m
messages = [await _localize_image_url(m) for m in messages]
resp = await self.client.chat.completions.create(
**await prepare_openai_completion_params(
model=await self._get_provider_model_id(model),

View file

@ -28,7 +28,7 @@ class CommonConfig(BaseModel):
class RedisKVStoreConfig(CommonConfig):
type: Literal[KVStoreType.redis.value] = KVStoreType.redis.value
type: Literal["redis"] = KVStoreType.redis.value
host: str = "localhost"
port: int = 6379
@ -50,7 +50,7 @@ class RedisKVStoreConfig(CommonConfig):
class SqliteKVStoreConfig(CommonConfig):
type: Literal[KVStoreType.sqlite.value] = KVStoreType.sqlite.value
type: Literal["sqlite"] = KVStoreType.sqlite.value
db_path: str = Field(
default=(RUNTIME_BASE_DIR / "kvstore.db").as_posix(),
description="File path for the sqlite database",
@ -69,7 +69,7 @@ class SqliteKVStoreConfig(CommonConfig):
class PostgresKVStoreConfig(CommonConfig):
type: Literal[KVStoreType.postgres.value] = KVStoreType.postgres.value
type: Literal["postgres"] = KVStoreType.postgres.value
host: str = "localhost"
port: int = 5432
db: str = "llamastack"
@ -113,11 +113,11 @@ class PostgresKVStoreConfig(CommonConfig):
class MongoDBKVStoreConfig(CommonConfig):
type: Literal[KVStoreType.mongodb.value] = KVStoreType.mongodb.value
type: Literal["mongodb"] = KVStoreType.mongodb.value
host: str = "localhost"
port: int = 27017
db: str = "llamastack"
user: str = None
user: str | None = None
password: str | None = None
collection_name: str = "llamastack_kvstore"

View file

@ -7,6 +7,7 @@
from datetime import datetime
from pymongo import AsyncMongoClient
from pymongo.asynchronous.collection import AsyncCollection
from llama_stack.log import get_logger
from llama_stack.providers.utils.kvstore import KVStore
@ -19,8 +20,13 @@ log = get_logger(name=__name__, category="providers::utils")
class MongoDBKVStoreImpl(KVStore):
def __init__(self, config: MongoDBKVStoreConfig):
self.config = config
self.conn = None
self.collection = None
self.conn: AsyncMongoClient | None = None
@property
def collection(self) -> AsyncCollection:
if self.conn is None:
raise RuntimeError("MongoDB connection is not initialized")
return self.conn[self.config.db][self.config.collection_name]
async def initialize(self) -> None:
try:
@ -32,7 +38,6 @@ class MongoDBKVStoreImpl(KVStore):
}
conn_creds = {k: v for k, v in conn_creds.items() if v is not None}
self.conn = AsyncMongoClient(**conn_creds)
self.collection = self.conn[self.config.db][self.config.collection_name]
except Exception as e:
log.exception("Could not connect to MongoDB database server")
raise RuntimeError("Could not connect to MongoDB database server") from e

View file

@ -9,9 +9,13 @@ from datetime import datetime
import aiosqlite
from llama_stack.log import get_logger
from ..api import KVStore
from ..config import SqliteKVStoreConfig
logger = get_logger(name=__name__, category="providers::utils")
class SqliteKVStoreImpl(KVStore):
def __init__(self, config: SqliteKVStoreConfig):
@ -50,6 +54,9 @@ class SqliteKVStoreImpl(KVStore):
if row is None:
return None
value, expiration = row
if not isinstance(value, str):
logger.warning(f"Expected string value for key {key}, got {type(value)}, returning None")
return None
return value
async def delete(self, key: str) -> None:

View file

@ -18,7 +18,7 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"framer-motion": "^12.23.12",
"llama-stack-client": "^0.2.22",
"llama-stack-client": "^0.2.23",
"lucide-react": "^0.542.0",
"next": "15.5.3",
"next-auth": "^4.24.11",
@ -10172,9 +10172,9 @@
"license": "MIT"
},
"node_modules/llama-stack-client": {
"version": "0.2.22",
"resolved": "https://registry.npmjs.org/llama-stack-client/-/llama-stack-client-0.2.22.tgz",
"integrity": "sha512-7aW3UQj5MwjV73Brd+yQ1e4W1W33nhozyeHM5tzOgbsVZ88tL78JNiNvyFqDR5w6V9XO4/uSGGiQVG6v83yR4w==",
"version": "0.2.23",
"resolved": "https://registry.npmjs.org/llama-stack-client/-/llama-stack-client-0.2.23.tgz",
"integrity": "sha512-J3YFH1HW2K70capejQxGlCyTgKdfx+sQf8Ab+HFi1j2Q00KtpHXB79RxejvBxjWC3X2E++P9iU57KdU2Tp/rIQ==",
"license": "MIT",
"dependencies": {
"@types/node": "^18.11.18",

View file

@ -23,7 +23,7 @@
"class-variance-authority": "^0.7.1",
"clsx": "^2.1.1",
"framer-motion": "^12.23.12",
"llama-stack-client": "^0.2.22",
"llama-stack-client": "^0.2.23",
"lucide-react": "^0.542.0",
"next": "15.5.3",
"next-auth": "^4.24.11",