#!/usr/bin/env python3 # Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the terms described in the LICENSE file in # the root directory of this source tree. """ Clean up unused test recordings based on CI test collection. This script: 1. Resolves the CI test matrix by combining the default CI_MATRIX with scheduled overrides 2. Uses pytest --collect-only with --json-report to gather all test IDs that run in CI 3. Compares against existing recordings to identify unused ones 4. Optionally deletes unused recordings Usage: # Dry run - see what would be deleted ./scripts/cleanup_recordings.py # Save manifest of CI test IDs for inspection ./scripts/cleanup_recordings.py --manifest ci_tests.txt # Actually delete unused recordings ./scripts/cleanup_recordings.py --delete """ import argparse import json import os import subprocess import tempfile from collections import defaultdict from pathlib import Path REPO_ROOT = Path(__file__).parent.parent from tests.integration.suites import CI_MATRIX # noqa: E402 # Additional scheduled CI configurations (keep in sync with scripts/generate_ci_matrix.py) ADDITIONAL_CI_CONFIGS = [ {"suite": "base", "setup": "vllm"}, # Weekly vLLM coverage ] def load_ci_configs() -> list[dict[str, str]]: """Return all (suite, setup) combinations exercised in CI.""" configs: list[dict[str, str]] = [] seen: set[tuple[str, str]] = set() def add(entry: dict[str, str]) -> None: suite = entry.get("suite") setup = entry.get("setup") if not suite or not setup: raise RuntimeError(f"Invalid CI matrix entry: {entry}") key = (suite, setup) if key in seen: return seen.add(key) configs.append({"suite": suite, "setup": setup}) for entry in CI_MATRIX: add(entry) for entry in ADDITIONAL_CI_CONFIGS: add(entry) return configs def collect_ci_tests(ci_configs: list[dict[str, str]]): """Collect all test IDs that would run in CI using --collect-only with JSON output.""" all_test_ids = set() for config in ci_configs: print(f"Collecting tests for suite={config['suite']}, setup={config['setup']}...") # Create a temporary file for JSON report with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: json_report_file = f.name try: # Configure environment for collection run env = os.environ.copy() env["PYTEST_ADDOPTS"] = f"--json-report --json-report-file={json_report_file}" repo_path = str(REPO_ROOT) existing_path = env.get("PYTHONPATH", "") env["PYTHONPATH"] = f"{repo_path}{os.pathsep}{existing_path}" if existing_path else repo_path result = subprocess.run( [ "./scripts/integration-tests.sh", "--collect-only", "--suite", config["suite"], "--setup", config["setup"], ], capture_output=True, text=True, cwd=REPO_ROOT, env=env, ) if result.returncode != 0: raise RuntimeError( "Test collection failed.\n" f"Command: {' '.join(result.args)}\n" f"stdout:\n{result.stdout}\n" f"stderr:\n{result.stderr}" ) # Parse JSON report to extract test IDs try: with open(json_report_file) as f: report = json.load(f) # The "collectors" field contains collected test items # Each collector has a "result" array with test node IDs for collector in report.get("collectors", []): for item in collector.get("result", []): # The "nodeid" field is the test ID if "nodeid" in item: all_test_ids.add(item["nodeid"]) print(f" Collected {len(all_test_ids)} test IDs so far") except (json.JSONDecodeError, FileNotFoundError) as e: print(f" Warning: Failed to parse JSON report: {e}") continue finally: # Clean up temp file if os.path.exists(json_report_file): os.unlink(json_report_file) print(f"\nTotal unique test IDs collected: {len(all_test_ids)}") return all_test_ids def get_base_test_id(test_id: str) -> str: """Extract base test ID without parameterization. Example: 'tests/integration/inference/test_foo.py::test_bar[param1-param2]' -> 'tests/integration/inference/test_foo.py::test_bar' """ return test_id.split("[")[0] if "[" in test_id else test_id def find_all_recordings(): """Find all recording JSON files.""" return list((REPO_ROOT / "tests/integration").rglob("recordings/*.json")) def analyze_recordings(ci_test_ids, dry_run=True): """Analyze recordings and identify unused ones.""" # Use full test IDs with parameterization for exact matching all_recordings = find_all_recordings() print(f"\nTotal recording files: {len(all_recordings)}") # Categorize recordings used_recordings = [] unused_recordings = [] shared_recordings = [] # model-list endpoints without test_id parse_errors = [] for json_file in all_recordings: try: with open(json_file) as f: data = json.load(f) test_id = data.get("test_id", "") if not test_id: # Shared/infrastructure recordings (model lists, etc) shared_recordings.append(json_file) continue # Match exact test_id (with full parameterization) if test_id in ci_test_ids: used_recordings.append(json_file) else: unused_recordings.append((json_file, test_id)) except Exception as e: parse_errors.append((json_file, str(e))) # Print summary print("\nRecording Analysis:") print(f" Used in CI: {len(used_recordings)}") print(f" Shared (no ID): {len(shared_recordings)}") print(f" UNUSED: {len(unused_recordings)}") print(f" Parse errors: {len(parse_errors)}") if unused_recordings: print("\nUnused recordings by test:") # Group by base test ID by_test = defaultdict(list) for file, test_id in unused_recordings: base = get_base_test_id(test_id) by_test[base].append(file) for base_test, files in sorted(by_test.items()): print(f"\n {base_test}") print(f" ({len(files)} recording(s))") for f in files[:3]: print(f" - {f.relative_to(REPO_ROOT / 'tests/integration')}") if len(files) > 3: print(f" ... and {len(files) - 3} more") if parse_errors: print("\nParse errors:") for file, error in parse_errors[:5]: print(f" {file.relative_to(REPO_ROOT)}: {error}") if len(parse_errors) > 5: print(f" ... and {len(parse_errors) - 5} more") # Perform cleanup if not dry_run: print(f"\nDeleting {len(unused_recordings)} unused recordings...") for file, _ in unused_recordings: file.unlink() print(f" Deleted: {file.relative_to(REPO_ROOT / 'tests/integration')}") print("✅ Cleanup complete") else: print("\n(Dry run - no files deleted)") print("\nTo delete these files, run with --delete") return len(unused_recordings) def main(): parser = argparse.ArgumentParser( description="Clean up unused test recordings based on CI test collection", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) parser.add_argument("--delete", action="store_true", help="Actually delete unused recordings (default is dry-run)") parser.add_argument("--manifest", help="Save collected test IDs to file (optional)") args = parser.parse_args() print("=" * 60) print("Recording Cleanup Utility") print("=" * 60) ci_configs = load_ci_configs() print(f"\nDetected CI configurations: {len(ci_configs)}") for config in ci_configs: print(f" - suite={config['suite']}, setup={config['setup']}") # Collect test IDs from CI configurations ci_test_ids = collect_ci_tests(ci_configs) if args.manifest: with open(args.manifest, "w") as f: for test_id in sorted(ci_test_ids): f.write(f"{test_id}\n") print(f"\nSaved test IDs to: {args.manifest}") # Analyze and cleanup unused_count = analyze_recordings(ci_test_ids, dry_run=not args.delete) print("\n" + "=" * 60) if unused_count > 0 and not args.delete: print("Run with --delete to remove unused recordings") if __name__ == "__main__": main()