mirror of
https://github.com/meta-llama/llama-stack.git
synced 2025-12-04 10:10:36 +00:00
Merge branch 'main' into chroma
This commit is contained in:
commit
3f66f55771
137 changed files with 35682 additions and 1800 deletions
|
|
@ -25,6 +25,7 @@ from llama_stack.apis.inference import (
|
|||
OpenAIChatCompletionRequestWithExtraBody,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIDeveloperMessageParam,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIMessageParam,
|
||||
OpenAISystemMessageParam,
|
||||
OpenAIToolMessageParam,
|
||||
|
|
@ -640,7 +641,9 @@ class ReferenceBatchesImpl(Batches):
|
|||
},
|
||||
}
|
||||
else: # /v1/embeddings
|
||||
embeddings_response = await self.inference_api.openai_embeddings(**request.body)
|
||||
embeddings_response = await self.inference_api.openai_embeddings(
|
||||
OpenAIEmbeddingsRequestWithExtraBody(**request.body)
|
||||
)
|
||||
assert hasattr(embeddings_response, "model_dump_json"), (
|
||||
"Embeddings response must have model_dump_json method"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -54,11 +54,11 @@ class SentenceTransformersInferenceImpl(
|
|||
async def list_models(self) -> list[Model] | None:
|
||||
return [
|
||||
Model(
|
||||
identifier="all-MiniLM-L6-v2",
|
||||
provider_resource_id="all-MiniLM-L6-v2",
|
||||
identifier="nomic-ai/nomic-embed-text-v1.5",
|
||||
provider_resource_id="nomic-ai/nomic-embed-text-v1.5",
|
||||
provider_id=self.__provider_id__,
|
||||
metadata={
|
||||
"embedding_dimension": 384,
|
||||
"embedding_dimension": 768,
|
||||
},
|
||||
model_type=ModelType.embedding,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any
|
|||
if TYPE_CHECKING:
|
||||
from codeshield.cs import CodeShieldScanResult
|
||||
|
||||
from llama_stack.apis.inference import Message
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.apis.safety import (
|
||||
RunShieldResponse,
|
||||
Safety,
|
||||
|
|
@ -53,7 +53,7 @@ class MetaReferenceCodeScannerSafetyImpl(Safety):
|
|||
async def run_shield(
|
||||
self,
|
||||
shield_id: str,
|
||||
messages: list[Message],
|
||||
messages: list[OpenAIMessageParam],
|
||||
params: dict[str, Any] = None,
|
||||
) -> RunShieldResponse:
|
||||
shield = await self.shield_store.get_shield(shield_id)
|
||||
|
|
|
|||
|
|
@ -12,10 +12,9 @@ from typing import Any
|
|||
from llama_stack.apis.common.content_types import ImageContentItem, TextContentItem
|
||||
from llama_stack.apis.inference import (
|
||||
Inference,
|
||||
Message,
|
||||
OpenAIChatCompletionRequestWithExtraBody,
|
||||
OpenAIMessageParam,
|
||||
OpenAIUserMessageParam,
|
||||
UserMessage,
|
||||
)
|
||||
from llama_stack.apis.safety import (
|
||||
RunShieldResponse,
|
||||
|
|
@ -165,7 +164,7 @@ class LlamaGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
|
|||
async def run_shield(
|
||||
self,
|
||||
shield_id: str,
|
||||
messages: list[Message],
|
||||
messages: list[OpenAIMessageParam],
|
||||
params: dict[str, Any] = None,
|
||||
) -> RunShieldResponse:
|
||||
shield = await self.shield_store.get_shield(shield_id)
|
||||
|
|
@ -175,8 +174,8 @@ class LlamaGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
|
|||
messages = messages.copy()
|
||||
# some shields like llama-guard require the first message to be a user message
|
||||
# since this might be a tool call, first role might not be user
|
||||
if len(messages) > 0 and messages[0].role != Role.user.value:
|
||||
messages[0] = UserMessage(content=messages[0].content)
|
||||
if len(messages) > 0 and messages[0].role != "user":
|
||||
messages[0] = OpenAIUserMessageParam(content=messages[0].content)
|
||||
|
||||
# Use the inference API's model resolution instead of hardcoded mappings
|
||||
# This allows the shield to work with any registered model
|
||||
|
|
@ -208,7 +207,7 @@ class LlamaGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
|
|||
messages = [input]
|
||||
|
||||
# convert to user messages format with role
|
||||
messages = [UserMessage(content=m) for m in messages]
|
||||
messages = [OpenAIUserMessageParam(content=m) for m in messages]
|
||||
|
||||
# Determine safety categories based on the model type
|
||||
# For known Llama Guard models, use specific categories
|
||||
|
|
@ -277,7 +276,7 @@ class LlamaGuardShield:
|
|||
|
||||
return final_categories
|
||||
|
||||
def validate_messages(self, messages: list[Message]) -> None:
|
||||
def validate_messages(self, messages: list[OpenAIMessageParam]) -> list[OpenAIMessageParam]:
|
||||
if len(messages) == 0:
|
||||
raise ValueError("Messages must not be empty")
|
||||
if messages[0].role != Role.user.value:
|
||||
|
|
@ -288,7 +287,7 @@ class LlamaGuardShield:
|
|||
|
||||
return messages
|
||||
|
||||
async def run(self, messages: list[Message]) -> RunShieldResponse:
|
||||
async def run(self, messages: list[OpenAIMessageParam]) -> RunShieldResponse:
|
||||
messages = self.validate_messages(messages)
|
||||
|
||||
if self.model == CoreModelId.llama_guard_3_11b_vision.value:
|
||||
|
|
@ -307,10 +306,10 @@ class LlamaGuardShield:
|
|||
content = content.strip()
|
||||
return self.get_shield_response(content)
|
||||
|
||||
def build_text_shield_input(self, messages: list[Message]) -> OpenAIUserMessageParam:
|
||||
return OpenAIUserMessageParam(role="user", content=self.build_prompt(messages))
|
||||
def build_text_shield_input(self, messages: list[OpenAIMessageParam]) -> OpenAIUserMessageParam:
|
||||
return OpenAIUserMessageParam(content=self.build_prompt(messages))
|
||||
|
||||
def build_vision_shield_input(self, messages: list[Message]) -> OpenAIUserMessageParam:
|
||||
def build_vision_shield_input(self, messages: list[OpenAIMessageParam]) -> OpenAIUserMessageParam:
|
||||
conversation = []
|
||||
most_recent_img = None
|
||||
|
||||
|
|
@ -333,7 +332,7 @@ class LlamaGuardShield:
|
|||
else:
|
||||
raise ValueError(f"Unknown content type: {c}")
|
||||
|
||||
conversation.append(UserMessage(content=content))
|
||||
conversation.append(OpenAIUserMessageParam(content=content))
|
||||
else:
|
||||
raise ValueError(f"Unknown content type: {m.content}")
|
||||
|
||||
|
|
@ -342,9 +341,9 @@ class LlamaGuardShield:
|
|||
prompt.append(most_recent_img)
|
||||
prompt.append(self.build_prompt(conversation[::-1]))
|
||||
|
||||
return OpenAIUserMessageParam(role="user", content=prompt)
|
||||
return OpenAIUserMessageParam(content=prompt)
|
||||
|
||||
def build_prompt(self, messages: list[Message]) -> str:
|
||||
def build_prompt(self, messages: list[OpenAIMessageParam]) -> str:
|
||||
categories = self.get_safety_categories()
|
||||
categories_str = "\n".join(categories)
|
||||
conversations_str = "\n\n".join(
|
||||
|
|
@ -377,7 +376,7 @@ class LlamaGuardShield:
|
|||
|
||||
raise ValueError(f"Unexpected response: {response}")
|
||||
|
||||
async def run_moderation(self, messages: list[Message]) -> ModerationObject:
|
||||
async def run_moderation(self, messages: list[OpenAIMessageParam]) -> ModerationObject:
|
||||
if not messages:
|
||||
return self.create_moderation_object(self.model)
|
||||
|
||||
|
|
@ -388,6 +387,7 @@ class LlamaGuardShield:
|
|||
model=self.model,
|
||||
messages=[shield_input_message],
|
||||
stream=False,
|
||||
temperature=0.0, # default is 1, which is too high for safety
|
||||
)
|
||||
response = await self.inference_api.openai_chat_completion(params)
|
||||
content = response.choices[0].message.content
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ from typing import Any
|
|||
import torch
|
||||
from transformers import AutoModelForSequenceClassification, AutoTokenizer
|
||||
|
||||
from llama_stack.apis.inference import Message
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.apis.safety import (
|
||||
RunShieldResponse,
|
||||
Safety,
|
||||
|
|
@ -22,9 +22,7 @@ from llama_stack.apis.shields import Shield
|
|||
from llama_stack.core.utils.model_utils import model_local_dir
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
|
||||
from llama_stack.providers.utils.inference.prompt_adapter import (
|
||||
interleaved_content_as_str,
|
||||
)
|
||||
from llama_stack.providers.utils.inference.prompt_adapter import interleaved_content_as_str
|
||||
|
||||
from .config import PromptGuardConfig, PromptGuardType
|
||||
|
||||
|
|
@ -56,7 +54,7 @@ class PromptGuardSafetyImpl(Safety, ShieldsProtocolPrivate):
|
|||
async def run_shield(
|
||||
self,
|
||||
shield_id: str,
|
||||
messages: list[Message],
|
||||
messages: list[OpenAIMessageParam],
|
||||
params: dict[str, Any],
|
||||
) -> RunShieldResponse:
|
||||
shield = await self.shield_store.get_shield(shield_id)
|
||||
|
|
@ -93,7 +91,7 @@ class PromptGuardShield:
|
|||
self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
|
||||
self.model = AutoModelForSequenceClassification.from_pretrained(model_dir, device_map=self.device)
|
||||
|
||||
async def run(self, messages: list[Message]) -> RunShieldResponse:
|
||||
async def run(self, messages: list[OpenAIMessageParam]) -> RunShieldResponse:
|
||||
message = messages[-1]
|
||||
text = interleaved_content_as_str(message.content)
|
||||
|
||||
|
|
|
|||
|
|
@ -43,6 +43,12 @@ def available_providers() -> list[ProviderSpec]:
|
|||
pip_packages=[
|
||||
"torch torchvision torchao>=0.12.0 --extra-index-url https://download.pytorch.org/whl/cpu",
|
||||
"sentence-transformers --no-deps",
|
||||
# required by some SentenceTransformers architectures for tensor rearrange/merge ops
|
||||
"einops",
|
||||
# fast HF tokenization backend used by SentenceTransformers models
|
||||
"tokenizers",
|
||||
# safe and fast file format for storing and loading tensors
|
||||
"safetensors",
|
||||
],
|
||||
module="llama_stack.providers.inline.inference.sentence_transformers",
|
||||
config_class="llama_stack.providers.inline.inference.sentence_transformers.config.SentenceTransformersInferenceConfig",
|
||||
|
|
@ -271,7 +277,7 @@ Available Models:
|
|||
pip_packages=["litellm"],
|
||||
module="llama_stack.providers.remote.inference.watsonx",
|
||||
config_class="llama_stack.providers.remote.inference.watsonx.WatsonXConfig",
|
||||
provider_data_validator="llama_stack.providers.remote.inference.watsonx.WatsonXProviderDataValidator",
|
||||
provider_data_validator="llama_stack.providers.remote.inference.watsonx.config.WatsonXProviderDataValidator",
|
||||
description="IBM WatsonX inference provider for accessing AI models on IBM's WatsonX platform.",
|
||||
),
|
||||
RemoteProviderSpec(
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ from llama_stack.apis.inference import (
|
|||
Inference,
|
||||
OpenAIChatCompletionRequestWithExtraBody,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.apis.inference.inference import (
|
||||
|
|
@ -124,11 +125,7 @@ class BedrockInferenceAdapter(
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,10 @@
|
|||
|
||||
from urllib.parse import urljoin
|
||||
|
||||
from llama_stack.apis.inference import OpenAIEmbeddingsResponse
|
||||
from llama_stack.apis.inference import (
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin
|
||||
|
||||
from .config import CerebrasImplConfig
|
||||
|
|
@ -20,10 +23,6 @@ class CerebrasInferenceAdapter(OpenAIMixin):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
raise NotImplementedError()
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@
|
|||
from llama_stack.apis.inference.inference import (
|
||||
OpenAICompletion,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.log import get_logger
|
||||
|
|
@ -40,10 +41,6 @@ class LlamaCompatInferenceAdapter(OpenAIMixin):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
raise NotImplementedError()
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ from openai import NOT_GIVEN
|
|||
|
||||
from llama_stack.apis.inference import (
|
||||
OpenAIEmbeddingData,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
OpenAIEmbeddingUsage,
|
||||
)
|
||||
|
|
@ -78,11 +79,7 @@ class NVIDIAInferenceAdapter(OpenAIMixin):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
"""
|
||||
OpenAI-compatible embeddings for NVIDIA NIM.
|
||||
|
|
@ -99,11 +96,11 @@ class NVIDIAInferenceAdapter(OpenAIMixin):
|
|||
)
|
||||
|
||||
response = await self.client.embeddings.create(
|
||||
model=await self._get_provider_model_id(model),
|
||||
input=input,
|
||||
encoding_format=encoding_format if encoding_format is not None else NOT_GIVEN,
|
||||
dimensions=dimensions if dimensions is not None else NOT_GIVEN,
|
||||
user=user if user is not None else NOT_GIVEN,
|
||||
model=await self._get_provider_model_id(params.model),
|
||||
input=params.input,
|
||||
encoding_format=params.encoding_format if params.encoding_format is not None else NOT_GIVEN,
|
||||
dimensions=params.dimensions if params.dimensions is not None else NOT_GIVEN,
|
||||
user=params.user if params.user is not None else NOT_GIVEN,
|
||||
extra_body=extra_body,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from llama_stack.apis.inference import (
|
|||
OpenAIChatCompletionRequestWithExtraBody,
|
||||
OpenAICompletion,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.apis.models import Model
|
||||
|
|
@ -69,11 +70,7 @@ class PassthroughInferenceAdapter(Inference):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,10 @@ from collections.abc import Iterable
|
|||
from huggingface_hub import AsyncInferenceClient, HfApi
|
||||
from pydantic import SecretStr
|
||||
|
||||
from llama_stack.apis.inference import OpenAIEmbeddingsResponse
|
||||
from llama_stack.apis.inference import (
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.utils.inference.openai_mixin import OpenAIMixin
|
||||
|
||||
|
|
@ -40,11 +43,7 @@ class _HfAdapter(OpenAIMixin):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ from together import AsyncTogether
|
|||
from together.constants import BASE_URL
|
||||
|
||||
from llama_stack.apis.inference import (
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.apis.inference.inference import OpenAIEmbeddingUsage
|
||||
|
|
@ -62,11 +63,7 @@ class TogetherInferenceAdapter(OpenAIMixin, NeedsRequestProviderData):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
"""
|
||||
Together's OpenAI-compatible embeddings endpoint is not compatible with
|
||||
|
|
@ -78,25 +75,27 @@ class TogetherInferenceAdapter(OpenAIMixin, NeedsRequestProviderData):
|
|||
- does not support dimensions param, returns 400 Unrecognized request arguments supplied: dimensions
|
||||
"""
|
||||
# Together support ticket #13332 -> will not fix
|
||||
if user is not None:
|
||||
if params.user is not None:
|
||||
raise ValueError("Together's embeddings endpoint does not support user param.")
|
||||
# Together support ticket #13333 -> escalated
|
||||
if dimensions is not None:
|
||||
if params.dimensions is not None:
|
||||
raise ValueError("Together's embeddings endpoint does not support dimensions param.")
|
||||
|
||||
response = await self.client.embeddings.create(
|
||||
model=await self._get_provider_model_id(model),
|
||||
input=input,
|
||||
encoding_format=encoding_format,
|
||||
model=await self._get_provider_model_id(params.model),
|
||||
input=params.input,
|
||||
encoding_format=params.encoding_format,
|
||||
)
|
||||
|
||||
response.model = model # return the user the same model id they provided, avoid exposing the provider model id
|
||||
response.model = (
|
||||
params.model
|
||||
) # return the user the same model id they provided, avoid exposing the provider model id
|
||||
|
||||
# Together support ticket #13330 -> escalated
|
||||
# - togethercomputer/m2-bert-80M-32k-retrieval *does not* return usage information
|
||||
if not hasattr(response, "usage") or response.usage is None:
|
||||
logger.warning(
|
||||
f"Together's embedding endpoint for {model} did not return usage information, substituting -1s."
|
||||
f"Together's embedding endpoint for {params.model} did not return usage information, substituting -1s."
|
||||
)
|
||||
response.usage = OpenAIEmbeddingUsage(prompt_tokens=-1, total_tokens=-1)
|
||||
|
||||
|
|
|
|||
|
|
@ -7,18 +7,18 @@
|
|||
import os
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from llama_stack.providers.utils.inference.model_registry import RemoteInferenceProviderConfig
|
||||
from llama_stack.schema_utils import json_schema_type
|
||||
|
||||
|
||||
class WatsonXProviderDataValidator(BaseModel):
|
||||
model_config = ConfigDict(
|
||||
from_attributes=True,
|
||||
extra="forbid",
|
||||
watsonx_project_id: str | None = Field(
|
||||
default=None,
|
||||
description="IBM WatsonX project ID",
|
||||
)
|
||||
watsonx_api_key: str | None
|
||||
watsonx_api_key: str | None = None
|
||||
|
||||
|
||||
@json_schema_type
|
||||
|
|
|
|||
|
|
@ -4,42 +4,259 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
from collections.abc import AsyncIterator
|
||||
from typing import Any
|
||||
|
||||
import litellm
|
||||
import requests
|
||||
|
||||
from llama_stack.apis.inference import ChatCompletionRequest
|
||||
from llama_stack.apis.inference.inference import (
|
||||
OpenAIChatCompletion,
|
||||
OpenAIChatCompletionChunk,
|
||||
OpenAIChatCompletionRequestWithExtraBody,
|
||||
OpenAIChatCompletionUsage,
|
||||
OpenAICompletion,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
)
|
||||
from llama_stack.apis.models import Model
|
||||
from llama_stack.apis.models.models import ModelType
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.remote.inference.watsonx.config import WatsonXConfig
|
||||
from llama_stack.providers.utils.inference.litellm_openai_mixin import LiteLLMOpenAIMixin
|
||||
from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params
|
||||
from llama_stack.providers.utils.telemetry.tracing import get_current_span
|
||||
|
||||
logger = get_logger(name=__name__, category="providers::remote::watsonx")
|
||||
|
||||
|
||||
class WatsonXInferenceAdapter(LiteLLMOpenAIMixin):
|
||||
_model_cache: dict[str, Model] = {}
|
||||
|
||||
provider_data_api_key_field: str = "watsonx_api_key"
|
||||
|
||||
def __init__(self, config: WatsonXConfig):
|
||||
self.available_models = None
|
||||
self.config = config
|
||||
api_key = config.auth_credential.get_secret_value() if config.auth_credential else None
|
||||
LiteLLMOpenAIMixin.__init__(
|
||||
self,
|
||||
litellm_provider_name="watsonx",
|
||||
api_key_from_config=config.auth_credential.get_secret_value() if config.auth_credential else None,
|
||||
api_key_from_config=api_key,
|
||||
provider_data_api_key_field="watsonx_api_key",
|
||||
openai_compat_api_base=self.get_base_url(),
|
||||
)
|
||||
|
||||
async def openai_chat_completion(
|
||||
self,
|
||||
params: OpenAIChatCompletionRequestWithExtraBody,
|
||||
) -> OpenAIChatCompletion | AsyncIterator[OpenAIChatCompletionChunk]:
|
||||
"""
|
||||
Override parent method to add timeout and inject usage object when missing.
|
||||
This works around a LiteLLM defect where usage block is sometimes dropped.
|
||||
"""
|
||||
|
||||
# Add usage tracking for streaming when telemetry is active
|
||||
stream_options = params.stream_options
|
||||
if params.stream and get_current_span() is not None:
|
||||
if stream_options is None:
|
||||
stream_options = {"include_usage": True}
|
||||
elif "include_usage" not in stream_options:
|
||||
stream_options = {**stream_options, "include_usage": True}
|
||||
|
||||
model_obj = await self.model_store.get_model(params.model)
|
||||
|
||||
request_params = await prepare_openai_completion_params(
|
||||
model=self.get_litellm_model_name(model_obj.provider_resource_id),
|
||||
messages=params.messages,
|
||||
frequency_penalty=params.frequency_penalty,
|
||||
function_call=params.function_call,
|
||||
functions=params.functions,
|
||||
logit_bias=params.logit_bias,
|
||||
logprobs=params.logprobs,
|
||||
max_completion_tokens=params.max_completion_tokens,
|
||||
max_tokens=params.max_tokens,
|
||||
n=params.n,
|
||||
parallel_tool_calls=params.parallel_tool_calls,
|
||||
presence_penalty=params.presence_penalty,
|
||||
response_format=params.response_format,
|
||||
seed=params.seed,
|
||||
stop=params.stop,
|
||||
stream=params.stream,
|
||||
stream_options=stream_options,
|
||||
temperature=params.temperature,
|
||||
tool_choice=params.tool_choice,
|
||||
tools=params.tools,
|
||||
top_logprobs=params.top_logprobs,
|
||||
top_p=params.top_p,
|
||||
user=params.user,
|
||||
api_key=self.get_api_key(),
|
||||
api_base=self.api_base,
|
||||
# These are watsonx-specific parameters
|
||||
timeout=self.config.timeout,
|
||||
project_id=self.config.project_id,
|
||||
)
|
||||
|
||||
result = await litellm.acompletion(**request_params)
|
||||
|
||||
# If not streaming, check and inject usage if missing
|
||||
if not params.stream:
|
||||
# Use getattr to safely handle cases where usage attribute might not exist
|
||||
if getattr(result, "usage", None) is None:
|
||||
# Create usage object with zeros
|
||||
usage_obj = OpenAIChatCompletionUsage(
|
||||
prompt_tokens=0,
|
||||
completion_tokens=0,
|
||||
total_tokens=0,
|
||||
)
|
||||
# Use model_copy to create a new response with the usage injected
|
||||
result = result.model_copy(update={"usage": usage_obj})
|
||||
return result
|
||||
|
||||
# For streaming, wrap the iterator to normalize chunks
|
||||
return self._normalize_stream(result)
|
||||
|
||||
def _normalize_chunk(self, chunk: OpenAIChatCompletionChunk) -> OpenAIChatCompletionChunk:
|
||||
"""
|
||||
Normalize a chunk to ensure it has all expected attributes.
|
||||
This works around LiteLLM not always including all expected attributes.
|
||||
"""
|
||||
# Ensure chunk has usage attribute with zeros if missing
|
||||
if not hasattr(chunk, "usage") or chunk.usage is None:
|
||||
usage_obj = OpenAIChatCompletionUsage(
|
||||
prompt_tokens=0,
|
||||
completion_tokens=0,
|
||||
total_tokens=0,
|
||||
)
|
||||
chunk = chunk.model_copy(update={"usage": usage_obj})
|
||||
|
||||
# Ensure all delta objects in choices have expected attributes
|
||||
if hasattr(chunk, "choices") and chunk.choices:
|
||||
normalized_choices = []
|
||||
for choice in chunk.choices:
|
||||
if hasattr(choice, "delta") and choice.delta:
|
||||
delta = choice.delta
|
||||
# Build update dict for missing attributes
|
||||
delta_updates = {}
|
||||
if not hasattr(delta, "refusal"):
|
||||
delta_updates["refusal"] = None
|
||||
if not hasattr(delta, "reasoning_content"):
|
||||
delta_updates["reasoning_content"] = None
|
||||
|
||||
# If we need to update delta, create a new choice with updated delta
|
||||
if delta_updates:
|
||||
new_delta = delta.model_copy(update=delta_updates)
|
||||
new_choice = choice.model_copy(update={"delta": new_delta})
|
||||
normalized_choices.append(new_choice)
|
||||
else:
|
||||
normalized_choices.append(choice)
|
||||
else:
|
||||
normalized_choices.append(choice)
|
||||
|
||||
# If we modified any choices, create a new chunk with updated choices
|
||||
if any(normalized_choices[i] is not chunk.choices[i] for i in range(len(chunk.choices))):
|
||||
chunk = chunk.model_copy(update={"choices": normalized_choices})
|
||||
|
||||
return chunk
|
||||
|
||||
async def _normalize_stream(
|
||||
self, stream: AsyncIterator[OpenAIChatCompletionChunk]
|
||||
) -> AsyncIterator[OpenAIChatCompletionChunk]:
|
||||
"""
|
||||
Normalize all chunks in the stream to ensure they have expected attributes.
|
||||
This works around LiteLLM sometimes not including expected attributes.
|
||||
"""
|
||||
try:
|
||||
async for chunk in stream:
|
||||
# Normalize and yield each chunk immediately
|
||||
yield self._normalize_chunk(chunk)
|
||||
except Exception as e:
|
||||
logger.error(f"Error normalizing stream: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
async def openai_completion(
|
||||
self,
|
||||
params: OpenAICompletionRequestWithExtraBody,
|
||||
) -> OpenAICompletion:
|
||||
"""
|
||||
Override parent method to add watsonx-specific parameters.
|
||||
"""
|
||||
from llama_stack.providers.utils.inference.openai_compat import prepare_openai_completion_params
|
||||
|
||||
model_obj = await self.model_store.get_model(params.model)
|
||||
|
||||
request_params = await prepare_openai_completion_params(
|
||||
model=self.get_litellm_model_name(model_obj.provider_resource_id),
|
||||
prompt=params.prompt,
|
||||
best_of=params.best_of,
|
||||
echo=params.echo,
|
||||
frequency_penalty=params.frequency_penalty,
|
||||
logit_bias=params.logit_bias,
|
||||
logprobs=params.logprobs,
|
||||
max_tokens=params.max_tokens,
|
||||
n=params.n,
|
||||
presence_penalty=params.presence_penalty,
|
||||
seed=params.seed,
|
||||
stop=params.stop,
|
||||
stream=params.stream,
|
||||
stream_options=params.stream_options,
|
||||
temperature=params.temperature,
|
||||
top_p=params.top_p,
|
||||
user=params.user,
|
||||
suffix=params.suffix,
|
||||
api_key=self.get_api_key(),
|
||||
api_base=self.api_base,
|
||||
# These are watsonx-specific parameters
|
||||
timeout=self.config.timeout,
|
||||
project_id=self.config.project_id,
|
||||
)
|
||||
return await litellm.atext_completion(**request_params)
|
||||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
"""
|
||||
Override parent method to add watsonx-specific parameters.
|
||||
"""
|
||||
model_obj = await self.model_store.get_model(params.model)
|
||||
|
||||
# Convert input to list if it's a string
|
||||
input_list = [params.input] if isinstance(params.input, str) else params.input
|
||||
|
||||
# Call litellm embedding function with watsonx-specific parameters
|
||||
response = litellm.embedding(
|
||||
model=self.get_litellm_model_name(model_obj.provider_resource_id),
|
||||
input=input_list,
|
||||
api_key=self.get_api_key(),
|
||||
api_base=self.api_base,
|
||||
dimensions=params.dimensions,
|
||||
# These are watsonx-specific parameters
|
||||
timeout=self.config.timeout,
|
||||
project_id=self.config.project_id,
|
||||
)
|
||||
|
||||
# Convert response to OpenAI format
|
||||
from llama_stack.apis.inference import OpenAIEmbeddingUsage
|
||||
from llama_stack.providers.utils.inference.litellm_openai_mixin import b64_encode_openai_embeddings_response
|
||||
|
||||
data = b64_encode_openai_embeddings_response(response.data, params.encoding_format)
|
||||
|
||||
usage = OpenAIEmbeddingUsage(
|
||||
prompt_tokens=response["usage"]["prompt_tokens"],
|
||||
total_tokens=response["usage"]["total_tokens"],
|
||||
)
|
||||
|
||||
return OpenAIEmbeddingsResponse(
|
||||
data=data,
|
||||
model=model_obj.provider_resource_id,
|
||||
usage=usage,
|
||||
)
|
||||
self.available_models = None
|
||||
self.config = config
|
||||
|
||||
def get_base_url(self) -> str:
|
||||
return self.config.url
|
||||
|
||||
async def _get_params(self, request: ChatCompletionRequest) -> dict[str, Any]:
|
||||
# Get base parameters from parent
|
||||
params = await super()._get_params(request)
|
||||
|
||||
# Add watsonx.ai specific parameters
|
||||
params["project_id"] = self.config.project_id
|
||||
params["time_limit"] = self.config.timeout
|
||||
return params
|
||||
|
||||
# Copied from OpenAIMixin
|
||||
async def check_model_availability(self, model: str) -> bool:
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
import json
|
||||
from typing import Any
|
||||
|
||||
from llama_stack.apis.inference import Message
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.apis.safety import (
|
||||
RunShieldResponse,
|
||||
Safety,
|
||||
|
|
@ -56,7 +56,7 @@ class BedrockSafetyAdapter(Safety, ShieldsProtocolPrivate):
|
|||
pass
|
||||
|
||||
async def run_shield(
|
||||
self, shield_id: str, messages: list[Message], params: dict[str, Any] = None
|
||||
self, shield_id: str, messages: list[OpenAIMessageParam], params: dict[str, Any] = None
|
||||
) -> RunShieldResponse:
|
||||
shield = await self.shield_store.get_shield(shield_id)
|
||||
if not shield:
|
||||
|
|
|
|||
|
|
@ -8,12 +8,11 @@ from typing import Any
|
|||
|
||||
import requests
|
||||
|
||||
from llama_stack.apis.inference import Message
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.apis.safety import ModerationObject, RunShieldResponse, Safety, SafetyViolation, ViolationLevel
|
||||
from llama_stack.apis.shields import Shield
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
|
||||
from llama_stack.providers.utils.inference.openai_compat import convert_message_to_openai_dict_new
|
||||
|
||||
from .config import NVIDIASafetyConfig
|
||||
|
||||
|
|
@ -44,7 +43,7 @@ class NVIDIASafetyAdapter(Safety, ShieldsProtocolPrivate):
|
|||
pass
|
||||
|
||||
async def run_shield(
|
||||
self, shield_id: str, messages: list[Message], params: dict[str, Any] | None = None
|
||||
self, shield_id: str, messages: list[OpenAIMessageParam], params: dict[str, Any] | None = None
|
||||
) -> RunShieldResponse:
|
||||
"""
|
||||
Run a safety shield check against the provided messages.
|
||||
|
|
@ -118,7 +117,7 @@ class NeMoGuardrails:
|
|||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
async def run(self, messages: list[Message]) -> RunShieldResponse:
|
||||
async def run(self, messages: list[OpenAIMessageParam]) -> RunShieldResponse:
|
||||
"""
|
||||
Queries the /v1/guardrails/checks endpoint of the NeMo guardrails deployed API.
|
||||
|
||||
|
|
@ -132,10 +131,9 @@ class NeMoGuardrails:
|
|||
Raises:
|
||||
requests.HTTPError: If the POST request fails.
|
||||
"""
|
||||
request_messages = [await convert_message_to_openai_dict_new(message) for message in messages]
|
||||
request_data = {
|
||||
"model": self.model,
|
||||
"messages": request_messages,
|
||||
"messages": [{"role": message.role, "content": message.content} for message in messages],
|
||||
"temperature": self.temperature,
|
||||
"top_p": 1,
|
||||
"frequency_penalty": 0,
|
||||
|
|
|
|||
|
|
@ -4,13 +4,12 @@
|
|||
# This source code is licensed under the terms described in the LICENSE file in
|
||||
# the root directory of this source tree.
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
import litellm
|
||||
import requests
|
||||
|
||||
from llama_stack.apis.inference import Message
|
||||
from llama_stack.apis.inference import OpenAIMessageParam
|
||||
from llama_stack.apis.safety import (
|
||||
RunShieldResponse,
|
||||
Safety,
|
||||
|
|
@ -21,7 +20,6 @@ from llama_stack.apis.shields import Shield
|
|||
from llama_stack.core.request_headers import NeedsRequestProviderData
|
||||
from llama_stack.log import get_logger
|
||||
from llama_stack.providers.datatypes import ShieldsProtocolPrivate
|
||||
from llama_stack.providers.utils.inference.openai_compat import convert_message_to_openai_dict_new
|
||||
|
||||
from .config import SambaNovaSafetyConfig
|
||||
|
||||
|
|
@ -72,7 +70,7 @@ class SambaNovaSafetyAdapter(Safety, ShieldsProtocolPrivate, NeedsRequestProvide
|
|||
pass
|
||||
|
||||
async def run_shield(
|
||||
self, shield_id: str, messages: list[Message], params: dict[str, Any] | None = None
|
||||
self, shield_id: str, messages: list[OpenAIMessageParam], params: dict[str, Any] | None = None
|
||||
) -> RunShieldResponse:
|
||||
shield = await self.shield_store.get_shield(shield_id)
|
||||
if not shield:
|
||||
|
|
@ -80,12 +78,8 @@ class SambaNovaSafetyAdapter(Safety, ShieldsProtocolPrivate, NeedsRequestProvide
|
|||
|
||||
shield_params = shield.params
|
||||
logger.debug(f"run_shield::{shield_params}::messages={messages}")
|
||||
content_messages = [await convert_message_to_openai_dict_new(m) for m in messages]
|
||||
logger.debug(f"run_shield::final:messages::{json.dumps(content_messages, indent=2)}:")
|
||||
|
||||
response = litellm.completion(
|
||||
model=shield.provider_resource_id, messages=content_messages, api_key=self._get_api_key()
|
||||
)
|
||||
response = litellm.completion(model=shield.provider_resource_id, messages=messages, api_key=self._get_api_key())
|
||||
shield_message = response.choices[0].message.content
|
||||
|
||||
if "unsafe" in shield_message.lower():
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ if TYPE_CHECKING:
|
|||
from llama_stack.apis.inference import (
|
||||
ModelStore,
|
||||
OpenAIEmbeddingData,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
OpenAIEmbeddingUsage,
|
||||
)
|
||||
|
|
@ -32,26 +33,22 @@ class SentenceTransformerEmbeddingMixin:
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
# Convert input to list format if it's a single string
|
||||
input_list = [input] if isinstance(input, str) else input
|
||||
input_list = [params.input] if isinstance(params.input, str) else params.input
|
||||
if not input_list:
|
||||
raise ValueError("Empty list not supported")
|
||||
|
||||
# Get the model and generate embeddings
|
||||
model_obj = await self.model_store.get_model(model)
|
||||
model_obj = await self.model_store.get_model(params.model)
|
||||
embedding_model = await self._load_sentence_transformer_model(model_obj.provider_resource_id)
|
||||
embeddings = await asyncio.to_thread(embedding_model.encode, input_list, show_progress_bar=False)
|
||||
|
||||
# Convert embeddings to the requested format
|
||||
data = []
|
||||
for i, embedding in enumerate(embeddings):
|
||||
if encoding_format == "base64":
|
||||
if params.encoding_format == "base64":
|
||||
# Convert float array to base64 string
|
||||
float_bytes = struct.pack(f"{len(embedding)}f", *embedding)
|
||||
embedding_value = base64.b64encode(float_bytes).decode("ascii")
|
||||
|
|
@ -70,7 +67,7 @@ class SentenceTransformerEmbeddingMixin:
|
|||
usage = OpenAIEmbeddingUsage(prompt_tokens=-1, total_tokens=-1)
|
||||
return OpenAIEmbeddingsResponse(
|
||||
data=data,
|
||||
model=model,
|
||||
model=params.model,
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
|
|
@ -86,7 +83,7 @@ class SentenceTransformerEmbeddingMixin:
|
|||
def _load_model():
|
||||
from sentence_transformers import SentenceTransformer
|
||||
|
||||
return SentenceTransformer(model)
|
||||
return SentenceTransformer(model, trust_remote_code=True)
|
||||
|
||||
loaded_model = await asyncio.to_thread(_load_model)
|
||||
EMBEDDING_MODELS[model] = loaded_model
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ from llama_stack.apis.inference import (
|
|||
OpenAICompletion,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingData,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
OpenAIEmbeddingUsage,
|
||||
ToolChoice,
|
||||
|
|
@ -189,16 +190,12 @@ class LiteLLMOpenAIMixin(
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
model_obj = await self.model_store.get_model(model)
|
||||
model_obj = await self.model_store.get_model(params.model)
|
||||
|
||||
# Convert input to list if it's a string
|
||||
input_list = [input] if isinstance(input, str) else input
|
||||
input_list = [params.input] if isinstance(params.input, str) else params.input
|
||||
|
||||
# Call litellm embedding function
|
||||
# litellm.drop_params = True
|
||||
|
|
@ -207,11 +204,11 @@ class LiteLLMOpenAIMixin(
|
|||
input=input_list,
|
||||
api_key=self.get_api_key(),
|
||||
api_base=self.api_base,
|
||||
dimensions=dimensions,
|
||||
dimensions=params.dimensions,
|
||||
)
|
||||
|
||||
# Convert response to OpenAI format
|
||||
data = b64_encode_openai_embeddings_response(response.data, encoding_format)
|
||||
data = b64_encode_openai_embeddings_response(response.data, params.encoding_format)
|
||||
|
||||
usage = OpenAIEmbeddingUsage(
|
||||
prompt_tokens=response["usage"]["prompt_tokens"],
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ from llama_stack.apis.inference import (
|
|||
OpenAICompletion,
|
||||
OpenAICompletionRequestWithExtraBody,
|
||||
OpenAIEmbeddingData,
|
||||
OpenAIEmbeddingsRequestWithExtraBody,
|
||||
OpenAIEmbeddingsResponse,
|
||||
OpenAIEmbeddingUsage,
|
||||
OpenAIMessageParam,
|
||||
|
|
@ -316,23 +317,27 @@ class OpenAIMixin(NeedsRequestProviderData, ABC, BaseModel):
|
|||
|
||||
async def openai_embeddings(
|
||||
self,
|
||||
model: str,
|
||||
input: str | list[str],
|
||||
encoding_format: str | None = "float",
|
||||
dimensions: int | None = None,
|
||||
user: str | None = None,
|
||||
params: OpenAIEmbeddingsRequestWithExtraBody,
|
||||
) -> OpenAIEmbeddingsResponse:
|
||||
"""
|
||||
Direct OpenAI embeddings API call.
|
||||
"""
|
||||
# Prepare request parameters
|
||||
request_params = {
|
||||
"model": await self._get_provider_model_id(params.model),
|
||||
"input": params.input,
|
||||
"encoding_format": params.encoding_format if params.encoding_format is not None else NOT_GIVEN,
|
||||
"dimensions": params.dimensions if params.dimensions is not None else NOT_GIVEN,
|
||||
"user": params.user if params.user is not None else NOT_GIVEN,
|
||||
}
|
||||
|
||||
# Add extra_body if present
|
||||
extra_body = params.model_extra
|
||||
if extra_body:
|
||||
request_params["extra_body"] = extra_body
|
||||
|
||||
# Call OpenAI embeddings API with properly typed parameters
|
||||
response = await self.client.embeddings.create(
|
||||
model=await self._get_provider_model_id(model),
|
||||
input=input,
|
||||
encoding_format=encoding_format if encoding_format is not None else NOT_GIVEN,
|
||||
dimensions=dimensions if dimensions is not None else NOT_GIVEN,
|
||||
user=user if user is not None else NOT_GIVEN,
|
||||
)
|
||||
response = await self.client.embeddings.create(**request_params)
|
||||
|
||||
data = []
|
||||
for i, embedding_data in enumerate(response.data):
|
||||
|
|
@ -350,7 +355,7 @@ class OpenAIMixin(NeedsRequestProviderData, ABC, BaseModel):
|
|||
|
||||
return OpenAIEmbeddingsResponse(
|
||||
data=data,
|
||||
model=model,
|
||||
model=params.model,
|
||||
usage=usage,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import base64
|
|||
import io
|
||||
import json
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
from PIL import Image as PIL_Image
|
||||
|
|
@ -23,6 +24,9 @@ from llama_stack.apis.inference import (
|
|||
ChatCompletionRequest,
|
||||
CompletionRequest,
|
||||
Message,
|
||||
OpenAIChatCompletionContentPartImageParam,
|
||||
OpenAIChatCompletionContentPartTextParam,
|
||||
OpenAIFile,
|
||||
ResponseFormat,
|
||||
ResponseFormatType,
|
||||
SystemMessage,
|
||||
|
|
@ -74,14 +78,22 @@ def decode_assistant_message(content: str, stop_reason: StopReason) -> RawMessag
|
|||
return formatter.decode_assistant_message_from_content(content, stop_reason)
|
||||
|
||||
|
||||
def interleaved_content_as_str(content: InterleavedContent, sep: str = " ") -> str:
|
||||
def interleaved_content_as_str(
|
||||
content: Any,
|
||||
sep: str = " ",
|
||||
) -> str:
|
||||
if content is None:
|
||||
return ""
|
||||
|
||||
def _process(c) -> str:
|
||||
if isinstance(c, str):
|
||||
return c
|
||||
elif isinstance(c, ImageContentItem):
|
||||
return "<image>"
|
||||
elif isinstance(c, TextContentItem):
|
||||
elif isinstance(c, TextContentItem) or isinstance(c, OpenAIChatCompletionContentPartTextParam):
|
||||
return c.text
|
||||
elif isinstance(c, ImageContentItem) or isinstance(c, OpenAIChatCompletionContentPartImageParam):
|
||||
return "<image>"
|
||||
elif isinstance(c, OpenAIFile):
|
||||
return "<file>"
|
||||
else:
|
||||
raise ValueError(f"Unsupported content type: {type(c)}")
|
||||
|
||||
|
|
|
|||
|
|
@ -21,14 +21,27 @@ class SqliteKVStoreImpl(KVStore):
|
|||
def __init__(self, config: SqliteKVStoreConfig):
|
||||
self.db_path = config.db_path
|
||||
self.table_name = "kvstore"
|
||||
self._conn: aiosqlite.Connection | None = None
|
||||
|
||||
def __str__(self):
|
||||
return f"SqliteKVStoreImpl(db_path={self.db_path}, table_name={self.table_name})"
|
||||
|
||||
def _is_memory_db(self) -> bool:
|
||||
"""Check if this is an in-memory database."""
|
||||
return self.db_path == ":memory:" or "mode=memory" in self.db_path
|
||||
|
||||
async def initialize(self):
|
||||
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
# Skip directory creation for in-memory databases and file: URIs
|
||||
if not self._is_memory_db() and not self.db_path.startswith("file:"):
|
||||
db_dir = os.path.dirname(self.db_path)
|
||||
if db_dir: # Only create if there's a directory component
|
||||
os.makedirs(db_dir, exist_ok=True)
|
||||
|
||||
# Only use persistent connection for in-memory databases
|
||||
# File-based databases use connection-per-operation to avoid hangs
|
||||
if self._is_memory_db():
|
||||
self._conn = await aiosqlite.connect(self.db_path)
|
||||
await self._conn.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS {self.table_name} (
|
||||
key TEXT PRIMARY KEY,
|
||||
|
|
@ -37,19 +50,50 @@ class SqliteKVStoreImpl(KVStore):
|
|||
)
|
||||
"""
|
||||
)
|
||||
await db.commit()
|
||||
await self._conn.commit()
|
||||
else:
|
||||
# For file-based databases, just create the table
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
f"""
|
||||
CREATE TABLE IF NOT EXISTS {self.table_name} (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT,
|
||||
expiration TIMESTAMP
|
||||
)
|
||||
"""
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
async def shutdown(self):
|
||||
"""Close the persistent connection (only for in-memory databases)."""
|
||||
if self._conn:
|
||||
await self._conn.close()
|
||||
self._conn = None
|
||||
|
||||
async def set(self, key: str, value: str, expiration: datetime | None = None) -> None:
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
if self._conn:
|
||||
# In-memory database with persistent connection
|
||||
await self._conn.execute(
|
||||
f"INSERT OR REPLACE INTO {self.table_name} (key, value, expiration) VALUES (?, ?, ?)",
|
||||
(key, value, expiration),
|
||||
)
|
||||
await db.commit()
|
||||
await self._conn.commit()
|
||||
else:
|
||||
# File-based database with connection per operation
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(
|
||||
f"INSERT OR REPLACE INTO {self.table_name} (key, value, expiration) VALUES (?, ?, ?)",
|
||||
(key, value, expiration),
|
||||
)
|
||||
await db.commit()
|
||||
|
||||
async def get(self, key: str) -> str | None:
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute(f"SELECT value, expiration FROM {self.table_name} WHERE key = ?", (key,)) as cursor:
|
||||
if self._conn:
|
||||
# In-memory database with persistent connection
|
||||
async with self._conn.execute(
|
||||
f"SELECT value, expiration FROM {self.table_name} WHERE key = ?", (key,)
|
||||
) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
|
|
@ -58,15 +102,36 @@ class SqliteKVStoreImpl(KVStore):
|
|||
logger.warning(f"Expected string value for key {key}, got {type(value)}, returning None")
|
||||
return None
|
||||
return value
|
||||
else:
|
||||
# File-based database with connection per operation
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute(
|
||||
f"SELECT value, expiration FROM {self.table_name} WHERE key = ?", (key,)
|
||||
) as cursor:
|
||||
row = await cursor.fetchone()
|
||||
if row is None:
|
||||
return None
|
||||
value, expiration = row
|
||||
if not isinstance(value, str):
|
||||
logger.warning(f"Expected string value for key {key}, got {type(value)}, returning None")
|
||||
return None
|
||||
return value
|
||||
|
||||
async def delete(self, key: str) -> None:
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,))
|
||||
await db.commit()
|
||||
if self._conn:
|
||||
# In-memory database with persistent connection
|
||||
await self._conn.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,))
|
||||
await self._conn.commit()
|
||||
else:
|
||||
# File-based database with connection per operation
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
await db.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,))
|
||||
await db.commit()
|
||||
|
||||
async def values_in_range(self, start_key: str, end_key: str) -> list[str]:
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute(
|
||||
if self._conn:
|
||||
# In-memory database with persistent connection
|
||||
async with self._conn.execute(
|
||||
f"SELECT key, value, expiration FROM {self.table_name} WHERE key >= ? AND key <= ?",
|
||||
(start_key, end_key),
|
||||
) as cursor:
|
||||
|
|
@ -75,13 +140,35 @@ class SqliteKVStoreImpl(KVStore):
|
|||
_, value, _ = row
|
||||
result.append(value)
|
||||
return result
|
||||
else:
|
||||
# File-based database with connection per operation
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
async with db.execute(
|
||||
f"SELECT key, value, expiration FROM {self.table_name} WHERE key >= ? AND key <= ?",
|
||||
(start_key, end_key),
|
||||
) as cursor:
|
||||
result = []
|
||||
async for row in cursor:
|
||||
_, value, _ = row
|
||||
result.append(value)
|
||||
return result
|
||||
|
||||
async def keys_in_range(self, start_key: str, end_key: str) -> list[str]:
|
||||
"""Get all keys in the given range."""
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
if self._conn:
|
||||
# In-memory database with persistent connection
|
||||
cursor = await self._conn.execute(
|
||||
f"SELECT key FROM {self.table_name} WHERE key >= ? AND key <= ?",
|
||||
(start_key, end_key),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [row[0] for row in rows]
|
||||
else:
|
||||
# File-based database with connection per operation
|
||||
async with aiosqlite.connect(self.db_path) as db:
|
||||
cursor = await db.execute(
|
||||
f"SELECT key FROM {self.table_name} WHERE key >= ? AND key <= ?",
|
||||
(start_key, end_key),
|
||||
)
|
||||
rows = await cursor.fetchall()
|
||||
return [row[0] for row in rows]
|
||||
|
|
|
|||
|
|
@ -10,8 +10,9 @@ import mimetypes
|
|||
import time
|
||||
import uuid
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any
|
||||
from typing import Annotated, Any
|
||||
|
||||
from fastapi import Body
|
||||
from pydantic import TypeAdapter
|
||||
|
||||
from llama_stack.apis.common.errors import VectorStoreNotFoundError
|
||||
|
|
@ -19,6 +20,8 @@ from llama_stack.apis.files import Files, OpenAIFileObject
|
|||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.apis.vector_io import (
|
||||
Chunk,
|
||||
OpenAICreateVectorStoreFileBatchRequestWithExtraBody,
|
||||
OpenAICreateVectorStoreRequestWithExtraBody,
|
||||
QueryChunksResponse,
|
||||
SearchRankingOptions,
|
||||
VectorStoreChunkingStrategy,
|
||||
|
|
@ -340,39 +343,39 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
|
||||
async def openai_create_vector_store(
|
||||
self,
|
||||
name: str | None = None,
|
||||
file_ids: list[str] | None = None,
|
||||
expires_after: dict[str, Any] | None = None,
|
||||
chunking_strategy: dict[str, Any] | None = None,
|
||||
metadata: dict[str, Any] | None = None,
|
||||
embedding_model: str | None = None,
|
||||
embedding_dimension: int | None = 384,
|
||||
provider_id: str | None = None,
|
||||
provider_vector_db_id: str | None = None,
|
||||
params: Annotated[OpenAICreateVectorStoreRequestWithExtraBody, Body(...)],
|
||||
) -> VectorStoreObject:
|
||||
"""Creates a vector store."""
|
||||
created_at = int(time.time())
|
||||
|
||||
# Extract llama-stack-specific parameters from extra_body
|
||||
extra = params.model_extra or {}
|
||||
provider_vector_db_id = extra.get("provider_vector_db_id")
|
||||
embedding_model = extra.get("embedding_model")
|
||||
embedding_dimension = extra.get("embedding_dimension", 768)
|
||||
# use provider_id set by router; fallback to provider's own ID when used directly via --stack-config
|
||||
provider_id = extra.get("provider_id") or getattr(self, "__provider_id__", None)
|
||||
# Derive the canonical vector_db_id (allow override, else generate)
|
||||
vector_db_id = provider_vector_db_id or generate_object_id("vector_store", lambda: f"vs_{uuid.uuid4()}")
|
||||
|
||||
if provider_id is None:
|
||||
raise ValueError("Provider ID is required")
|
||||
|
||||
if embedding_model is None:
|
||||
raise ValueError("Embedding model is required")
|
||||
|
||||
# Embedding dimension is required (defaulted to 384 if not provided)
|
||||
# Embedding dimension is required (defaulted to 768 if not provided)
|
||||
if embedding_dimension is None:
|
||||
raise ValueError("Embedding dimension is required")
|
||||
|
||||
# Register the VectorDB backing this vector store
|
||||
if provider_id is None:
|
||||
raise ValueError("Provider ID is required but was not provided")
|
||||
|
||||
vector_db = VectorDB(
|
||||
identifier=vector_db_id,
|
||||
embedding_dimension=embedding_dimension,
|
||||
embedding_model=embedding_model,
|
||||
provider_id=provider_id,
|
||||
provider_resource_id=vector_db_id,
|
||||
vector_db_name=name,
|
||||
vector_db_name=params.name,
|
||||
)
|
||||
await self.register_vector_db(vector_db)
|
||||
|
||||
|
|
@ -391,19 +394,19 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
"id": vector_db_id,
|
||||
"object": "vector_store",
|
||||
"created_at": created_at,
|
||||
"name": name,
|
||||
"name": params.name,
|
||||
"usage_bytes": 0,
|
||||
"file_counts": file_counts.model_dump(),
|
||||
"status": status,
|
||||
"expires_after": expires_after,
|
||||
"expires_after": params.expires_after,
|
||||
"expires_at": None,
|
||||
"last_active_at": created_at,
|
||||
"file_ids": [],
|
||||
"chunking_strategy": chunking_strategy,
|
||||
"chunking_strategy": params.chunking_strategy,
|
||||
}
|
||||
|
||||
# Add provider information to metadata if provided
|
||||
metadata = metadata or {}
|
||||
metadata = params.metadata or {}
|
||||
if provider_id:
|
||||
metadata["provider_id"] = provider_id
|
||||
if provider_vector_db_id:
|
||||
|
|
@ -417,7 +420,7 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
self.openai_vector_stores[vector_db_id] = store_info
|
||||
|
||||
# Now that our vector store is created, attach any files that were provided
|
||||
file_ids = file_ids or []
|
||||
file_ids = params.file_ids or []
|
||||
tasks = [self.openai_attach_file_to_vector_store(vector_db_id, file_id) for file_id in file_ids]
|
||||
await asyncio.gather(*tasks)
|
||||
|
||||
|
|
@ -976,15 +979,13 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
async def openai_create_vector_store_file_batch(
|
||||
self,
|
||||
vector_store_id: str,
|
||||
file_ids: list[str],
|
||||
attributes: dict[str, Any] | None = None,
|
||||
chunking_strategy: VectorStoreChunkingStrategy | None = None,
|
||||
params: Annotated[OpenAICreateVectorStoreFileBatchRequestWithExtraBody, Body(...)],
|
||||
) -> VectorStoreFileBatchObject:
|
||||
"""Create a vector store file batch."""
|
||||
if vector_store_id not in self.openai_vector_stores:
|
||||
raise VectorStoreNotFoundError(vector_store_id)
|
||||
|
||||
chunking_strategy = chunking_strategy or VectorStoreChunkingStrategyAuto()
|
||||
chunking_strategy = params.chunking_strategy or VectorStoreChunkingStrategyAuto()
|
||||
|
||||
created_at = int(time.time())
|
||||
batch_id = generate_object_id("vector_store_file_batch", lambda: f"batch_{uuid.uuid4()}")
|
||||
|
|
@ -996,8 +997,8 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
completed=0,
|
||||
cancelled=0,
|
||||
failed=0,
|
||||
in_progress=len(file_ids),
|
||||
total=len(file_ids),
|
||||
in_progress=len(params.file_ids),
|
||||
total=len(params.file_ids),
|
||||
)
|
||||
|
||||
# Create batch object immediately with in_progress status
|
||||
|
|
@ -1011,8 +1012,8 @@ class OpenAIVectorStoreMixin(ABC):
|
|||
|
||||
batch_info = {
|
||||
**batch_object.model_dump(),
|
||||
"file_ids": file_ids,
|
||||
"attributes": attributes,
|
||||
"file_ids": params.file_ids,
|
||||
"attributes": params.attributes,
|
||||
"chunking_strategy": chunking_strategy.model_dump(),
|
||||
"expires_at": expires_at,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ from llama_stack.apis.common.content_types import (
|
|||
URL,
|
||||
InterleavedContent,
|
||||
)
|
||||
from llama_stack.apis.inference import OpenAIEmbeddingsRequestWithExtraBody
|
||||
from llama_stack.apis.tools import RAGDocument
|
||||
from llama_stack.apis.vector_dbs import VectorDB
|
||||
from llama_stack.apis.vector_io import Chunk, ChunkMetadata, QueryChunksResponse
|
||||
|
|
@ -274,10 +275,11 @@ class VectorDBWithIndex:
|
|||
_validate_embedding(c.embedding, i, self.vector_db.embedding_dimension)
|
||||
|
||||
if chunks_to_embed:
|
||||
resp = await self.inference_api.openai_embeddings(
|
||||
self.vector_db.embedding_model,
|
||||
[c.content for c in chunks_to_embed],
|
||||
params = OpenAIEmbeddingsRequestWithExtraBody(
|
||||
model=self.vector_db.embedding_model,
|
||||
input=[c.content for c in chunks_to_embed],
|
||||
)
|
||||
resp = await self.inference_api.openai_embeddings(params)
|
||||
for c, data in zip(chunks_to_embed, resp.data, strict=False):
|
||||
c.embedding = data.embedding
|
||||
|
||||
|
|
@ -316,7 +318,11 @@ class VectorDBWithIndex:
|
|||
if mode == "keyword":
|
||||
return await self.index.query_keyword(query_string, k, score_threshold)
|
||||
|
||||
embeddings_response = await self.inference_api.openai_embeddings(self.vector_db.embedding_model, [query_string])
|
||||
params = OpenAIEmbeddingsRequestWithExtraBody(
|
||||
model=self.vector_db.embedding_model,
|
||||
input=[query_string],
|
||||
)
|
||||
embeddings_response = await self.inference_api.openai_embeddings(params)
|
||||
query_vector = np.array(embeddings_response.data[0].embedding, dtype=np.float32)
|
||||
if mode == "hybrid":
|
||||
return await self.index.query_hybrid(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue