llama-stack-mirror/tests/verifications/generate_report.py
ehhuang 14146e4b3f
feat(verification): various improvements (#1921)
# What does this PR do?
- provider and their models now live in config.yaml
- better distinguish different cases within a test
- add model key to surface provider's model_id
- include example command to rerun single test case

## Test Plan
<img width="1173" alt="image"
src="https://github.com/user-attachments/assets/b414baf0-c768-451f-8c3b-c2905cf36fac"
/>
2025-04-10 10:26:19 -07:00

540 lines
22 KiB
Python
Executable file

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "pytest-json-report",
# "pyyaml",
# ]
# ///
"""
Test Report Generator
Description:
This script runs pytest tests (specifically designed for OpenAI API compatibility checks)
for different providers, aggregates the results from JSON reports, and generates
a markdown summary report (REPORT.md).
It automatically cleans up old test result files, keeping only the latest
per provider.
Configuration:
- Provider details (models, display names) are loaded from `tests/verifications/config.yaml`.
- Test cases are defined in YAML files within `tests/verifications/openai_api/fixtures/test_cases/`.
- Test results are stored in `tests/verifications/test_results/`.
Usage:
# Generate a report using the latest existing test results
python tests/verifications/generate_report.py
# Run tests for all configured providers and generate a report
python tests/verifications/generate_report.py --run-tests
# Run tests only for specific providers (space-separated)
python tests/verifications/generate_report.py --run-tests --providers fireworks openai
# Run tests matching a keyword expression (uses pytest -k)
python tests/verifications/generate_report.py --run-tests --providers fireworks --k "streaming"
# Run a specific test case for a provider
python tests/verifications/generate_report.py --run-tests --providers fireworks --k "test_chat_streaming_basic and basic_earth"
# Save the report to a custom location
python tests/verifications/generate_report.py --output custom_report.md
"""
import argparse
import json
import os
import re
import subprocess
import time
from collections import defaultdict
from pathlib import Path
from typing import Any, DefaultDict, Dict, Set, Tuple
from tests.verifications.openai_api.fixtures.fixtures import _load_all_verification_configs
# Define the root directory for test results
RESULTS_DIR = Path(__file__).parent / "test_results"
RESULTS_DIR.mkdir(exist_ok=True)
# Maximum number of test result files to keep per provider
MAX_RESULTS_PER_PROVIDER = 1
PROVIDER_ORDER = ["together", "fireworks", "groq", "cerebras", "openai"]
VERIFICATION_CONFIG = _load_all_verification_configs()
def run_tests(provider, keyword=None):
"""Run pytest for a specific provider and save results"""
print(f"Running tests for provider: {provider}")
timestamp = int(time.time())
result_file = RESULTS_DIR / f"{provider}_{timestamp}.json"
temp_json_file = RESULTS_DIR / f"temp_{provider}_{timestamp}.json"
# Determine project root directory relative to this script
project_root = Path(__file__).parent.parent.parent
# Run pytest with JSON output
cmd = [
"python",
"-m",
"pytest",
"tests/verifications/openai_api/test_chat_completion.py",
f"--provider={provider}",
"-v",
"--json-report",
f"--json-report-file={temp_json_file}",
]
# Append -k argument if provided
if keyword:
cmd.extend(["-k", keyword])
try:
# Run subprocess with cwd set to project root
result = subprocess.run(cmd, capture_output=True, text=True, cwd=project_root)
print(f"Pytest exit code: {result.returncode}")
# Check if the JSON file was created
if temp_json_file.exists():
# Read the JSON file and save it to our results format
with open(temp_json_file, "r") as f:
test_results = json.load(f)
# Save results to our own format with a trailing newline
with open(result_file, "w") as f:
json.dump(test_results, f, indent=2)
f.write("\n") # Add a trailing newline for precommit
# Clean up temp file
temp_json_file.unlink()
print(f"Test results saved to {result_file}")
return result_file
else:
print(f"Error: JSON report file not created for {provider}")
print(f"Command stdout: {result.stdout}")
print(f"Command stderr: {result.stderr}")
return None
except Exception as e:
print(f"Error running tests for {provider}: {e}")
return None
def parse_results(
result_file,
) -> Tuple[DefaultDict[str, DefaultDict[str, Dict[str, bool]]], DefaultDict[str, Set[str]], Set[str]]:
"""Parse a single test results file.
Returns:
Tuple containing:
- parsed_results: DefaultDict[provider, DefaultDict[model, Dict[test_name, pass_status]]]
- providers_in_file: DefaultDict[provider, Set[model]] found in this file.
- tests_in_file: Set[test_name] found in this file.
"""
if not os.path.exists(result_file):
print(f"Results file does not exist: {result_file}")
# Return empty defaultdicts/set matching the type hint
return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set()
with open(result_file, "r") as f:
results = json.load(f)
# Initialize results dictionary with specific types
parsed_results: DefaultDict[str, DefaultDict[str, Dict[str, bool]]] = defaultdict(lambda: defaultdict(dict))
providers_in_file: DefaultDict[str, Set[str]] = defaultdict(set)
tests_in_file: Set[str] = set()
provider: str = os.path.basename(result_file).split("_")[0]
# Debug: Print summary of test results
print(f"Test results summary for {provider}:")
print(f"Total tests: {results.get('summary', {}).get('total', 0)}")
print(f"Passed: {results.get('summary', {}).get('passed', 0)}")
print(f"Failed: {results.get('summary', {}).get('failed', 0)}")
print(f"Error: {results.get('summary', {}).get('error', 0)}")
print(f"Skipped: {results.get('summary', {}).get('skipped', 0)}")
# Extract test results
if "tests" not in results or not results["tests"]:
print(f"No test results found in {result_file}")
# Return empty defaultdicts/set matching the type hint
return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set()
# Process the tests
for test in results["tests"]:
test_id = test.get("nodeid", "")
if not (call_phase := test.get("call")):
continue
call_outcome = call_phase.get("outcome")
if call_outcome not in ("passed", "failed"):
continue
# --- Extract data from metadata ---
metadata = test.get("metadata", {})
model = metadata.get("model")
case_id = metadata.get("case_id") # String ID (if provided)
case_index = metadata.get("case_index") # Integer index (if no ID provided)
# Check if we have a model and at least one case identifier
if not model or (case_id is None and case_index is None):
print(
f"Warning: Missing 'model' or case identifier ('case_id'/'case_index') metadata for test: {test_id}. Skipping."
)
continue
try:
test_name_base = test_id.split("::")[1].split("[")[0]
except (IndexError, ValueError) as e:
print(f"Warning: Could not parse base test name for {test_id}. Error: {e}. Skipping.")
continue
# Construct detailed test name using ID or index
if case_id is not None:
detailed_test_name = f"{test_name_base} ({case_id})"
elif case_index == 0:
# If case_id is missing and index is 0, assume single case, use base name only
detailed_test_name = test_name_base
elif case_index is not None: # case_index > 0
# Use case_index for naming if case_id wasn't provided and index > 0
detailed_test_name = f"{test_name_base} (case{case_index})"
else:
# This case should be prevented by the earlier check, but handle defensively
print(f"Error: No case identifier found for test {test_id} after initial check. Skipping.")
continue
# Populate collections for this file
tests_in_file.add(detailed_test_name)
providers_in_file[provider].add(model)
if call_outcome == "passed":
parsed_results[provider][model][detailed_test_name] = True
elif call_outcome == "failed":
parsed_results[provider][model][detailed_test_name] = False
# Final Summary Warning (Optional)
if not parsed_results.get(provider):
print(f"Warning: No valid test results parsed for provider {provider} from file {result_file}")
return parsed_results, providers_in_file, tests_in_file
def cleanup_old_results(providers_to_clean: Dict[str, Set[str]]):
"""Clean up old test result files, keeping only the newest N per provider."""
# Use the passed-in providers dictionary
for provider in providers_to_clean.keys():
# Get all result files for this provider
provider_files = list(RESULTS_DIR.glob(f"{provider}_*.json"))
# Sort by timestamp (newest first)
provider_files.sort(key=lambda x: int(x.stem.split("_")[1]), reverse=True)
# Remove old files beyond the max to keep
if len(provider_files) > MAX_RESULTS_PER_PROVIDER:
for old_file in provider_files[MAX_RESULTS_PER_PROVIDER:]:
try:
old_file.unlink()
print(f"Removed old result file: {old_file}")
except Exception as e:
print(f"Error removing file {old_file}: {e}")
def get_latest_results_by_provider():
"""Get the latest test result file for each provider"""
provider_results = {}
# Get all result files
result_files = list(RESULTS_DIR.glob("*.json"))
# Extract all provider names from filenames
all_providers = set()
for file in result_files:
# File format is provider_timestamp.json
parts = file.stem.split("_")
if len(parts) >= 2:
all_providers.add(parts[0])
# Group by provider
for provider in all_providers:
provider_files = [f for f in result_files if f.name.startswith(f"{provider}_")]
# Sort by timestamp (newest first)
provider_files.sort(key=lambda x: int(x.stem.split("_")[1]), reverse=True)
if provider_files:
provider_results[provider] = provider_files[0]
return provider_results
def generate_report(
results_dict: Dict[str, Any], providers: Dict[str, Set[str]], all_tests: Set[str], output_file=None
):
"""Generate the markdown report.
Args:
results_dict: Aggregated results [provider][model][test_name] -> status.
providers: Dict of all providers and their models {provider: {models}}.
all_tests: Set of all test names found.
output_file: Optional path to save the report.
"""
if output_file is None:
# Default to creating the report in the same directory as this script
output_file = Path(__file__).parent / "REPORT.md"
else:
output_file = Path(output_file)
# Get the timestamp from result files
provider_timestamps = {}
provider_results_files = get_latest_results_by_provider()
for provider, result_file in provider_results_files.items():
# Extract timestamp from filename (format: provider_timestamp.json)
try:
timestamp_str = result_file.stem.split("_")[1]
timestamp = int(timestamp_str)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
provider_timestamps[provider] = formatted_time
except (IndexError, ValueError):
provider_timestamps[provider] = "Unknown"
# Convert provider model sets to sorted lists (use passed-in providers dict)
providers_sorted = {prov: sorted(models) for prov, models in providers.items()}
# Sort tests alphabetically (use passed-in all_tests set)
sorted_tests = sorted(all_tests)
# Calculate counts for each base test name
base_test_case_counts: DefaultDict[str, int] = defaultdict(int)
base_test_name_map: Dict[str, str] = {}
for test_name in sorted_tests:
match = re.match(r"^(.*?)( \([^)]+\))?$", test_name)
if match:
base_name = match.group(1).strip()
base_test_case_counts[base_name] += 1
base_test_name_map[test_name] = base_name
else:
# Should not happen with current naming, but handle defensively
base_test_case_counts[test_name] += 1
base_test_name_map[test_name] = test_name
if not sorted_tests:
print("Warning: No test results found to generate a report.")
# Optionally create an empty report or return early
with open(output_file, "w") as f:
f.write("# Test Results Report\n\nNo test results found.\n")
print(f"Generated empty report: {output_file}")
return
report = ["# Test Results Report\n"]
report.append(f"*Generated on: {time.strftime('%Y-%m-%d %H:%M:%S')}*\n")
report.append("*This report was generated by running `python tests/verifications/generate_report.py`*\n")
# Icons for pass/fail
pass_icon = ""
fail_icon = ""
na_icon = ""
# Add emoji legend
report.append("## Legend\n")
report.append(f"- {pass_icon} - Test passed")
report.append(f"- {fail_icon} - Test failed")
report.append(f"- {na_icon} - Test not applicable or not run for this model")
report.append("\n")
# Add a summary section
report.append("## Summary\n")
# Count total tests and passes (use passed-in providers and all_tests)
total_tests = 0
passed_tests = 0
provider_totals = {}
for provider, models in providers_sorted.items():
provider_passed = 0
provider_total = 0
if provider in results_dict:
for model in models:
if model in results_dict[provider]:
model_results = results_dict[provider][model]
for test in sorted_tests:
if test in model_results:
provider_total += 1
total_tests += 1
if model_results[test]:
provider_passed += 1
passed_tests += 1
provider_totals[provider] = (provider_passed, provider_total)
# Add summary table (use passed-in providers dict)
report.append("| Provider | Pass Rate | Tests Passed | Total Tests |")
report.append("| --- | --- | --- | --- |")
for provider in [p for p in PROVIDER_ORDER if p in providers]: # Check against keys of passed-in dict
passed, total = provider_totals.get(provider, (0, 0))
pass_rate = f"{(passed / total * 100):.1f}%" if total > 0 else "N/A"
report.append(f"| {provider.capitalize()} | {pass_rate} | {passed} | {total} |")
for provider in [p for p in providers if p not in PROVIDER_ORDER]: # Check against keys of passed-in dict
passed, total = provider_totals.get(provider, (0, 0))
pass_rate = f"{(passed / total * 100):.1f}%" if total > 0 else "N/A"
report.append(f"| {provider.capitalize()} | {pass_rate} | {passed} | {total} |")
report.append("\n")
for provider in sorted(
providers_sorted.keys(), key=lambda p: (PROVIDER_ORDER.index(p) if p in PROVIDER_ORDER else float("inf"), p)
):
provider_models = providers_sorted[provider] # Use sorted models
if not provider_models:
continue
report.append(f"\n## {provider.capitalize()}\n")
# Add timestamp when test was run
if provider in provider_timestamps:
report.append(f"*Tests run on: {provider_timestamps[provider]}*\n")
# Add test command for reproducing results
test_cmd_all = f"pytest tests/verifications/openai_api/test_chat_completion.py --provider={provider} -v"
report.append(f"```bash\n# Run all tests for this provider:\n{test_cmd_all}\n")
# Find an example test with a case ID
example_base_test_name = None
example_case_id = None
# Get first test as fallback base, handle empty list
first_test_name = sorted_tests[0] if sorted_tests else "unknown_test"
match = re.match(r"^(.*?) \((.*?)\)$", first_test_name)
if match:
example_base_test_name = match.group(1).strip()
example_case_id = match.group(2).strip()
else:
example_base_test_name = first_test_name
base_name = base_test_name_map.get(test, test) # Get base name
case_count = base_test_case_counts.get(base_name, 1) # Get count
filter_str = f"{example_base_test_name} and {example_case_id}" if case_count > 1 else example_base_test_name
test_cmd_specific_case = (
f'pytest tests/verifications/openai_api/test_chat_completion.py --provider={provider} -k "{filter_str}"'
)
report.append(
f"# Example: Run only the '{example_case_id}' case of {example_base_test_name}:\n{test_cmd_specific_case}\n```\n"
)
# Get display names (use passed-in providers dict)
provider_config = VERIFICATION_CONFIG.get("providers", {}).get(provider, {})
display_name_map = provider_config.get("model_display_names", {})
# Add Model Key Table (use provider_models)
report.append(f"\n**Model Key ({provider.capitalize()})**\n")
provider_key_lines = ["| Display Name | Full Model ID |", "| --- | --- |"]
for model_id in provider_models:
display_name = display_name_map.get(model_id, model_id)
provider_key_lines.append(f"| {display_name} | `{model_id}` |")
report.extend(provider_key_lines)
report.append("\n")
# Create results table header (use provider_models)
display_names = [display_name_map.get(m, m) for m in provider_models]
header = "| Test | " + " | ".join(display_names) + " |"
separator = "| --- | " + " | ".join(["---"] * len(provider_models)) + " |"
report.append(header)
report.append(separator)
# Get results for this provider from results_dict
provider_results_data = results_dict.get(provider, {})
# Add rows for each test (use sorted_tests)
for test in sorted_tests:
# Determine display name based on case count
base_name = base_test_name_map.get(test, test) # Get base name
case_count = base_test_case_counts.get(base_name, 1) # Get count
display_test_name = base_name if case_count == 1 else test # Choose display name
row = f"| {display_test_name} |" # Use display name
for model_id in provider_models:
if model_id in provider_results_data and test in provider_results_data[model_id]:
result = pass_icon if provider_results_data[model_id][test] else fail_icon
else:
result = na_icon
row += f" {result} |"
report.append(row)
# Write to file
with open(output_file, "w") as f:
f.write("\n".join(report))
f.write("\n")
print(f"Report generated: {output_file}")
def main():
parser = argparse.ArgumentParser(description="Generate test report")
parser.add_argument("--run-tests", action="store_true", help="Run tests before generating report")
parser.add_argument(
"--providers",
type=str,
nargs="+",
help="Specify providers to test (comma-separated or space-separated, default: all)",
)
parser.add_argument("--output", type=str, help="Output file location (default: tests/verifications/REPORT.md)")
parser.add_argument("--k", type=str, help="Keyword expression to filter tests (passed to pytest -k)")
args = parser.parse_args()
all_results = {}
# Initialize collections to aggregate results in main
aggregated_providers = defaultdict(set)
aggregated_tests = set()
if args.run_tests:
# Get list of available providers from command line or use detected providers
if args.providers:
# Handle both comma-separated and space-separated lists
test_providers = []
for provider_arg in args.providers:
# Split by comma if commas are present
if "," in provider_arg:
test_providers.extend(provider_arg.split(","))
else:
test_providers.append(provider_arg)
else:
# Default providers to test
test_providers = PROVIDER_ORDER
for provider in test_providers:
provider = provider.strip() # Remove any whitespace
result_file = run_tests(provider, keyword=args.k)
if result_file:
# Parse and aggregate results
parsed_results, providers_in_file, tests_in_file = parse_results(result_file)
all_results.update(parsed_results)
for prov, models in providers_in_file.items():
aggregated_providers[prov].update(models)
aggregated_tests.update(tests_in_file)
else:
# Use existing results
provider_result_files = get_latest_results_by_provider()
for result_file in provider_result_files.values():
# Parse and aggregate results
parsed_results, providers_in_file, tests_in_file = parse_results(result_file)
all_results.update(parsed_results)
for prov, models in providers_in_file.items():
aggregated_providers[prov].update(models)
aggregated_tests.update(tests_in_file)
# Generate the report, passing aggregated data
generate_report(all_results, aggregated_providers, aggregated_tests, args.output)
# Cleanup, passing aggregated providers
cleanup_old_results(aggregated_providers)
if __name__ == "__main__":
main()