Merge branch 'main' into responses-and-safety

This commit is contained in:
slekkala1 2025-10-10 12:01:44 -07:00 committed by GitHub
commit 505809c05c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
42 changed files with 6214 additions and 19 deletions

View file

@ -812,6 +812,7 @@ class Agents(Protocol):
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -831,6 +832,7 @@ class Agents(Protocol):
:param input: Input message(s) to create the response.
:param model: The underlying LLM used for completions.
:param previous_response_id: (Optional) if specified, the new response will be a continuation of the previous response. This can be used to easily fork-off new responses from existing responses.
:param conversation: (Optional) The ID of a conversation to add the response to. Must begin with 'conv_'. Input and output messages will be automatically added to the conversation.
:param include: (Optional) Additional fields to include in the response.
:param shields: (Optional) List of shields to apply during response generation. Can be shield IDs (strings) or shield specifications.
:returns: An OpenAIResponseObject.

View file

@ -86,3 +86,18 @@ class TokenValidationError(ValueError):
def __init__(self, message: str) -> None:
super().__init__(message)
class ConversationNotFoundError(ResourceNotFoundError):
"""raised when Llama Stack cannot find a referenced conversation"""
def __init__(self, conversation_id: str) -> None:
super().__init__(conversation_id, "Conversation", "client.conversations.list()")
class InvalidConversationIdError(ValueError):
"""raised when a conversation ID has an invalid format"""
def __init__(self, conversation_id: str) -> None:
message = f"Invalid conversation ID '{conversation_id}'. Expected an ID that begins with 'conv_'."
super().__init__(message)

View file

