feat: Add support for Conversations in Responses API (#3743)

# What does this PR do?
This PR adds support for Conversations in Responses.

<!-- If resolving an issue, uncomment and update the line below -->
<!-- Closes #[issue-number] -->

## Test Plan
Unit tests
Integration tests

<Details>
<Summary>Manual testing with this script: (click to expand)</Summary>

```python
from openai import OpenAI

client = OpenAI()
client = OpenAI(base_url="http://localhost:8321/v1/", api_key="none")

def test_conversation_create():
    print("Testing conversation create...")
    conversation = client.conversations.create(
        metadata={"topic": "demo"},
        items=[
            {"type": "message", "role": "user", "content": "Hello!"}
        ]
    )
    print(f"Created: {conversation}")
    return conversation

def test_conversation_retrieve(conv_id):
    print(f"Testing conversation retrieve for {conv_id}...")
    retrieved = client.conversations.retrieve(conv_id)
    print(f"Retrieved: {retrieved}")
    return retrieved

def test_conversation_update(conv_id):
    print(f"Testing conversation update for {conv_id}...")
    updated = client.conversations.update(
        conv_id,
        metadata={"topic": "project-x"}
    )
    print(f"Updated: {updated}")
    return updated

def test_conversation_delete(conv_id):
    print(f"Testing conversation delete for {conv_id}...")
    deleted = client.conversations.delete(conv_id)
    print(f"Deleted: {deleted}")
    return deleted

def test_conversation_items_create(conv_id):
    print(f"Testing conversation items create for {conv_id}...")
    items = client.conversations.items.create(
        conv_id,
        items=[
            {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": "Hello!"}]
            },
            {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": "How are you?"}]
            }
        ]
    )
    print(f"Items created: {items}")
    return items

def test_conversation_items_list(conv_id):
    print(f"Testing conversation items list for {conv_id}...")
    items = client.conversations.items.list(conv_id, limit=10)
    print(f"Items list: {items}")
    return items

def test_conversation_item_retrieve(conv_id, item_id):
    print(f"Testing conversation item retrieve for {conv_id}/{item_id}...")
    item = client.conversations.items.retrieve(conversation_id=conv_id, item_id=item_id)
    print(f"Item retrieved: {item}")
    return item

def test_conversation_item_delete(conv_id, item_id):
    print(f"Testing conversation item delete for {conv_id}/{item_id}...")
    deleted = client.conversations.items.delete(conversation_id=conv_id, item_id=item_id)
    print(f"Item deleted: {deleted}")
    return deleted

def test_conversation_responses_create():
    print("\nTesting conversation create for a responses example...")
    conversation = client.conversations.create()
    print(f"Created: {conversation}")

    response = client.responses.create(
      model="gpt-4.1",
      input=[{"role": "user", "content": "What are the 5 Ds of dodgeball?"}],
      conversation=conversation.id,
    )
    print(f"Created response: {response} for conversation {conversation.id}")

    return response, conversation

def test_conversations_responses_create_followup(
        conversation,
        content="Repeat what you just said but add 'this is my second time saying this'",
    ):
    print(f"Using: {conversation.id}")

    response = client.responses.create(
      model="gpt-4.1",
      input=[{"role": "user", "content": content}],
      conversation=conversation.id,
    )
    print(f"Created response: {response} for conversation {conversation.id}")

    conv_items = client.conversations.items.list(conversation.id)
    print(f"\nRetrieving list of items for conversation {conversation.id}:")
    print(conv_items.model_dump_json(indent=2))

def test_response_with_fake_conv_id():
    fake_conv_id = "conv_zzzzzzzzz5dc81908289d62779d2ac510a2b0b602ef00a44"
    print(f"Using {fake_conv_id}")
    try:
        response = client.responses.create(
          model="gpt-4.1",
          input=[{"role": "user", "content": "say hello"}],
          conversation=fake_conv_id,
        )
        print(f"Created response: {response} for conversation {fake_conv_id}")
    except Exception as e:
        print(f"failed to create response for conversation {fake_conv_id} with error {e}")


def main():
    print("Testing OpenAI Conversations API...")

    # Create conversation
    conversation = test_conversation_create()
    conv_id = conversation.id

    # Retrieve conversation
    test_conversation_retrieve(conv_id)

    # Update conversation
    test_conversation_update(conv_id)

    # Create items
    items = test_conversation_items_create(conv_id)

    # List items
    items_list = test_conversation_items_list(conv_id)

    # Retrieve specific item
    if items_list.data:
        item_id = items_list.data[0].id
        test_conversation_item_retrieve(conv_id, item_id)

        # Delete item
        test_conversation_item_delete(conv_id, item_id)

    # Delete conversation
    test_conversation_delete(conv_id)

    response, conversation2 = test_conversation_responses_create()
    print('\ntesting reseponse retrieval')
    test_conversation_retrieve(conversation2.id)

    print('\ntesting responses follow up')
    test_conversations_responses_create_followup(conversation2)

    print('\ntesting responses follow up x2!')

    test_conversations_responses_create_followup(
        conversation2,
        content="Repeat what you just said but add 'this is my third time saying this'",
    )

    test_response_with_fake_conv_id()

    print("All tests completed!")


if __name__ == "__main__":
    main()
```
</Details>

---------

Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
Co-authored-by: Ashwin Bharambe <ashwin.bharambe@gmail.com>
This commit is contained in:
Francisco Arceo 2025-10-10 14:57:40 -04:00 committed by GitHub
parent 932fea813a
commit e7d21e1ee3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 6221 additions and 19 deletions

View file

@ -24,6 +24,11 @@ from llama_stack.apis.agents.openai_responses import (
OpenAIResponseText,
OpenAIResponseTextFormat,
)
from llama_stack.apis.common.errors import (
InvalidConversationIdError,
)
from llama_stack.apis.conversations import Conversations
from llama_stack.apis.conversations.conversations import ConversationItem
from llama_stack.apis.inference import (
Inference,
OpenAIMessageParam,
@ -61,12 +66,14 @@ class OpenAIResponsesImpl:
tool_runtime_api: ToolRuntime,
responses_store: ResponsesStore,
vector_io_api: VectorIO, # VectorIO
conversations_api: Conversations,
):
self.inference_api = inference_api
self.tool_groups_api = tool_groups_api
self.tool_runtime_api = tool_runtime_api
self.responses_store = responses_store
self.vector_io_api = vector_io_api
self.conversations_api = conversations_api
self.tool_executor = ToolExecutor(
tool_groups_api=tool_groups_api,
tool_runtime_api=tool_runtime_api,
@ -205,6 +212,7 @@ class OpenAIResponsesImpl:
model: str,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
stream: bool | None = False,
temperature: float | None = None,
@ -221,11 +229,27 @@ class OpenAIResponsesImpl:
if shields is not None:
raise NotImplementedError("Shields parameter is not yet implemented in the meta-reference provider")
if conversation is not None and previous_response_id is not None:
raise ValueError(
"Mutually exclusive parameters: 'previous_response_id' and 'conversation'. Ensure you are only providing one of these parameters."
)
original_input = input # needed for syncing to Conversations
if conversation is not None:
if not conversation.startswith("conv_"):
raise InvalidConversationIdError(conversation)
# Check conversation exists (raises ConversationNotFoundError if not)
_ = await self.conversations_api.get_conversation(conversation)
input = await self._load_conversation_context(conversation, input)
stream_gen = self._create_streaming_response(
input=input,
original_input=original_input,
model=model,
instructions=instructions,
previous_response_id=previous_response_id,
conversation=conversation,
store=store,
temperature=temperature,
text=text,
@ -268,8 +292,10 @@ class OpenAIResponsesImpl:
self,
input: str | list[OpenAIResponseInput],
model: str,
original_input: str | list[OpenAIResponseInput] | None = None,
instructions: str | None = None,
previous_response_id: str | None = None,
conversation: str | None = None,
store: bool | None = True,
temperature: float | None = None,
text: OpenAIResponseText | None = None,
@ -296,7 +322,7 @@ class OpenAIResponsesImpl:
)
# Create orchestrator and delegate streaming logic
response_id = f"resp-{uuid.uuid4()}"
response_id = f"resp_{uuid.uuid4()}"
created_at = int(time.time())
orchestrator = StreamingResponseOrchestrator(
@ -319,13 +345,102 @@ class OpenAIResponsesImpl:
failed_response = stream_chunk.response
yield stream_chunk
# Store the response if requested
if store and final_response and failed_response is None:
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
# Store and sync immediately after yielding terminal events
# This ensures the storage/syncing happens even if the consumer breaks early
if (
stream_chunk.type in {"response.completed", "response.incomplete"}
and store
and final_response
and failed_response is None
):
await self._store_response(
response=final_response,
input=all_input,
messages=orchestrator.final_messages,
)
if stream_chunk.type in {"response.completed", "response.incomplete"} and conversation and final_response:
# for Conversations, we need to use the original_input if it's available, otherwise use input
sync_input = original_input if original_input is not None else input
await self._sync_response_to_conversation(conversation, sync_input, final_response)
async def delete_openai_response(self, response_id: str) -> OpenAIDeleteResponseObject:
return await self.responses_store.delete_response_object(response_id)
async def _load_conversation_context(
self, conversation_id: str, content: str | list[OpenAIResponseInput]
) -> list[OpenAIResponseInput]:
"""Load conversation history and merge with provided content."""
conversation_items = await self.conversations_api.list(conversation_id, order="asc")
context_messages = []
for item in conversation_items.data:
if isinstance(item, OpenAIResponseMessage):
if item.role == "user":
context_messages.append(
OpenAIResponseMessage(
role="user", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
elif item.role == "assistant":
context_messages.append(
OpenAIResponseMessage(
role="assistant", content=item.content, id=item.id if hasattr(item, "id") else None
)
)
# add new content to context
if isinstance(content, str):
context_messages.append(OpenAIResponseMessage(role="user", content=content))
elif isinstance(content, list):
context_messages.extend(content)
return context_messages
async def _sync_response_to_conversation(
self, conversation_id: str, content: str | list[OpenAIResponseInput], response: OpenAIResponseObject
) -> None:
"""Sync content and response messages to the conversation."""
conversation_items = []
# add user content message(s)
if isinstance(content, str):
conversation_items.append(
{"type": "message", "role": "user", "content": [{"type": "input_text", "text": content}]}
)
elif isinstance(content, list):
for item in content:
if not isinstance(item, OpenAIResponseMessage):
raise NotImplementedError(f"Unsupported input item type: {type(item)}")
if item.role == "user":
if isinstance(item.content, str):
conversation_items.append(
{
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": item.content}],
}
)
elif isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "user", "content": item.content})
else:
raise NotImplementedError(f"Unsupported user message content type: {type(item.content)}")
elif item.role == "assistant":
if isinstance(item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": item.content})
else:
raise NotImplementedError(f"Unsupported assistant message content type: {type(item.content)}")
else:
raise NotImplementedError(f"Unsupported message role: {item.role}")
# add assistant response message
for output_item in response.output:
if isinstance(output_item, OpenAIResponseMessage) and output_item.role == "assistant":
if hasattr(output_item, "content") and isinstance(output_item.content, list):
conversation_items.append({"type": "message", "role": "assistant", "content": output_item.content})
if conversation_items:
adapter = TypeAdapter(list[ConversationItem])
validated_items = adapter.validate_python(conversation_items)
await self.conversations_api.add_items(conversation_id, validated_items)