test(verification): overwrite test results instead of creating new ones (#1934)

# What does this PR do?
Test results for each provider are now written to a fixed per-provider file (e.g. `fireworks.json`) that is overwritten on every run, instead of a new `provider_<timestamp>.json` file per run. The run time is stored inside the JSON payload as `run_timestamp` and surfaced in the generated report, `parse_results` returns it alongside the parsed results, and the old `cleanup_old_results` / `get_latest_results_by_provider` logic is replaced by a simpler `get_all_result_files_by_provider`.
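To make the new layout concrete, here is a minimal sketch of reading one per-provider result file. The `test_results/` directory name and the `describe_result_file` helper are assumptions for illustration; `run_timestamp` is the field this PR embeds in the JSON.

```python
import json
import time
from pathlib import Path

# Hypothetical location; the real RESULTS_DIR is defined in generate_report.py.
RESULTS_DIR = Path("tests/verifications/test_results")


def describe_result_file(provider: str) -> str:
    """Summarize one per-provider result file, e.g. openai.json (assumes tests were already run)."""
    result_file = RESULTS_DIR / f"{provider}.json"  # constant filename, overwritten each run
    results = json.loads(result_file.read_text())
    ts = results.get("run_timestamp")  # Unix timestamp embedded by run_tests()
    when = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(ts)) if ts is not None else "Unknown"
    return f"{provider}: {len(results.get('tests', []))} tests, last run {when}"


print(describe_result_file("openai"))
```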
## Test Plan
```
(myenv) ➜ llama-stack python tests/verifications/generate_report.py --providers fireworks,together,openai --run-tests
```
Authored by ehhuang on 2025-04-10 16:59:28 -07:00, committed via GitHub. Commit 2fcb70b789, parent a4cc4b7e31.
5 changed files with 926 additions and 580 deletions

tests/verifications/generate_report.py

@@ -77,8 +77,9 @@ def run_tests(provider, keyword=None):
     print(f"Running tests for provider: {provider}")
 
     timestamp = int(time.time())
-    result_file = RESULTS_DIR / f"{provider}_{timestamp}.json"
-    temp_json_file = RESULTS_DIR / f"temp_{provider}_{timestamp}.json"
+    # Use a constant filename for the final result and temp file
+    result_file = RESULTS_DIR / f"{provider}.json"
+    temp_json_file = RESULTS_DIR / f"temp_{provider}.json"
 
     # Determine project root directory relative to this script
     project_root = Path(__file__).parent.parent.parent
@@ -106,11 +107,12 @@ def run_tests(provider, keyword=None):
     # Check if the JSON file was created
     if temp_json_file.exists():
         # Read the JSON file and save it to our results format
         with open(temp_json_file, "r") as f:
             test_results = json.load(f)
 
-        # Save results to our own format with a trailing newline
+        test_results["run_timestamp"] = timestamp
+
+        # Save results to the final (overwritten) file
         with open(result_file, "w") as f:
             json.dump(test_results, f, indent=2)
             f.write("\n")  # Add a trailing newline for precommit
@@ -132,7 +134,7 @@ def run_tests(provider, keyword=None):
 
 def parse_results(
     result_file,
-) -> Tuple[DefaultDict[str, DefaultDict[str, Dict[str, bool]]], DefaultDict[str, Set[str]], Set[str]]:
+) -> Tuple[DefaultDict[str, DefaultDict[str, Dict[str, bool]]], DefaultDict[str, Set[str]], Set[str], str]:
     """Parse a single test results file.
 
     Returns:
@@ -140,11 +142,12 @@ def parse_results(
         - parsed_results: DefaultDict[provider, DefaultDict[model, Dict[test_name, pass_status]]]
         - providers_in_file: DefaultDict[provider, Set[model]] found in this file.
         - tests_in_file: Set[test_name] found in this file.
+        - run_timestamp: Timestamp when the test was run
     """
     if not os.path.exists(result_file):
         print(f"Results file does not exist: {result_file}")
         # Return empty defaultdicts/set matching the type hint
-        return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set()
+        return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set(), ""
 
     with open(result_file, "r") as f:
         results = json.load(f)
@@ -153,7 +156,16 @@ def parse_results(
     parsed_results: DefaultDict[str, DefaultDict[str, Dict[str, bool]]] = defaultdict(lambda: defaultdict(dict))
     providers_in_file: DefaultDict[str, Set[str]] = defaultdict(set)
     tests_in_file: Set[str] = set()
-    provider: str = os.path.basename(result_file).split("_")[0]
+    # Extract provider from filename (e.g., "openai.json" -> "openai")
+    provider: str = result_file.stem
+
+    # Extract run timestamp from the JSON data
+    run_timestamp_unix = results.get("run_timestamp")
+    run_timestamp_str = (
+        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(run_timestamp_unix))
+        if run_timestamp_unix is not None
+        else "Unknown"
+    )
 
     # Debug: Print summary of test results
     print(f"Test results summary for {provider}:")
@@ -167,7 +179,7 @@ def parse_results(
     if "tests" not in results or not results["tests"]:
         print(f"No test results found in {result_file}")
         # Return empty defaultdicts/set matching the type hint
-        return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set()
+        return defaultdict(lambda: defaultdict(dict)), defaultdict(set), set(), ""
 
     # Process the tests
     for test in results["tests"]:
@@ -225,59 +237,29 @@ def parse_results(
     if not parsed_results.get(provider):
         print(f"Warning: No valid test results parsed for provider {provider} from file {result_file}")
 
-    return parsed_results, providers_in_file, tests_in_file
+    return parsed_results, providers_in_file, tests_in_file, run_timestamp_str
 
 
-def cleanup_old_results(providers_to_clean: Dict[str, Set[str]]):
-    """Clean up old test result files, keeping only the newest N per provider."""
-    # Use the passed-in providers dictionary
-    for provider in providers_to_clean.keys():
-        # Get all result files for this provider
-        provider_files = list(RESULTS_DIR.glob(f"{provider}_*.json"))
-
-        # Sort by timestamp (newest first)
-        provider_files.sort(key=lambda x: int(x.stem.split("_")[1]), reverse=True)
-
-        # Remove old files beyond the max to keep
-        if len(provider_files) > MAX_RESULTS_PER_PROVIDER:
-            for old_file in provider_files[MAX_RESULTS_PER_PROVIDER:]:
-                try:
-                    old_file.unlink()
-                    print(f"Removed old result file: {old_file}")
-                except Exception as e:
-                    print(f"Error removing file {old_file}: {e}")
-
-
-def get_latest_results_by_provider():
-    """Get the latest test result file for each provider"""
+def get_all_result_files_by_provider():
+    """Get all test result files, keyed by provider."""
     provider_results = {}
 
     # Get all result files
     result_files = list(RESULTS_DIR.glob("*.json"))
 
-    # Extract all provider names from filenames
-    all_providers = set()
     for file in result_files:
-        # File format is provider_timestamp.json
-        parts = file.stem.split("_")
-        if len(parts) >= 2:
-            all_providers.add(parts[0])
-
-    # Group by provider
-    for provider in all_providers:
-        provider_files = [f for f in result_files if f.name.startswith(f"{provider}_")]
-        # Sort by timestamp (newest first)
-        provider_files.sort(key=lambda x: int(x.stem.split("_")[1]), reverse=True)
-
-        if provider_files:
-            provider_results[provider] = provider_files[0]
+        provider = file.stem
+        if provider:
+            provider_results[provider] = file
 
     return provider_results
 
 
 def generate_report(
-    results_dict: Dict[str, Any], providers: Dict[str, Set[str]], all_tests: Set[str], output_file=None
+    results_dict: Dict[str, Any],
+    providers: Dict[str, Set[str]],
+    all_tests: Set[str],
+    provider_timestamps: Dict[str, str],
+    output_file=None,
 ):
     """Generate the markdown report.
@@ -285,6 +267,7 @@ def generate_report(
         results_dict: Aggregated results [provider][model][test_name] -> status.
         providers: Dict of all providers and their models {provider: {models}}.
         all_tests: Set of all test names found.
+        provider_timestamps: Dict of provider to timestamp when tests were run
         output_file: Optional path to save the report.
     """
     if output_file is None:
@@ -293,19 +276,6 @@ def generate_report(
     else:
         output_file = Path(output_file)
 
-    # Get the timestamp from result files
-    provider_timestamps = {}
-    provider_results_files = get_latest_results_by_provider()
-    for provider, result_file in provider_results_files.items():
-        # Extract timestamp from filename (format: provider_timestamp.json)
-        try:
-            timestamp_str = result_file.stem.split("_")[1]
-            timestamp = int(timestamp_str)
-            formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
-            provider_timestamps[provider] = formatted_time
-        except (IndexError, ValueError):
-            provider_timestamps[provider] = "Unknown"
-
     # Convert provider model sets to sorted lists (use passed-in providers dict)
     providers_sorted = {prov: sorted(models) for prov, models in providers.items()}
@@ -416,7 +386,7 @@ def generate_report(
             else:
                 example_base_test_name = first_test_name
 
-            base_name = base_test_name_map.get(test, test)  # Get base name
+            base_name = base_test_name_map.get(first_test_name, first_test_name)  # Get base name
             case_count = base_test_case_counts.get(base_name, 1)  # Get count
 
             filter_str = f"{example_base_test_name} and {example_case_id}" if case_count > 1 else example_base_test_name
@@ -491,6 +461,7 @@ def main():
     # Initialize collections to aggregate results in main
     aggregated_providers = defaultdict(set)
     aggregated_tests = set()
+    provider_timestamps = {}
 
     if args.run_tests:
         # Get list of available providers from command line or use detected providers
@@ -512,28 +483,28 @@ def main():
             result_file = run_tests(provider, keyword=args.k)
             if result_file:
                 # Parse and aggregate results
-                parsed_results, providers_in_file, tests_in_file = parse_results(result_file)
+                parsed_results, providers_in_file, tests_in_file, run_timestamp = parse_results(result_file)
                 all_results.update(parsed_results)
                 for prov, models in providers_in_file.items():
                     aggregated_providers[prov].update(models)
+                    if run_timestamp:
+                        provider_timestamps[prov] = run_timestamp
                 aggregated_tests.update(tests_in_file)
     else:
         # Use existing results
-        provider_result_files = get_latest_results_by_provider()
+        provider_result_files = get_all_result_files_by_provider()
        for result_file in provider_result_files.values():
             # Parse and aggregate results
-            parsed_results, providers_in_file, tests_in_file = parse_results(result_file)
+            parsed_results, providers_in_file, tests_in_file, run_timestamp = parse_results(result_file)
             all_results.update(parsed_results)
             for prov, models in providers_in_file.items():
                 aggregated_providers[prov].update(models)
+                if run_timestamp:
+                    provider_timestamps[prov] = run_timestamp
             aggregated_tests.update(tests_in_file)
 
     # Generate the report, passing aggregated data
-    generate_report(all_results, aggregated_providers, aggregated_tests, args.output)
-
-    # Cleanup, passing aggregated providers
-    cleanup_old_results(aggregated_providers)
+    generate_report(all_results, aggregated_providers, aggregated_tests, provider_timestamps, args.output)
 
 
 if __name__ == "__main__":
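Putting the `main()` changes together: the aggregation loop now threads each provider's run timestamp through to `generate_report()` instead of re-deriving it from filenames. A condensed, hypothetical sketch of that flow (`build_report_inputs` is not in the file; the parse function is injected so the sketch stays self-contained):

```python
from collections import defaultdict
from typing import Callable, Iterable


def build_report_inputs(result_files: Iterable, parse_results: Callable):
    """Aggregate parsed results, provider/model sets, test names, and per-provider run timestamps."""
    all_results = {}
    aggregated_providers = defaultdict(set)
    aggregated_tests = set()
    provider_timestamps = {}
    for result_file in result_files:
        # parse_results() now also returns the formatted run timestamp.
        parsed, providers_in_file, tests_in_file, run_timestamp = parse_results(result_file)
        all_results.update(parsed)
        for prov, models in providers_in_file.items():
            aggregated_providers[prov].update(models)
            if run_timestamp:
                provider_timestamps[prov] = run_timestamp
        aggregated_tests.update(tests_in_file)
    return all_results, aggregated_providers, aggregated_tests, provider_timestamps


# generate_report(all_results, aggregated_providers, aggregated_tests, provider_timestamps, output_file)
# then consumes provider_timestamps directly rather than parsing timestamps out of filenames.
```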