llama-stack-mirror/scripts/cleanup_recordings.py
Ashwin Bharambe 8c918a132e chore(ci): remove unused recordings (#4074)
Added a script to clean up recordings. While doing this, moved the CI
matrix generation to a separate script so there is a single source of
truth for the matrix.

Ran the cleanup script as:
```
PYTHONPATH=. python scripts/cleanup_recordings.py
```

Also added this as part of the pre-commit workflow to ensure that the
recordings are always up to date and that no stale recordings are left
in the repo.
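
To actually delete the stale recordings rather than only report them, the same
invocation takes the script's `--delete` flag (a dry run is the default):
```
PYTHONPATH=. python scripts/cleanup_recordings.py --delete
```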
2025-11-12 12:13:15 -08:00

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the terms described in the LICENSE file in
# the root directory of this source tree.
"""
Clean up unused test recordings based on CI test collection.
This script:
1. Reads CI matrix definitions from tests/integration/ci_matrix.json (default + scheduled overrides)
2. Uses pytest --collect-only with --json-report to gather all test IDs that run in CI
3. Compares against existing recordings to identify unused ones
4. Optionally deletes unused recordings
Usage:
# Dry run - see what would be deleted
./scripts/cleanup_recordings.py
# Save manifest of CI test IDs for inspection
./scripts/cleanup_recordings.py --manifest ci_tests.txt
# Actually delete unused recordings
./scripts/cleanup_recordings.py --delete
"""
import argparse
import json
import os
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path

REPO_ROOT = Path(__file__).parent.parent

# Load CI matrix from JSON file
CI_MATRIX_FILE = REPO_ROOT / "tests/integration/ci_matrix.json"
with open(CI_MATRIX_FILE) as f:
    _matrix_config = json.load(f)

DEFAULT_CI_MATRIX: list[dict[str, str]] = _matrix_config["default"]
SCHEDULED_MATRICES: dict[str, list[dict[str, str]]] = _matrix_config.get("schedules", {})
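
# For reference, ci_matrix.json is expected to have roughly this shape. The
# "default" and "schedules" keys are the ones read above; the suite/setup
# values below are illustrative placeholders, not the real matrix:
#
# {
#   "default": [
#     {"suite": "base", "setup": "ollama"}
#   ],
#   "schedules": {
#     "weekly": [
#       {"suite": "base", "setup": "gpt"}
#     ]
#   }
# }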


def _unique_configs(entries):
    seen: set[tuple[str, str]] = set()
    for entry in entries:
        suite = entry["suite"]
        setup = entry["setup"]
        key = (suite, setup)
        if key in seen:
            continue
        seen.add(key)
        yield {"suite": suite, "setup": setup}


def iter_all_ci_configs() -> list[dict[str, str]]:
    """Return unique CI configs across default and scheduled matrices."""
    combined = list(DEFAULT_CI_MATRIX)
    for configs in SCHEDULED_MATRICES.values():
        combined.extend(configs)
    return list(_unique_configs(combined))
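
# Example (placeholder values, not the real matrix): if the default matrix and a
# scheduled matrix both contain {"suite": "base", "setup": "ollama"}, the pair
# appears only once in the list returned by iter_all_ci_configs().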


def collect_ci_tests():
    """Collect all test IDs that would run in CI using --collect-only with JSON output."""
    all_test_ids = set()
    configs = iter_all_ci_configs()

    for config in configs:
        print(f"Collecting tests for suite={config['suite']}, setup={config['setup']}...")

        # Create a temporary file for JSON report
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json_report_file = f.name

        try:
            # Configure environment for collection run
            env = os.environ.copy()
            env["PYTEST_ADDOPTS"] = f"--json-report --json-report-file={json_report_file}"
            repo_path = str(REPO_ROOT)
            existing_path = env.get("PYTHONPATH", "")
            env["PYTHONPATH"] = f"{repo_path}{os.pathsep}{existing_path}" if existing_path else repo_path

            result = subprocess.run(
                [
                    "./scripts/integration-tests.sh",
                    "--collect-only",
                    "--suite",
                    config["suite"],
                    "--setup",
                    config["setup"],
                ],
                capture_output=True,
                text=True,
                cwd=REPO_ROOT,
                env=env,
            )
            if result.returncode != 0:
                raise RuntimeError(
                    "Test collection failed.\n"
                    f"Command: {' '.join(result.args)}\n"
                    f"stdout:\n{result.stdout}\n"
                    f"stderr:\n{result.stderr}"
                )

            # Parse JSON report to extract test IDs
            try:
                with open(json_report_file) as f:
                    report = json.load(f)

                # The "collectors" field contains collected test items
                # Each collector has a "result" array with test node IDs
                for collector in report.get("collectors", []):
                    for item in collector.get("result", []):
                        # The "nodeid" field is the test ID
                        if "nodeid" in item:
                            all_test_ids.add(item["nodeid"])

                print(f" Collected {len(all_test_ids)} test IDs so far")
            except (json.JSONDecodeError, FileNotFoundError) as e:
                print(f" Warning: Failed to parse JSON report: {e}")
                continue
        finally:
            # Clean up temp file
            if os.path.exists(json_report_file):
                os.unlink(json_report_file)

    print(f"\nTotal unique test IDs collected: {len(all_test_ids)}")
    return all_test_ids, configs
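
# For reference, a "collectors" entry in the pytest-json-report output parsed
# above looks roughly like this (shape inferred from the parsing code; the
# paths are illustrative placeholders):
# {
#   "nodeid": "tests/integration/inference/test_foo.py",
#   "result": [
#     {"nodeid": "tests/integration/inference/test_foo.py::test_bar[p1-p2]", "type": "Function"}
#   ]
# }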


def get_base_test_id(test_id: str) -> str:
    """Extract base test ID without parameterization.

    Example:
        'tests/integration/inference/test_foo.py::test_bar[param1-param2]'
        -> 'tests/integration/inference/test_foo.py::test_bar'
    """
    return test_id.split("[")[0] if "[" in test_id else test_id


def find_all_recordings():
    """Find all recording JSON files."""
    return list((REPO_ROOT / "tests/integration").rglob("recordings/*.json"))
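
# For reference, each recording file is expected to embed the test it belongs
# to under a "test_id" key, the only field this script relies on (the rest of
# the payload is elided here):
# {"test_id": "tests/integration/inference/test_foo.py::test_bar[p1-p2]", ...}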


def analyze_recordings(ci_test_ids, dry_run=True):
    """Analyze recordings and identify unused ones."""
    # Use full test IDs with parameterization for exact matching
    all_recordings = find_all_recordings()
    print(f"\nTotal recording files: {len(all_recordings)}")

    # Categorize recordings
    used_recordings = []
    unused_recordings = []
    shared_recordings = []  # model-list endpoints without test_id
    parse_errors = []

    for json_file in all_recordings:
        try:
            with open(json_file) as f:
                data = json.load(f)

            test_id = data.get("test_id", "")
            if not test_id:
                # Shared/infrastructure recordings (model lists, etc)
                shared_recordings.append(json_file)
                continue

            # Match exact test_id (with full parameterization)
            if test_id in ci_test_ids:
                used_recordings.append(json_file)
            else:
                unused_recordings.append((json_file, test_id))
        except Exception as e:
            parse_errors.append((json_file, str(e)))

    # Print summary
    print("\nRecording Analysis:")
    print(f" Used in CI: {len(used_recordings)}")
    print(f" Shared (no ID): {len(shared_recordings)}")
    print(f" UNUSED: {len(unused_recordings)}")
    print(f" Parse errors: {len(parse_errors)}")

    if unused_recordings:
        print("\nUnused recordings by test:")
        # Group by base test ID
        by_test = defaultdict(list)
        for file, test_id in unused_recordings:
            base = get_base_test_id(test_id)
            by_test[base].append(file)

        for base_test, files in sorted(by_test.items()):
            print(f"\n {base_test}")
            print(f" ({len(files)} recording(s))")
            for f in files[:3]:
                print(f" - {f.relative_to(REPO_ROOT / 'tests/integration')}")
            if len(files) > 3:
                print(f" ... and {len(files) - 3} more")

    if parse_errors:
        print("\nParse errors:")
        for file, error in parse_errors[:5]:
            print(f" {file.relative_to(REPO_ROOT)}: {error}")
        if len(parse_errors) > 5:
            print(f" ... and {len(parse_errors) - 5} more")

    # Perform cleanup
    if not dry_run:
        print(f"\nDeleting {len(unused_recordings)} unused recordings...")
        for file, _ in unused_recordings:
            file.unlink()
            print(f" Deleted: {file.relative_to(REPO_ROOT / 'tests/integration')}")
        print("✅ Cleanup complete")
    else:
        print("\n(Dry run - no files deleted)")
        print("\nTo delete these files, run with --delete")

    return len(unused_recordings)


def main():
    parser = argparse.ArgumentParser(
        description="Clean up unused test recordings based on CI test collection",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--delete", action="store_true", help="Actually delete unused recordings (default is dry-run)")
    parser.add_argument("--manifest", help="Save collected test IDs to file (optional)")
    args = parser.parse_args()

    print("=" * 60)
    print("Recording Cleanup Utility")
    print("=" * 60)

    ci_configs = iter_all_ci_configs()
    print(f"\nDetected CI configurations: {len(ci_configs)}")
    for config in ci_configs:
        print(f" - suite={config['suite']}, setup={config['setup']}")

    # Collect test IDs from CI configurations
    ci_test_ids, _ = collect_ci_tests()

    if args.manifest:
        with open(args.manifest, "w") as f:
            for test_id in sorted(ci_test_ids):
                f.write(f"{test_id}\n")
        print(f"\nSaved test IDs to: {args.manifest}")

    # Analyze and cleanup
    unused_count = analyze_recordings(ci_test_ids, dry_run=not args.delete)

    print("\n" + "=" * 60)
    if unused_count > 0 and not args.delete:
        print("Run with --delete to remove unused recordings")


if __name__ == "__main__":
    main()