llama-stack-mirror/llama_stack/core/conversations/conversations.py
dependabot[bot] 8885cea8d7
fix(conversations)!: update Conversations API definitions (was: bump openai from 1.107.0 to 2.5.0) (#3847)
Bumps [openai](https://github.com/openai/openai-python) from 1.107.0 to
2.5.0.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/openai/openai-python/releases">openai's
releases</a>.</em></p>
<blockquote>
<h2>v2.5.0</h2>
<h2>2.5.0 (2025-10-17)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.4.0...v2.5.0">v2.4.0...v2.5.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> api update (<a
href="8b280d57d6">8b280d5</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li>bump <code>httpx-aiohttp</code> version to 0.1.9 (<a
href="67f2f0afe5">67f2f0a</a>)</li>
</ul>
<h2>v2.4.0</h2>
<h2>2.4.0 (2025-10-16)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.3.0...v2.4.0">v2.3.0...v2.4.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> Add support for gpt-4o-transcribe-diarize on
audio/transcriptions endpoint (<a
href="bdbe9b8f44">bdbe9b8</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li>fix dangling comment (<a
href="da14e99606">da14e99</a>)</li>
<li><strong>internal:</strong> detect missing future annotations with
ruff (<a
href="2672b8f072">2672b8f</a>)</li>
</ul>
<h2>v2.3.0</h2>
<h2>2.3.0 (2025-10-10)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.2.0...v2.3.0">v2.2.0...v2.3.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> comparison filter in/not in (<a
href="aa49f626a6">aa49f62</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li><strong>package:</strong> bump jiter to &gt;=0.10.0 to support
Python 3.14 (<a
href="https://redirect.github.com/openai/openai-python/issues/2618">#2618</a>)
(<a
href="aa445cab5c">aa445ca</a>)</li>
</ul>
<h2>v2.2.0</h2>
<h2>2.2.0 (2025-10-06)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.1.0...v2.2.0">v2.1.0...v2.2.0</a></p>
<h3>Features</h3>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a
href="https://github.com/openai/openai-python/blob/main/CHANGELOG.md">openai's
changelog</a>.</em></p>
<blockquote>
<h2>2.5.0 (2025-10-17)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.4.0...v2.5.0">v2.4.0...v2.5.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> api update (<a
href="8b280d57d6">8b280d5</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li>bump <code>httpx-aiohttp</code> version to 0.1.9 (<a
href="67f2f0afe5">67f2f0a</a>)</li>
</ul>
<h2>2.4.0 (2025-10-16)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.3.0...v2.4.0">v2.3.0...v2.4.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> Add support for gpt-4o-transcribe-diarize on
audio/transcriptions endpoint (<a
href="bdbe9b8f44">bdbe9b8</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li>fix dangling comment (<a
href="da14e99606">da14e99</a>)</li>
<li><strong>internal:</strong> detect missing future annotations with
ruff (<a
href="2672b8f072">2672b8f</a>)</li>
</ul>
<h2>2.3.0 (2025-10-10)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.2.0...v2.3.0">v2.2.0...v2.3.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> comparison filter in/not in (<a
href="aa49f626a6">aa49f62</a>)</li>
</ul>
<h3>Chores</h3>
<ul>
<li><strong>package:</strong> bump jiter to &gt;=0.10.0 to support
Python 3.14 (<a
href="https://redirect.github.com/openai/openai-python/issues/2618">#2618</a>)
(<a
href="aa445cab5c">aa445ca</a>)</li>
</ul>
<h2>2.2.0 (2025-10-06)</h2>
<p>Full Changelog: <a
href="https://github.com/openai/openai-python/compare/v2.1.0...v2.2.0">v2.1.0...v2.2.0</a></p>
<h3>Features</h3>
<ul>
<li><strong>api:</strong> dev day 2025 launches (<a
href="38ac0093eb">38ac009</a>)</li>
</ul>
<h3>Bug Fixes</h3>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="513ae76253"><code>513ae76</code></a>
release: 2.5.0 (<a
href="https://redirect.github.com/openai/openai-python/issues/2694">#2694</a>)</li>
<li><a
href="ebf32212f7"><code>ebf3221</code></a>
release: 2.4.0</li>
<li><a
href="e043d7b164"><code>e043d7b</code></a>
chore: fix dangling comment</li>
<li><a
href="25cbb74f83"><code>25cbb74</code></a>
feat(api): Add support for gpt-4o-transcribe-diarize on
audio/transcriptions ...</li>
<li><a
href="8cdfd0650e"><code>8cdfd06</code></a>
codegen metadata</li>
<li><a
href="d5c64434b7"><code>d5c6443</code></a>
codegen metadata</li>
<li><a
href="b20a9e7b81"><code>b20a9e7</code></a>
chore(internal): detect missing future annotations with ruff</li>
<li><a
href="e5f93f5dae"><code>e5f93f5</code></a>
release: 2.3.0</li>
<li><a
href="044878859c"><code>0448788</code></a>
feat(api): comparison filter in/not in</li>
<li><a
href="85a91ade61"><code>85a91ad</code></a>
chore(package): bump jiter to &gt;=0.10.0 to support Python 3.14 (<a
href="https://redirect.github.com/openai/openai-python/issues/2618">#2618</a>)</li>
<li>Additional commits viewable in <a
href="https://github.com/openai/openai-python/compare/v1.107.0...v2.5.0">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=openai&package-manager=uv&previous-version=1.107.0&new-version=2.5.0)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't
alter it yourself. You can also trigger a rebase manually by commenting
`@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot merge` will merge this PR after your CI passes on it
- `@dependabot squash and merge` will squash and merge this PR after
your CI passes on it
- `@dependabot cancel merge` will cancel a previously requested merge
and block automerging
- `@dependabot reopen` will reopen this PR if it is closed
- `@dependabot close` will close this PR and stop Dependabot recreating
it. You can achieve the same result by closing it manually
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore this major version` will close this PR and stop
Dependabot creating any more for this major version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop
Dependabot creating any more for this minor version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop
Dependabot creating any more for this dependency (unless you reopen the
PR or upgrade to it yourself)


</details>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Ashwin Bharambe <ashwin.bharambe@gmail.com>
2025-10-22 12:32:48 -07:00

314 lines
12 KiB
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import secrets
import time
from typing import Any, Literal
from pydantic import BaseModel, TypeAdapter
from llama_stack.apis.conversations.conversations import (
Conversation,
ConversationDeletedResource,
ConversationItem,
ConversationItemDeletedResource,
ConversationItemInclude,
ConversationItemList,
Conversations,
Metadata,
)
from llama_stack.core.datatypes import AccessRule, StackRunConfig
from llama_stack.log import get_logger
from llama_stack.providers.utils.sqlstore.api import ColumnDefinition, ColumnType
from llama_stack.providers.utils.sqlstore.authorized_sqlstore import AuthorizedSqlStore
from llama_stack.providers.utils.sqlstore.sqlstore import sqlstore_impl
logger = get_logger(name=__name__, category="openai_conversations")
class ConversationServiceConfig(BaseModel):
"""Configuration for the built-in conversation service.
:param run_config: Stack run configuration for resolving persistence
:param policy: Access control rules
"""
run_config: StackRunConfig
policy: list[AccessRule] = []
async def get_provider_impl(config: ConversationServiceConfig, deps: dict[Any, Any]):
"""Get the conversation service implementation."""
impl = ConversationServiceImpl(config, deps)
await impl.initialize()
return impl
class ConversationServiceImpl(Conversations):
"""Built-in conversation service implementation using AuthorizedSqlStore."""
def __init__(self, config: ConversationServiceConfig, deps: dict[Any, Any]):
self.config = config
self.deps = deps
self.policy = config.policy
# Use conversations store reference from run config
conversations_ref = config.run_config.storage.stores.conversations
if not conversations_ref:
raise ValueError("storage.stores.conversations must be configured in run config")
base_sql_store = sqlstore_impl(conversations_ref)
self.sql_store = AuthorizedSqlStore(base_sql_store, self.policy)
async def initialize(self) -> None:
"""Initialize the store and create tables."""
await self.sql_store.create_table(
"openai_conversations",
{
"id": ColumnDefinition(type=ColumnType.STRING, primary_key=True),
"created_at": ColumnType.INTEGER,
"items": ColumnType.JSON,
"metadata": ColumnType.JSON,
},
)
await self.sql_store.create_table(
"conversation_items",
{
"id": ColumnDefinition(type=ColumnType.STRING, primary_key=True),
"conversation_id": ColumnType.STRING,
"created_at": ColumnType.INTEGER,
"item_data": ColumnType.JSON,
},
)
async def create_conversation(
self, items: list[ConversationItem] | None = None, metadata: Metadata | None = None
) -> Conversation:
"""Create a conversation."""
random_bytes = secrets.token_bytes(24)
conversation_id = f"conv_{random_bytes.hex()}"
created_at = int(time.time())
record_data = {
"id": conversation_id,
"created_at": created_at,
"items": [],
"metadata": metadata,
}
await self.sql_store.insert(
table="openai_conversations",
data=record_data,
)
if items:
item_records = []
for item in items:
item_dict = item.model_dump()
item_id = self._get_or_generate_item_id(item, item_dict)
item_record = {
"id": item_id,
"conversation_id": conversation_id,
"created_at": created_at,
"item_data": item_dict,
}
item_records.append(item_record)
await self.sql_store.insert(table="conversation_items", data=item_records)
conversation = Conversation(
id=conversation_id,
created_at=created_at,
metadata=metadata,
object="conversation",
)
logger.debug(f"Created conversation {conversation_id}")
return conversation
async def get_conversation(self, conversation_id: str) -> Conversation:
"""Get a conversation with the given ID."""
record = await self.sql_store.fetch_one(table="openai_conversations", where={"id": conversation_id})
if record is None:
raise ValueError(f"Conversation {conversation_id} not found")
return Conversation(
id=record["id"], created_at=record["created_at"], metadata=record.get("metadata"), object="conversation"
)
async def update_conversation(self, conversation_id: str, metadata: Metadata) -> Conversation:
"""Update a conversation's metadata with the given ID"""
await self.sql_store.update(
table="openai_conversations", data={"metadata": metadata}, where={"id": conversation_id}
)
return await self.get_conversation(conversation_id)
async def openai_delete_conversation(self, conversation_id: str) -> ConversationDeletedResource:
"""Delete a conversation with the given ID."""
await self.sql_store.delete(table="openai_conversations", where={"id": conversation_id})
logger.debug(f"Deleted conversation {conversation_id}")
return ConversationDeletedResource(id=conversation_id)
def _validate_conversation_id(self, conversation_id: str) -> None:
"""Validate conversation ID format."""
if not conversation_id.startswith("conv_"):
raise ValueError(
f"Invalid 'conversation_id': '{conversation_id}'. Expected an ID that begins with 'conv_'."
)
def _get_or_generate_item_id(self, item: ConversationItem, item_dict: dict) -> str:
"""Get existing item ID or generate one if missing."""
if item.id is None:
random_bytes = secrets.token_bytes(24)
if item.type == "message":
item_id = f"msg_{random_bytes.hex()}"
else:
item_id = f"item_{random_bytes.hex()}"
item_dict["id"] = item_id
return item_id
return item.id
async def _get_validated_conversation(self, conversation_id: str) -> Conversation:
"""Validate conversation ID and return the conversation if it exists."""
self._validate_conversation_id(conversation_id)
return await self.get_conversation(conversation_id)
async def add_items(self, conversation_id: str, items: list[ConversationItem]) -> ConversationItemList:
"""Create (add) items to a conversation."""
await self._get_validated_conversation(conversation_id)
created_items = []
base_time = int(time.time())
for i, item in enumerate(items):
item_dict = item.model_dump()
item_id = self._get_or_generate_item_id(item, item_dict)
# make each timestamp unique to maintain order
created_at = base_time + i
item_record = {
"id": item_id,
"conversation_id": conversation_id,
"created_at": created_at,
"item_data": item_dict,
}
# TODO: Add support for upsert in sql_store, this will fail first if ID exists and then update
try:
await self.sql_store.insert(table="conversation_items", data=item_record)
except Exception:
# If insert fails due to ID conflict, update existing record
await self.sql_store.update(
table="conversation_items",
data={"created_at": created_at, "item_data": item_dict},
where={"id": item_id},
)
created_items.append(item_dict)
logger.debug(f"Created {len(created_items)} items in conversation {conversation_id}")
# Convert created items (dicts) to proper ConversationItem types
adapter: TypeAdapter[ConversationItem] = TypeAdapter(ConversationItem)
response_items: list[ConversationItem] = [adapter.validate_python(item_dict) for item_dict in created_items]
return ConversationItemList(
data=response_items,
first_id=created_items[0]["id"] if created_items else None,
last_id=created_items[-1]["id"] if created_items else None,
has_more=False,
)
async def retrieve(self, conversation_id: str, item_id: str) -> ConversationItem:
"""Retrieve a conversation item."""
if not conversation_id:
raise ValueError(f"Expected a non-empty value for `conversation_id` but received {conversation_id!r}")
if not item_id:
raise ValueError(f"Expected a non-empty value for `item_id` but received {item_id!r}")
# Get item from conversation_items table
record = await self.sql_store.fetch_one(
table="conversation_items", where={"id": item_id, "conversation_id": conversation_id}
)
if record is None:
raise ValueError(f"Item {item_id} not found in conversation {conversation_id}")
adapter: TypeAdapter[ConversationItem] = TypeAdapter(ConversationItem)
return adapter.validate_python(record["item_data"])
async def list_items(
self,
conversation_id: str,
after: str | None = None,
include: list[ConversationItemInclude] | None = None,
limit: int | None = None,
order: Literal["asc", "desc"] | None = None,
) -> ConversationItemList:
"""List items in the conversation."""
if not conversation_id:
raise ValueError(f"Expected a non-empty value for `conversation_id` but received {conversation_id!r}")
# check if conversation exists
await self.get_conversation(conversation_id)
result = await self.sql_store.fetch_all(table="conversation_items", where={"conversation_id": conversation_id})
records = result.data
if order is not None and order == "asc":
records.sort(key=lambda x: x["created_at"])
else:
records.sort(key=lambda x: x["created_at"], reverse=True)
actual_limit = limit or 20
records = records[:actual_limit]
items = [record["item_data"] for record in records]
adapter: TypeAdapter[ConversationItem] = TypeAdapter(ConversationItem)
response_items: list[ConversationItem] = [adapter.validate_python(item) for item in items]
first_id = response_items[0].id if response_items else None
last_id = response_items[-1].id if response_items else None
return ConversationItemList(
data=response_items,
first_id=first_id,
last_id=last_id,
has_more=False,
)
async def openai_delete_conversation_item(
self, conversation_id: str, item_id: str
) -> ConversationItemDeletedResource:
"""Delete a conversation item."""
if not conversation_id:
raise ValueError(f"Expected a non-empty value for `conversation_id` but received {conversation_id!r}")
if not item_id:
raise ValueError(f"Expected a non-empty value for `item_id` but received {item_id!r}")
_ = await self._get_validated_conversation(conversation_id)
record = await self.sql_store.fetch_one(
table="conversation_items", where={"id": item_id, "conversation_id": conversation_id}
)
if record is None:
raise ValueError(f"Item {item_id} not found in conversation {conversation_id}")
await self.sql_store.delete(
table="conversation_items", where={"id": item_id, "conversation_id": conversation_id}
)
logger.debug(f"Deleted item {item_id} from conversation {conversation_id}")
return ConversationItemDeletedResource(id=item_id)