llama-stack-mirror/tests/integration/agents/test_openai_responses.py
Eric Huang f39d1732ea list responses
# What does this PR do?


## Test Plan
2025-05-23 13:00:59 -07:00

97 lines
2.9 KiB
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from urllib.parse import urljoin
import pytest
import requests
from openai import OpenAI
from llama_stack.distribution.library_client import LlamaStackAsLibraryClient
@pytest.fixture
def openai_client(client_with_models):
    """Return an OpenAI SDK client for the stack's OpenAI-compatible routes.

    The client's base URL is ``client_with_models.base_url`` with
    ``/v1/openai/v1`` appended; the API key is the fixed string ``"bar"``.
    """
    return OpenAI(
        base_url=f"{client_with_models.base_url}/v1/openai/v1",
        api_key="bar",
    )
@pytest.mark.parametrize("stream", [True, False])
@pytest.mark.parametrize(
    "tools",
    [
        [],
        [
            {
                "type": "function",
                "name": "get_weather",
                "description": "Get the weather in a given city",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "city": {"type": "string", "description": "The city to get the weather for"},
                    },
                },
            }
        ],
    ],
)
def test_responses_store(openai_client, client_with_models, text_model_id, stream, tools):
    """Create a response, then verify it can be listed and retrieved.

    Runs for every combination of ``stream`` (True/False) and ``tools``
    (no tools / a single ``get_weather`` function tool):

    1. Create a response through the OpenAI SDK client.
    2. Record the response id (and, when no tools are passed, the output text).
    3. GET the ``responses`` listing endpoint and check the id is present.
    4. Retrieve the response by id and check its id, model, and first output
       item (a ``function_call`` with tools, otherwise the same text).

    Skipped when ``client_with_models`` is a ``LlamaStackAsLibraryClient``.
    """
    if isinstance(client_with_models, LlamaStackAsLibraryClient):
        pytest.skip("OpenAI responses are not supported when testing with library client yet.")

    # Upper bound (seconds) for the raw HTTP listing call, so an unresponsive
    # server fails the test instead of hanging the whole run.
    list_timeout_seconds = 30

    client = openai_client
    message = "What's the weather in Tokyo?" + (
        " YOU MUST USE THE get_weather function to get the weather." if tools else ""
    )
    response = client.responses.create(
        model=text_model_id,
        input=[
            {
                "role": "user",
                "content": message,
            }
        ],
        stream=stream,
        tools=tools,
    )

    content = ""
    response_id = None
    if stream:
        # Drain the whole stream; the id is taken from the first event and,
        # without tools, the final text comes from the completed event.
        for chunk in response:
            if response_id is None:
                response_id = chunk.response.id
            if not tools and chunk.type == "response.completed":
                response_id = chunk.response.id
                content = chunk.response.output[0].content[0].text
    else:
        response_id = response.id
        if not tools:
            content = response.output[0].content[0].text

    assert response_id is not None, "no response id was obtained from responses.create"

    # list responses is not available in the SDK, so call the endpoint directly
    url = urljoin(str(client.base_url), "responses")
    list_response = requests.get(
        url,
        headers={"Authorization": f"Bearer {client.api_key}"},
        timeout=list_timeout_seconds,
    )
    assert list_response.status_code == 200, (
        f"GET {url} returned {list_response.status_code}: {list_response.text}"
    )
    listed_ids = [item["id"] for item in list_response.json()["data"]]
    assert response_id in listed_ids, f"{response_id} missing from listed responses"

    # test retrieve response
    retrieved_response = client.responses.retrieve(response_id)
    assert retrieved_response.id == response_id
    assert retrieved_response.model == text_model_id
    if tools:
        assert retrieved_response.output[0].type == "function_call"
    else:
        assert retrieved_response.output[0].content[0].text == content