@ -193,12 +193,15 @@ class ConversationServiceImpl(Conversations):
await self._get_validated_conversation(conversation_id)
created_items = []
created_at = int(time.time())
base_time = int(time.time())
for item in items:
for i, item in enumerate(items):
item_dict = item.model_dump()
item_id = self._get_or_generate_item_id(item, item_dict)
# make each timestamp unique to maintain order
created_at = base_time + i
item_record = {
"id": item_id,
"conversation_id": conversation_id,

View file

@ -150,6 +150,7 @@ async def resolve_impls(
provider_registry: ProviderRegistry,
dist_registry: DistributionRegistry,
policy: list[AccessRule],
internal_impls: dict[Api, Any] | None = None,
) -> dict[Api, Any]:
"""
Resolves provider implementations by:
@ -172,7 +173,7 @@ async def resolve_impls(
sorted_providers = sort_providers_by_deps(providers_with_specs, run_config)
return await instantiate_providers(sorted_providers, router_apis, dist_registry, run_config, policy)
return await instantiate_providers(sorted_providers, router_apis, dist_registry, run_config, policy, internal_impls)
def specs_for_autorouted_apis(apis_to_serve: list[str] | set[str]) -> dict[str, dict[str, ProviderWithSpec]]:
@ -280,9 +281,10 @@ async def instantiate_providers(
dist_registry: DistributionRegistry,
run_config: StackRunConfig,
policy: list[AccessRule],
internal_impls: dict[Api, Any] | None = None,
) -> dict[Api, Any]:
"""Instantiates providers asynchronously while managing dependencies."""
impls: dict[Api, Any] = {}
impls: dict[Api, Any] = internal_impls.copy() if internal_impls else {}
inner_impls_by_provider_id: dict[str, dict[str, Any]] = {f"inner-{x.value}": {} for x in router_apis}
for api_str, provider in sorted_providers:
# Skip providers that are not enabled

View file

@ -326,12 +326,17 @@ class Stack:
dist_registry, _ = await create_dist_registry(self.run_config.metadata_store, self.run_config.image_name)
policy = self.run_config.server.auth.access_policy if self.run_config.server.auth else []
impls = await resolve_impls(
self.run_config, self.provider_registry or get_provider_registry(self.run_config), dist_registry, policy
)
# Add internal implementations after all other providers are resolved
add_internal_implementations(impls, self.run_config)
internal_impls = {}
add_internal_implementations(internal_impls, self.run_config)
impls = await resolve_impls(
self.run_config,
self.provider_registry or get_provider_registry(self.run_config),
dist_registry,
policy,
internal_impls,
)
if Api.prompts in impls:
await impls[Api.prompts].initialize()

View file

@ -224,6 +224,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/ci-tests}/conversations.db
models: []
shields:
- shield_id: llama-guard

View file

@ -101,6 +101,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -97,6 +97,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/dell}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -114,6 +114,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -104,6 +104,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/meta-reference-gpu}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -103,6 +103,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -92,6 +92,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/nvidia}/conversations.db
models: []
shields: []
vector_dbs: []

View file

@ -134,6 +134,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/open-benchmark}/conversations.db
models:
- metadata: {}
model_id: gpt-4o

View file

@ -86,6 +86,9 @@ inference_store:
db: ${env.POSTGRES_DB:=llamastack}
user: ${env.POSTGRES_USER:=llamastack}
password: ${env.POSTGRES_PASSWORD:=llamastack}
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/postgres-demo}/conversations.db
models:
- metadata: {}
model_id: ${env.INFERENCE_MODEL}

View file

@ -227,6 +227,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter-gpu}/conversations.db
models: []
shields:
- shield_id: llama-guard

View file

@ -224,6 +224,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/starter}/conversations.db
models: []
shields:
- shield_id: llama-guard

View file

@ -181,6 +181,7 @@ class RunConfigSettings(BaseModel):
default_benchmarks: list[BenchmarkInput] | None = None
metadata_store: dict | None = None
inference_store: dict | None = None
conversations_store: dict | None = None
def run_config(
self,
@ -240,6 +241,11 @@ class RunConfigSettings(BaseModel):
__distro_dir__=f"~/.llama/distributions/{name}",
db_name="inference_store.db",
),
"conversations_store": self.conversations_store
or SqliteSqlStoreConfig.sample_run_config(
__distro_dir__=f"~/.llama/distributions/{name}",
db_name="conversations.db",
),
"models": [m.model_dump(exclude_none=True) for m in (self.default_models or [])],
"shields": [s.model_dump(exclude_none=True) for s in (self.default_shields or [])],
"vector_dbs": [],

View file

@ -107,6 +107,9 @@ metadata_store:
inference_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/inference_store.db
conversations_store:
type: sqlite
db_path: ${env.SQLITE_STORE_DIR:=~/.llama/distributions/watsonx}/conversations.db
models: []
shields: []
vector_dbs: []

View file

@ -30,6 +30,7 @@ CATEGORIES = [
"tools",
"client",
"telemetry",
"openai",
"openai_responses",
"openai_conversations",
"testing",

View file

@ -21,6 +21,7 @@ async def get_provider_impl(config: MetaReferenceAgentsImplConfig, deps: dict[Ap
deps[Api.safety],
deps[Api.tool_runtime],
deps[Api.tool_groups],
deps[Api.conversations],
policy,
Api.telemetry in deps,
)

View file

@ -30,6 +30,7 @@ from llama_stack.apis.agents import (
)
from llama_stack.apis.agents.openai_responses import OpenAIResponseText
from llama_stack.apis.common.responses import PaginatedResponse
from llama_stack.apis.conversations import Conversations
from llama_stack.apis.inference import (
Inference,
ToolConfig,
@ -63,6 +64,7 @@ class MetaReferenceAgentsImpl(Agents):
safety_api: Safety,
tool_runtime_api: ToolRuntime,
tool_groups_api: ToolGroups,
conversations_api: Conversations,
policy: list[AccessRule],
telemetry_enabled: bool = False,
):
@ -72,6 +74,7 @@ class MetaReferenceAgentsImpl(Agents):
self.safety_api = safety_api
self.tool_runtime_api = tool_runtime_api
self.tool_groups_api = tool_groups_api
self.conversations_api = conversations_api
self.telemetry_enabled = telemetry_enabled
self.in_memory_store = InmemoryKVStoreImpl()
@ -89,6 +92,7 @@ class MetaReferenceAgentsImpl(Agents):
responses_store=self.responses_store,
vector_io_api=self.vector_io_api,
safety_api=self.safety_api,
conversations_api=self.conversations_api,
)
async def create_agent(
@ -326,6 +330,7 @@ class MetaReferenceAgentsImpl(Agents):
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -340,6 +345,7 @@ class MetaReferenceAgentsImpl(Agents):
model,
instructions,
previous_response_id,
conversation,
store,
stream,
temperature,

View file

@ -27,6 +27,11 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseText,
OpenAIResponseTextFormat,
)
from llama_stack.apis.common.errors import (
InvalidConversationIdError,
)
from llama_stack.apis.conversations import Conversations
from llama_stack.apis.conversations.conversations import ConversationItem
from llama_stack.apis.inference import (
Inference,
Message,
@ -71,6 +76,7 @@ class OpenAIResponsesImpl:
responses_store: ResponsesStore,
vector_io_api: VectorIO, # VectorIO
safety_api: Safety,
conversations_api: Conversations,
):
self.inference_api = inference_api
self.tool_groups_api = tool_groups_api
@ -78,6 +84,7 @@ class OpenAIResponsesImpl:
self.responses_store = responses_store
self.vector_io_api = vector_io_api
self.safety_api = safety_api
self.conversations_api = conversations_api
self.tool_executor = ToolExecutor(
tool_groups_api=tool_groups_api,
tool_runtime_api=tool_runtime_api,
@ -216,6 +223,7 @@ class OpenAIResponsesImpl:
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -230,11 +238,27 @@ class OpenAIResponsesImpl:
shield_ids = extract_shield_ids(shields) if shields else []
if conversation is not None and previous_response_id is not None:
raise ValueError(
"Mutually exclusive parameters: 'previous_response_id' and 'conversation'. Ensure you are only providing one of these parameters."
)
original_input = input # needed for syncing to Conversations
if conversation is not None:
if not conversation.startswith("conv_"):
raise InvalidConversationIdError(conversation)
# Check conversation exists (raises ConversationNotFoundError if not)
_ = await self.conversations_api.get_conversation(conversation)
input = await self._load_conversation_context(conversation, input)
stream_gen = self._create_streaming_response(
input=input,
original_input=original_input,
model=model,
instructions=instructions,
previous_response_id=previous_response_id,
conversation=conversation,
store=store,
temperature=temperature,
text=text,
@ -314,8 +338,10 @@ class OpenAIResponsesImpl:
self,
input: str | list[OpenAIResponseInput],
model: str,
original_input: str | list[OpenAIResponseInput] | None = None,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
temperature: float | None = None,
text: OpenAIResponseText | None = None,
@ -358,7 +384,7 @@ class OpenAIResponsesImpl:
)
# Create orchestrator and delegate streaming logic
response_id = f"resp-{uuid.uuid4()}"
response_id = f"resp_{uuid.uuid4()}"
created_at = int(time.time())
orchestrator = StreamingResponseOrchestrator(
@ -384,13 +410,102 @@ class OpenAIResponsesImpl:
failed_response = stream_chunk.response
yield stream_chunk
# Store the response if requested
if store and final_response and failed_response is None:
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
# Store and sync immediately after yielding terminal events
# This ensures the storage/syncing happens even if the consumer breaks early
if (
stream_chunk.type in {"response.completed", "response.incomplete"}
and store
and final_response
and failed_response is None
):
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
# for Conversations, we need to use the original_input if it's available, otherwise use input
sync_input = original_input if original_input is not None else input
await self._sync_response_to_conversation(conversation, sync_input, final_response)
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id)
async def _load_conversation_context(
self, conversation_id: str, content: str | list[OpenAIResponseInput]
) -> list[OpenAIResponseInput]:
"""Load conversation history and merge with provided content."""
conversation_items = await self.conversations_api.list(conversation_id, order="asc")
context_messages = []
for item in conversation_items.data:
if isinstance(item, OpenAIResponseMessage):
if item.role == "user":
context_messages.append(
OpenAIResponseMessage(
role="user", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
elif item.role == "assistant":
context_messages.append(
OpenAIResponseMessage(
role="assistant", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
# add new content to context
if isinstance(content, str):
context_messages.append(OpenAIResponseMessage(role="user", content=content))
elif isinstance(content, list):
context_messages.extend(content)
return context_messages
async def _sync_response_to_conversation(
self, conversation_id: str, content: str | list[OpenAIResponseInput], response: OpenAIResponseObject
) -> None:
"""Sync content and response messages to the conversation."""
conversation_items = []
# add user content message(s)
if isinstance(content, str):
conversation_items.append(
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": content}]}
)
elif isinstance(content, list):
for item in content:
if not isinstance(item, OpenAIResponseMessage):
raise NotImplementedError(f"Unsupported input item type: {type(item)}")
if item.role == "user":
if isinstance(item.content, str):
conversation_items.append(
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": item.content}],
}
)
elif isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "user", "content": item.content})
else:
raise NotImplementedError(f"Unsupported user message content type: {type(item.content)}")
elif item.role == "assistant":
if isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": item.content})
else:
raise NotImplementedError(f"Unsupported assistant message content type: {type(item.content)}")
else:
raise NotImplementedError(f"Unsupported message role: {item.role}")
# add assistant response message
for output_item in response.output:
if isinstance(output_item, OpenAIResponseMessage) and output_item.role == "assistant":
if hasattr(output_item, "content") and isinstance(output_item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": output_item.content})
if conversation_items:
adapter = TypeAdapter(list[ConversationItem])
validated_items = adapter.validate_python(conversation_items)
await self.conversations_api.add_items(conversation_id, validated_items)

View file

@ -35,6 +35,7 @@ def available_providers() -> list[ProviderSpec]:
Api.vector_dbs,
Api.tool_runtime,
Api.tool_groups,
Api.conversations,
],
optional_api_dependencies=[
Api.telemetry,