OpenAI Responses - move tests under tests/verifications

This moves the OpenAI Responses API tests under
tests/verifications/openai_api/test_response.py and starts wiring
them into our verification suite, so that we can exercise the
Responses API against multiple providers as well as directly against
OpenAI.

Signed-off-by: Ben Browning <bbrownin@redhat.com>
Ben Browning 2025-04-18 15:26:34 -04:00 committed by Ashwin Bharambe
parent 591e6a3972
commit 207224a811
14 changed files with 353 additions and 273 deletions
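
For context, a minimal sketch (not part of this commit) of how the relocated tests can be invoked once wired into the verification suite. The --provider option and the "openai" provider key are assumptions based on the conftest changes below; the key must match a file under tests/verifications/conf/.

# Sketch only, not part of the diff: run the relocated Responses tests for one provider.
import pytest

raise SystemExit(
    pytest.main(
        [
            "tests/verifications/openai_api/",
            "--provider=openai",  # assumed provider key; adjust to match a conf/*.yaml entry
            "-v",
        ]
    )
)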


@@ -75,11 +75,27 @@ class OpenAIResponseObject(BaseModel):
@json_schema_type
class OpenAIResponseObjectStream(BaseModel):
class OpenAIResponseObjectStreamResponseCreated(BaseModel):
response: OpenAIResponseObject
type: Literal["response.created"] = "response.created"
@json_schema_type
class OpenAIResponseObjectStreamResponseCompleted(BaseModel):
response: OpenAIResponseObject
type: Literal["response.completed"] = "response.completed"
OpenAIResponseObjectStream = Annotated[
Union[
OpenAIResponseObjectStreamResponseCreated,
OpenAIResponseObjectStreamResponseCompleted,
],
Field(discriminator="type"),
]
register_schema(OpenAIResponseObjectStream, name="OpenAIResponseObjectStream")
@json_schema_type
class OpenAIResponseInputMessageContentText(BaseModel):
text: str
@@ -112,6 +128,7 @@ class OpenAIResponseInputMessage(BaseModel):
@json_schema_type
class OpenAIResponseInputToolWebSearch(BaseModel):
type: Literal["web_search", "web_search_preview_2025_03_11"] = "web_search"
# TODO: actually use search_context_size somewhere...
search_context_size: Optional[str] = Field(default="medium", pattern="^low|medium|high$")
# TODO: add user_location
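
Because OpenAIResponseObjectStream is now a discriminated union on the "type" field, consumers can branch on that field per event. A minimal sketch (not part of this diff; `stream` is a placeholder for any async iterator yielding the union members defined above):

# Sketch only: collect the final response from a stream of created/completed events.
async def collect_final_response(stream):
    final = None
    async for event in stream:
        if event.type == "response.created":
            # The response object exists, but generation may still be in progress.
            continue
        if event.type == "response.completed":
            final = event.response
    return final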


@@ -33,6 +33,8 @@ from llama_stack.apis.openai_responses.openai_responses import (
OpenAIResponseInputTool,
OpenAIResponseObject,
OpenAIResponseObjectStream,
OpenAIResponseObjectStreamResponseCompleted,
OpenAIResponseObjectStreamResponseCreated,
OpenAIResponseOutput,
OpenAIResponseOutputMessage,
OpenAIResponseOutputMessageContentOutputText,
@@ -174,7 +176,8 @@ class OpenAIResponsesImpl(OpenAIResponses):
for chunk_choice in chunk.choices:
# TODO: this only works for text content
chat_response_content.append(chunk_choice.delta.content or "")
chunk_finish_reason = chunk_choice.finish_reason
if chunk_choice.finish_reason:
chunk_finish_reason = chunk_choice.finish_reason
assistant_message = OpenAIAssistantMessageParam(content="".join(chat_response_content))
chat_response = OpenAIChatCompletion(
id=chat_response_id,
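
The guard on finish_reason matters because most streamed chunks carry finish_reason=None; keeping only non-None values prevents a trailing empty chunk from overwriting the recorded reason. A small self-contained sketch with illustrative data (not from this diff):

# Sketch only: (delta_text, finish_reason) tuples; only one chunk sets a reason.
chunks = [("Hel", None), ("lo", "stop"), ("", None)]

content_parts = []
finish_reason = None
for delta_text, chunk_finish in chunks:
    content_parts.append(delta_text or "")
    if chunk_finish:
        finish_reason = chunk_finish  # keep the last non-None value

assert "".join(content_parts) == "Hello"
assert finish_reason == "stop"  # an unconditional overwrite would end with None here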
@@ -219,7 +222,9 @@ class OpenAIResponsesImpl(OpenAIResponses):
if stream:
async def async_response() -> AsyncIterator[OpenAIResponseObjectStream]:
yield OpenAIResponseObjectStream(response=response)
# TODO: response created should actually get emitted much earlier in the process
yield OpenAIResponseObjectStreamResponseCreated(response=response)
yield OpenAIResponseObjectStreamResponseCompleted(response=response)
return async_response()
@@ -270,40 +275,40 @@ class OpenAIResponsesImpl(OpenAIResponses):
# Add the assistant message with tool_calls response to the messages list
messages.append(choice.message)
# TODO: handle multiple tool calls
tool_call = choice.message.tool_calls[0]
tool_call_id = tool_call.id
function = tool_call.function
for tool_call in choice.message.tool_calls:
tool_call_id = tool_call.id
function = tool_call.function
# If for some reason the tool call doesn't have a function or id, we can't execute it
if not function or not tool_call_id:
return output_messages
# If for some reason the tool call doesn't have a function or id, we can't execute it
if not function or not tool_call_id:
continue
# TODO: telemetry spans for tool calls
result = await self._execute_tool_call(function)
# TODO: telemetry spans for tool calls
result = await self._execute_tool_call(function)
# Handle tool call failure
if not result:
    output_messages.append(
        OpenAIResponseOutputMessageWebSearchToolCall(
            id=tool_call_id,
            status="failed",
        )
    )
    return output_messages
output_messages.append(
    OpenAIResponseOutputMessageWebSearchToolCall(
        id=tool_call_id,
        status="completed",
    ),
)
# Handle tool call failure
if not result:
    output_messages.append(
        OpenAIResponseOutputMessageWebSearchToolCall(
            id=tool_call_id,
            status="failed",
        )
    )
    continue
output_messages.append(
    OpenAIResponseOutputMessageWebSearchToolCall(
        id=tool_call_id,
        status="completed",
    ),
)
result_content = ""
# TODO: handle other result content types and lists
if isinstance(result.content, str):
result_content = result.content
messages.append(OpenAIToolMessageParam(content=result_content, tool_call_id=tool_call_id))
result_content = ""
# TODO: handle other result content types and lists
if isinstance(result.content, str):
result_content = result.content
messages.append(OpenAIToolMessageParam(content=result_content, tool_call_id=tool_call_id))
tool_results_chat_response = await self.inference_api.openai_chat_completion(
model=model_id,
messages=messages,
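
For orientation, a sketch (not from this diff; dicts stand in for the typed message models, and the call id and tool name are illustrative) of the message sequence the loop above builds before this follow-up chat completion:

# Sketch only: the accumulated messages passed back to openai_chat_completion.
messages = [
    {"role": "user", "content": "How many experts does the Llama 4 Maverick model have?"},
    {
        # assistant turn that requested the tool call(s)
        "role": "assistant",
        "tool_calls": [
            {"id": "call_1", "type": "function",
             "function": {"name": "web_search", "arguments": "{...}"}}
        ],
    },
    # one tool message per executed call, matched by tool_call_id
    {"role": "tool", "tool_call_id": "call_1", "content": "search results ..."},
]
# The follow-up completion then produces the final assistant answer from these messages.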


@@ -1,5 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.


@@ -1,83 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import pytest
from ..test_cases.test_case import TestCase
@pytest.mark.parametrize(
"test_case",
[
"openai:responses:non_streaming_01",
"openai:responses:non_streaming_02",
],
)
def test_basic_non_streaming(openai_client, client_with_models, text_model_id, test_case):
tc = TestCase(test_case)
question = tc["question"]
expected = tc["expected"]
response = openai_client.responses.create(
model=text_model_id,
input=question,
stream=False,
)
output_text = response.output_text.lower().strip()
assert len(output_text) > 0
assert expected.lower() in output_text
retrieved_response = openai_client.responses.retrieve(response_id=response.id)
assert retrieved_response.output_text == response.output_text
next_response = openai_client.responses.create(
model=text_model_id, input="Repeat your previous response in all caps.", previous_response_id=response.id
)
next_output_text = next_response.output_text.strip()
assert expected.upper() in next_output_text
@pytest.mark.parametrize(
"test_case",
[
"openai:responses:streaming_01",
"openai:responses:streaming_02",
],
)
def test_basic_streaming(openai_client, client_with_models, text_model_id, test_case):
tc = TestCase(test_case)
question = tc["question"]
expected = tc["expected"]
response = openai_client.responses.create(
model=text_model_id,
input=question,
stream=True,
timeout=120, # Increase timeout to 2 minutes for large conversation history
)
streamed_content = []
response_id = ""
for chunk in response:
response_id = chunk.response.id
streamed_content.append(chunk.response.output_text.strip())
assert len(streamed_content) > 0
assert expected.lower() in "".join(streamed_content).lower()
retrieved_response = openai_client.responses.retrieve(response_id=response_id)
assert retrieved_response.output_text == "".join(streamed_content)
next_response = openai_client.responses.create(
model=text_model_id,
input="Repeat your previous response in all caps.",
previous_response_id=response_id,
stream=True,
)
next_streamed_content = []
for chunk in next_response:
next_streamed_content.append(chunk.response.output_text.strip())
assert expected.upper() in "".join(next_streamed_content)


@@ -1,101 +0,0 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import pytest
from ..test_cases.test_case import TestCase
@pytest.mark.parametrize(
"test_case",
[
"openai:responses:tools_web_search_01",
],
)
def test_web_search_non_streaming(openai_client, client_with_models, text_model_id, test_case):
tc = TestCase(test_case)
input = tc["input"]
expected = tc["expected"]
tools = tc["tools"]
response = openai_client.responses.create(
model=text_model_id,
input=input,
tools=tools,
stream=False,
)
assert len(response.output) > 1
assert response.output[0].type == "web_search_call"
assert response.output[0].status == "completed"
assert response.output[1].type == "message"
assert response.output[1].status == "completed"
assert response.output[1].role == "assistant"
assert len(response.output[1].content) > 0
assert expected.lower() in response.output_text.lower().strip()
def test_input_image_non_streaming(openai_client, vision_model_id):
supported_models = ["llama-4", "gpt-4o", "llama4"]
if not any(model in vision_model_id.lower() for model in supported_models):
pytest.skip(f"Skip for non-supported model: {vision_model_id}")
response = openai_client.with_options(max_retries=0).responses.create(
model=vision_model_id,
input=[
{
"role": "user",
"content": [
{
"type": "input_text",
"text": "Identify the type of animal in this image.",
},
{
"type": "input_image",
"image_url": "https://upload.wikimedia.org/wikipedia/commons/f/f7/Llamas%2C_Vernagt-Stausee%2C_Italy.jpg",
},
],
}
],
)
output_text = response.output_text.lower()
assert "llama" in output_text
def test_multi_turn_web_search_from_image_non_streaming(openai_client, vision_model_id):
supported_models = ["llama-4", "gpt-4o", "llama4"]
if not any(model in vision_model_id.lower() for model in supported_models):
pytest.skip(f"Skip for non-supported model: {vision_model_id}")
response = openai_client.with_options(max_retries=0).responses.create(
model=vision_model_id,
input=[
{
"role": "user",
"content": [
{
"type": "input_text",
"text": "Extract a single search keyword that represents the type of animal in this image.",
},
{
"type": "input_image",
"image_url": "https://upload.wikimedia.org/wikipedia/commons/f/f7/Llamas%2C_Vernagt-Stausee%2C_Italy.jpg",
},
],
}
],
)
output_text = response.output_text.lower()
assert "llama" in output_text
search_response = openai_client.with_options(max_retries=0).responses.create(
model=vision_model_id,
input="Search the web using the search tool for those keywords plus the words 'maverick' and 'scout' and summarize the results.",
previous_response_id=response.id,
tools=[{"type": "web_search"}],
)
output_text = search_response.output_text.lower()
assert "model" in output_text


@@ -13,3 +13,5 @@ test_exclusions:
- test_chat_non_streaming_image
- test_chat_streaming_image
- test_chat_multi_turn_multiple_images
- test_response_non_streaming_image
- test_response_non_streaming_multi_turn_image


@@ -13,3 +13,5 @@ test_exclusions:
- test_chat_non_streaming_image
- test_chat_streaming_image
- test_chat_multi_turn_multiple_images
- test_response_non_streaming_image
- test_response_non_streaming_multi_turn_image


@@ -13,3 +13,5 @@ test_exclusions:
- test_chat_non_streaming_image
- test_chat_streaming_image
- test_chat_multi_turn_multiple_images
- test_response_non_streaming_image
- test_response_non_streaming_multi_turn_image


@@ -16,7 +16,7 @@ Description:
Configuration:
- Provider details (models, display names) are loaded from `tests/verifications/config.yaml`.
- Provider details (models, display names) are loaded from `tests/verifications/conf/*.yaml`.
- Test cases are defined in YAML files within `tests/verifications/openai_api/fixtures/test_cases/`.
- Test results are stored in `tests/verifications/test_results/`.
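
A minimal sketch of how per-provider configs under tests/verifications/conf/ can be aggregated (assumptions: one provider per file keyed by file stem, parsed with PyYAML; the suite's actual implementation is _load_all_verification_configs() in fixtures.py):

# Sketch only, not the suite's implementation.
from pathlib import Path

import yaml

def load_provider_configs(conf_dir: Path) -> dict:
    providers = {}
    for conf_file in sorted(conf_dir.glob("*.yaml")):
        with open(conf_file) as f:
            providers[conf_file.stem] = yaml.safe_load(f)  # e.g. models, test_exclusions
    return {"providers": providers}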


@@ -0,0 +1,35 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
from tests.verifications.openai_api.fixtures.fixtures import _load_all_verification_configs
def pytest_generate_tests(metafunc):
"""Dynamically parametrize tests based on the selected provider and config."""
if "model" in metafunc.fixturenames:
provider = metafunc.config.getoption("provider")
if not provider:
print("Warning: --provider not specified. Skipping model parametrization.")
metafunc.parametrize("model", [])
return
try:
config_data = _load_all_verification_configs()
except (FileNotFoundError, IOError) as e:
print(f"ERROR loading verification configs: {e}")
config_data = {"providers": {}}
provider_config = config_data.get("providers", {}).get(provider)
if provider_config:
models = provider_config.get("models", [])
if models:
metafunc.parametrize("model", models)
else:
print(f"Warning: No models found for provider '{provider}' in config.")
metafunc.parametrize("model", []) # Parametrize empty if no models found
else:
print(f"Warning: Provider '{provider}' not found in config. No models parametrized.")
metafunc.parametrize("model", []) # Parametrize empty if provider not found


@@ -5,14 +5,16 @@
# the root directory of this source tree.
import os
import re
from pathlib import Path
import pytest
import yaml
from openai import OpenAI
# --- Helper Functions ---
# --- Helper Function to Load Config ---
def _load_all_verification_configs():
"""Load and aggregate verification configs from the conf/ directory."""
# Note: Path is relative to *this* file (fixtures.py)
@@ -44,7 +46,30 @@ def _load_all_verification_configs():
return {"providers": all_provider_configs}
# --- End Helper Function ---
def case_id_generator(case):
"""Generate a test ID from the case's 'case_id' field, or use a default."""
case_id = case.get("case_id")
if isinstance(case_id, (str, int)):
return re.sub(r"\W|^(?=\d)", "_", str(case_id))
return None
def should_skip_test(verification_config, provider, model, test_name_base):
"""Check if a test should be skipped based on config exclusions."""
provider_config = verification_config.get("providers", {}).get(provider)
if not provider_config:
return False # No config for provider, don't skip
exclusions = provider_config.get("test_exclusions", {}).get(model, [])
return test_name_base in exclusions
# Helper to get the base test name from the request object
def get_base_test_name(request):
return request.node.originalname
# --- End Helper Functions ---
@pytest.fixture(scope="session")
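
For reference, the sanitization case_id_generator applies (illustrative values, not from this diff): non-word characters become underscores and a leading digit gets an underscore prefix.

# Sketch only: mirrors the helper's regex behaviour.
import re

def sanitize_case_id(case_id: str) -> str:
    return re.sub(r"\W|^(?=\d)", "_", case_id)

assert sanitize_case_id("earth") == "earth"
assert sanitize_case_id("2 turns") == "_2_turns"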


@@ -0,0 +1,65 @@
test_response_basic:
test_name: test_response_basic
test_params:
case:
- case_id: "earth"
input: "Which planet do humans live on?"
output: "earth"
- case_id: "saturn"
input: "Which planet has rings around it with a name starting with letter S?"
output: "saturn"
test_response_multi_turn:
test_name: test_response_multi_turn
test_params:
case:
- case_id: "earth"
turns:
- input: "Which planet do humans live on?"
output: "earth"
- input: "What is the name of the planet from your previous response?"
output: "earth"
test_response_web_search:
test_name: test_response_web_search
test_params:
case:
- case_id: "llama_experts"
input: "How many experts does the Llama 4 Maverick model have?"
tools:
- type: web_search
search_context_size: "low"
output: "128"
test_response_image:
test_name: test_response_image
test_params:
case:
- case_id: "llama_image"
input:
- role: user
content:
- type: input_text
text: "Identify the type of animal in this image."
- type: input_image
image_url: "https://upload.wikimedia.org/wikipedia/commons/f/f7/Llamas%2C_Vernagt-Stausee%2C_Italy.jpg"
output: "llama"
test_response_multi_turn_image:
test_name: test_response_multi_turn_image
test_params:
case:
- case_id: "llama_image_search"
turns:
- input:
- role: user
content:
- type: input_text
text: "What type of animal is in this image? Please respond with a single word that starts with the letter 'L'."
- type: input_image
image_url: "https://upload.wikimedia.org/wikipedia/commons/f/f7/Llamas%2C_Vernagt-Stausee%2C_Italy.jpg"
output: "llama"
- input: "Search the web using the search tool for the animal from the previous response. Your search query should be a single phrase that includes the animal's name and the words 'maverick' and 'scout'."
tools:
- type: web_search
output: "model"
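
These cases are read back through load_test_cases("response") in the new test module below; a small sketch of how the parsed structure is indexed (assumes running from the repo root):

# Sketch only: mirrors how the test module below accesses the YAML above.
from tests.verifications.openai_api.fixtures.load import load_test_cases

response_test_cases = load_test_cases("response")
for case in response_test_cases["test_response_basic"]["test_params"]["case"]:
    print(case["case_id"], "->", case["input"], "expects", case["output"])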


@@ -7,7 +7,6 @@
import base64
import copy
import json
import re
from pathlib import Path
from typing import Any
@@ -16,7 +15,9 @@ from openai import APIError
from pydantic import BaseModel
from tests.verifications.openai_api.fixtures.fixtures import (
_load_all_verification_configs,
case_id_generator,
get_base_test_name,
should_skip_test,
)
from tests.verifications.openai_api.fixtures.load import load_test_cases
@@ -25,57 +26,6 @@ chat_completion_test_cases = load_test_cases("chat_completion")
THIS_DIR = Path(__file__).parent
def case_id_generator(case):
"""Generate a test ID from the case's 'case_id' field, or use a default."""
case_id = case.get("case_id")
if isinstance(case_id, (str, int)):
return re.sub(r"\W|^(?=\d)", "_", str(case_id))
return None
def pytest_generate_tests(metafunc):
"""Dynamically parametrize tests based on the selected provider and config."""
if "model" in metafunc.fixturenames:
provider = metafunc.config.getoption("provider")
if not provider:
print("Warning: --provider not specified. Skipping model parametrization.")
metafunc.parametrize("model", [])
return
try:
config_data = _load_all_verification_configs()
except (FileNotFoundError, IOError) as e:
print(f"ERROR loading verification configs: {e}")
config_data = {"providers": {}}
provider_config = config_data.get("providers", {}).get(provider)
if provider_config:
models = provider_config.get("models", [])
if models:
metafunc.parametrize("model", models)
else:
print(f"Warning: No models found for provider '{provider}' in config.")
metafunc.parametrize("model", []) # Parametrize empty if no models found
else:
print(f"Warning: Provider '{provider}' not found in config. No models parametrized.")
metafunc.parametrize("model", []) # Parametrize empty if provider not found
def should_skip_test(verification_config, provider, model, test_name_base):
"""Check if a test should be skipped based on config exclusions."""
provider_config = verification_config.get("providers", {}).get(provider)
if not provider_config:
return False # No config for provider, don't skip
exclusions = provider_config.get("test_exclusions", {}).get(model, [])
return test_name_base in exclusions
# Helper to get the base test name from the request object
def get_base_test_name(request):
return request.node.originalname
@pytest.fixture
def multi_image_data():
files = [


@@ -0,0 +1,166 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
import pytest
from tests.verifications.openai_api.fixtures.fixtures import (
case_id_generator,
get_base_test_name,
should_skip_test,
)
from tests.verifications.openai_api.fixtures.load import load_test_cases
response_test_cases = load_test_cases("response")
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_basic"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_non_streaming_basic(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
response = openai_client.responses.create(
model=model,
input=case["input"],
stream=False,
)
output_text = response.output_text.lower().strip()
assert len(output_text) > 0
assert case["output"].lower() in output_text
retrieved_response = openai_client.responses.retrieve(response_id=response.id)
assert retrieved_response.output_text == response.output_text
next_response = openai_client.responses.create(
model=model, input="Repeat your previous response in all caps.", previous_response_id=response.id
)
next_output_text = next_response.output_text.strip()
assert case["output"].upper() in next_output_text
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_basic"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_streaming_basic(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
response = openai_client.responses.create(
model=model,
input=case["input"],
stream=True,
)
streamed_content = []
response_id = ""
for chunk in response:
if chunk.type == "response.completed":
response_id = chunk.response.id
streamed_content.append(chunk.response.output_text.strip())
assert len(streamed_content) > 0
assert case["output"].lower() in "".join(streamed_content).lower()
retrieved_response = openai_client.responses.retrieve(response_id=response_id)
assert retrieved_response.output_text == "".join(streamed_content)
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_multi_turn"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_non_streaming_multi_turn(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
previous_response_id = None
for turn in case["turns"]:
response = openai_client.responses.create(
model=model,
input=turn["input"],
previous_response_id=previous_response_id,
tools=turn["tools"] if "tools" in turn else None,
)
previous_response_id = response.id
output_text = response.output_text.lower()
assert turn["output"].lower() in output_text
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_web_search"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_non_streaming_web_search(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
response = openai_client.responses.create(
model=model,
input=case["input"],
tools=case["tools"],
stream=False,
)
assert len(response.output) > 1
assert response.output[0].type == "web_search_call"
assert response.output[0].status == "completed"
assert response.output[1].type == "message"
assert response.output[1].status == "completed"
assert response.output[1].role == "assistant"
assert len(response.output[1].content) > 0
assert case["output"].lower() in response.output_text.lower().strip()
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_image"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_non_streaming_image(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
response = openai_client.responses.create(
model=model,
input=case["input"],
stream=False,
)
output_text = response.output_text.lower()
assert case["output"].lower() in output_text
@pytest.mark.parametrize(
"case",
response_test_cases["test_response_multi_turn_image"]["test_params"]["case"],
ids=case_id_generator,
)
def test_response_non_streaming_multi_turn_image(request, openai_client, model, provider, verification_config, case):
test_name_base = get_base_test_name(request)
if should_skip_test(verification_config, provider, model, test_name_base):
pytest.skip(f"Skipping {test_name_base} for model {model} on provider {provider} based on config.")
previous_response_id = None
for turn in case["turns"]:
response = openai_client.responses.create(
model=model,
input=turn["input"],
previous_response_id=previous_response_id,
tools=turn["tools"] if "tools" in turn else None,
)
previous_response_id = response.id
output_text = response.output_text.lower()
assert turn["output"].lower() in output_text