magicciv/tools/autoplay-report.py
Natalie f1f3716c1f fix(@projects/@magic-civilization): 🐛 update autoplay schema to array for layers
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-04-16 17:51:23 -07:00

389 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Aggregate auto_play batch results into a CSV + summary + assertions.

Reads all game_<stamp>_seed<N>/ directories under <results_dir>.
Pulls the last line of turn_stats.jsonl as the final game state (fast path).
Counts events from events.jsonl.
Optionally lists .save files (name and size) with --deep.

Usage:
    tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]

Exits:
    0  all games parsed, validated, and assertions passed
    1  schema validation failure OR assertion failure OR missing results
    2  usage error

stdlib only — no pip installs.
"""
from __future__ import annotations
import csv
import json
import statistics
import sys
from pathlib import Path
from typing import Any
import importlib.util as _iu
# Load the sibling validator script by explicit file path: its hyphenated
# filename ("autoplay-validate.py") cannot be named in an ``import`` statement.
_validate_path = Path(__file__).resolve().with_name("autoplay-validate.py")
_spec = _iu.spec_from_file_location("autoplay_validate", _validate_path)
if not (_spec is not None and _spec.loader is not None):
    raise ImportError(f"cannot load {_validate_path}")
_mod = _iu.module_from_spec(_spec)
_spec.loader.exec_module(_mod)

# Expose the two validator entry points under plain module-level names.
load_schema, validate = _mod.load_schema, _mod.validate
# Schema identifiers; main() passes the turn-stats and meta names to
# load_schema().
TURN_STATS_SCHEMA_NAME = "turn-stats-line"
EVENTS_SCHEMA_NAME = "events-line"
META_SCHEMA_NAME = "meta"

# Event types that each get an ``evt_<type>`` CSV column, in column order.
EVENT_TYPES = [
    "city_founded",
    "city_captured",
    "city_grew",
    "city_starved",
    "tech_researched",
    "unit_created",
    "unit_destroyed",
    "combat_resolved",
    "victory",
]
def find_game_dirs(results_dir: Path) -> tuple[list[tuple[int, Path]], list[int]]:
    """Locate ``game_<stamp>_seed<N>/`` directories directly under *results_dir*.

    When several directories share a seed number, the one whose path compares
    greatest is kept (lexicographic max on the stamp).

    Args:
        results_dir: Directory whose immediate children are scanned.

    Returns:
        ``(found, missing_seeds)``. ``found`` holds ``(seed, directory)``
        pairs in ascending seed order. ``missing_seeds`` lists every seed in
        ``1..max(found seed)`` that has no directory, i.e. the gaps in the
        sequence. Both lists are empty when no directory matches.
    """
    by_seed: dict[int, list[Path]] = {}
    for entry in results_dir.iterdir():
        if not entry.is_dir():
            continue
        name = entry.name
        if not name.startswith("game_"):
            continue
        # Expected: game_<stamp>_seed<N>
        _, sep, seed_text = name.rpartition("_seed")
        # isdecimal() accepts only characters int() can parse. isdigit() also
        # accepts e.g. superscript digits, which made int() raise ValueError.
        if not sep or not seed_text.isdecimal():
            continue
        by_seed.setdefault(int(seed_text), []).append(entry)

    if not by_seed:
        return [], []

    found = [(seed, max(by_seed[seed])) for seed in sorted(by_seed)]
    # Only gaps below the highest seen seed are reported as missing.
    missing = [s for s in range(1, max(by_seed) + 1) if s not in by_seed]
    return found, missing
def _read_last_jsonl_line(path: Path) -> str | None:
    """Return the last non-blank line of a JSONL file, stripped of whitespace.

    The whole file is read into memory and scanned from the end. Records are
    split on the newline character only: ``str.splitlines()`` also breaks on
    characters such as U+2028, which may appear unescaped inside a JSON string
    and would cut the record in two. The file is decoded as UTF-8 regardless
    of the locale, and undecodable bytes are replaced instead of raising.

    Args:
        path: JSONL file to read.

    Returns:
        The last line that contains non-whitespace characters, or ``None``
        when the file cannot be read or has no such line.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return None
    for raw in reversed(text.split("\n")):
        line = raw.strip()
        if line:
            return line
    return None
def _count_jsonl_lines(path: Path) -> int:
    """Count the non-blank lines of a JSONL file.

    Lines are split on the newline character only, so a record containing a
    raw U+2028 (or a similar Unicode line separator) inside a JSON string is
    counted once. The file is decoded as UTF-8 regardless of the locale, and
    undecodable bytes are replaced instead of raising.

    Args:
        path: JSONL file to read.

    Returns:
        Number of lines containing non-whitespace characters; ``0`` when the
        file cannot be read.
    """
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return 0
    return sum(1 for line in text.split("\n") if line.strip())
def _count_events_by_type(path: Path) -> dict[str, int]:
    """Tally the events in an ``events.jsonl`` file by their ``"type"`` value.

    Best-effort parsing: blank lines, lines that are not valid JSON, and lines
    whose JSON value is not an object are skipped. An object with no
    ``"type"`` key, or with a non-string ``"type"``, is counted under
    ``"<unknown>"``. Lines are split on the newline character only, because
    ``str.splitlines()`` would also split on characters such as U+2028 that
    can appear raw inside JSON strings. The file is decoded as UTF-8, and
    undecodable bytes are replaced instead of raising.

    Args:
        path: JSONL file to read.

    Returns:
        Mapping of event type to number of occurrences; empty when the file
        cannot be read.
    """
    counts: dict[str, int] = {}
    try:
        text = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return counts
    for raw in text.split("\n"):
        raw = raw.strip()
        if not raw:
            continue
        try:
            obj = json.loads(raw)
        except json.JSONDecodeError:
            continue
        # Valid JSON that is not an object (e.g. a list or number) has no
        # .get(); treat it like any other malformed line.
        if not isinstance(obj, dict):
            continue
        event_type = obj.get("type", "<unknown>")
        # A non-string type could be unhashable (list/dict) and break the tally.
        if not isinstance(event_type, str):
            event_type = "<unknown>"
        counts[event_type] = counts.get(event_type, 0) + 1
    return counts
# Keys copied from the final record's "aggregate" object; each becomes an
# ``agg_<name>`` CSV column.
AGGREGATE_FIELDS = [
    "total_combats",
    "total_cities_founded",
    "total_cities_captured",
    "turn_first_combat",
    "turn_first_city_captured",
]

# Keys copied from each player's entry in "player_stats"; each becomes a
# ``p<id>_<name>`` CSV column.
PLAYER_FIELDS = [
    "pop",
    "pop_peak",
    "mil",
    "cities",
    "cities_captured",
    "cities_lost",
    "gold",
    "gold_peak",
    "gold_per_turn",
    "techs",
    "tiles",
    "buildings",
    "happiness",
    "food_total",
    "production_total",
    "kills",
    "units_lost",
    "turn_first_pop_3",
    "turn_first_pop_4",
]
def extract_row(
    seed: int, data: dict[str, Any], event_counts: dict[str, int]
) -> dict[str, Any]:
    """Flatten one game's final turn-stats record into a single CSV row.

    Args:
        seed: Seed number of the game.
        data: Parsed final line of ``turn_stats.jsonl``. The keys
            ``outcome``, ``winner_index``, ``victory_type``,
            ``wall_clock_sec``, ``aggregate``, ``player_stats`` and
            ``invariant_violations`` are indexed directly.
        event_counts: Per-type event tallies for the same game.

    Returns:
        Row dict whose keys are the columns listed by ``csv_fieldnames()``.
        Missing per-player values are written as ``""``.

    Raises:
        KeyError: If a directly indexed key is absent from *data*.
    """
    # turn-stats-line uses "turn" not "turns_played"
    turns = data["turn"] if "turn" in data else data.get("turns_played", -1)
    event_total = sum(event_counts.values())

    row: dict[str, Any] = {"seed": seed}
    row["outcome"] = data["outcome"]
    row["turns_played"] = turns
    row["winner_index"] = data["winner_index"]
    row["victory_type"] = data["victory_type"]
    row["wall_clock_sec"] = round(float(data["wall_clock_sec"]), 2)
    row["event_count"] = event_total

    for name in EVENT_TYPES:
        row["evt_" + name] = event_counts.get(name, 0)

    for name in AGGREGATE_FIELDS:
        row["agg_" + name] = data["aggregate"][name]

    per_player: dict[str, Any] = data["player_stats"]
    for pid in ("0", "1"):
        stats = per_player.get(pid, {})
        row.update((f"p{pid}_{name}", stats.get(name, "")) for name in PLAYER_FIELDS)

    row["invariant_violations"] = len(data["invariant_violations"])
    return row
def csv_fieldnames() -> list[str]:
    """Return the CSV header: the ordered list of column names.

    Order: fixed game columns, one ``evt_`` column per event type, one
    ``agg_`` column per aggregate field, the ``p0_`` then ``p1_`` player
    columns, and finally ``invariant_violations``.
    """
    game_cols = [
        "seed", "outcome", "turns_played", "winner_index",
        "victory_type", "wall_clock_sec", "event_count",
    ]
    event_cols = [f"evt_{name}" for name in EVENT_TYPES]
    aggregate_cols = [f"agg_{name}" for name in AGGREGATE_FIELDS]
    player_cols = [
        f"p{pid}_{name}" for pid in ("0", "1") for name in PLAYER_FIELDS
    ]
    return [
        *game_cols,
        *event_cols,
        *aggregate_cols,
        *player_cols,
        "invariant_violations",
    ]
# Outcome strings that a result row is allowed to carry.
VALID_OUTCOMES = {"victory", "max_turns", "defeat", "in_progress"}


def run_assertions(
    rows: list[dict[str, Any]],
    missing_seeds: list[int],
    schema_errors: dict[Path, list[str]],
) -> list[str]:
    """Run the batch-level sanity checks and collect failure messages.

    Args:
        rows: Result rows, one per successfully parsed game.
        missing_seeds: Seed numbers that have no game directory.
        schema_errors: Validation messages keyed by the offending file.

    Returns:
        Human-readable failure lines in check order; empty when every check
        passes. With no rows, the row-based checks are skipped after noting
        that there is nothing to analyze.
    """
    problems: list[str] = []

    if missing_seeds:
        problems.append(f"Missing game directories for seeds: {missing_seeds}")

    # At most five messages are shown per file, followed by an overflow note.
    for path, errs in (schema_errors or {}).items():
        problems.append(f"Schema validation failed for {path}:")
        problems.extend(f" {e}" for e in errs[:5])
        if len(errs) > 5:
            problems.append(f" ... ({len(errs) - 5} more)")

    if not rows:
        problems.append("No valid result rows to analyze.")
        return problems

    invalid_outcomes = sum(1 for r in rows if r["outcome"] not in VALID_OUTCOMES)
    if invalid_outcomes:
        problems.append(f"{invalid_outcomes} game(s) had invalid outcome values")

    violation_total = sum(r["invariant_violations"] for r in rows)
    if violation_total > 0:
        problems.append(f"Total invariant violations across games: {violation_total}")

    # Rows lacking player-0 stats carry "" here and are left out of the max.
    peaks = (r["p0_pop_peak"] for r in rows if r["p0_pop_peak"] != "")
    best_peak = max(peaks, default=0)
    if best_peak < 4:
        problems.append(
            f"No game reached p0_pop_peak >= 4 (max was {best_peak}). "
            "Growth system may be broken."
        )

    peaceful = sum(1 for r in rows if r["agg_turn_first_combat"] == -1)
    if peaceful:
        problems.append(
            f"{peaceful} game(s) never fought a single combat — "
            "AI may be pacifist or unreachable."
        )

    stalled = sum(1 for r in rows if r["turns_played"] < 1)
    if stalled:
        problems.append(
            f"{stalled} game(s) have turns_played < 1 — "
            "game may have crashed before completing a turn."
        )

    return problems
def median_int(values: list[int | float]) -> int:
    """Return the median of the numeric entries of *values*, truncated to int.

    Entries that are not ``int`` or ``float`` (such as the ``""`` placeholder
    used for missing stats) are ignored. Returns ``-1`` when no numeric entry
    remains.
    """
    numeric = [v for v in values if isinstance(v, (int, float))]
    return int(statistics.median(numeric)) if numeric else -1
def print_summary(rows: list[dict[str, Any]], out: Any = None) -> None:
    """Write a human-readable digest of the batch to *out*.

    Prints the game count, the outcome distribution with integer percentages,
    medians of selected columns (only when there are rows), per-type event
    totals (non-zero ones only), and the total invariant-violation count.

    Args:
        rows: Result rows, one per game.
        out: Writable text stream. ``None`` (the default) means the current
            ``sys.stderr``, looked up at call time so that redirecting or
            capturing stderr after import still takes effect. A default of
            ``sys.stderr`` itself would be frozen when the function is defined.
    """
    if out is None:
        out = sys.stderr

    print("=== autoplay batch report ===", file=out)
    print(f"games: {len(rows)}", file=out)

    outcome_counts: dict[str, int] = {}
    for r in rows:
        outcome_counts[r["outcome"]] = outcome_counts.get(r["outcome"], 0) + 1
    for outcome, n in sorted(outcome_counts.items()):
        # Integer percentage, rounded down; rows is non-empty inside this loop.
        pct = 100 * n // len(rows)
        print(f" {outcome}: {n} ({pct}%)", file=out)

    if rows:
        for column in (
            "turns_played",
            "p0_pop_peak",
            "p0_gold_peak",
            "agg_total_combats",
            "event_count",
        ):
            median = median_int([r[column] for r in rows])
            print(f"median {column}: {median}", file=out)

    print("event counts by type (total across all games):", file=out)
    for et in EVENT_TYPES:
        total = sum(r.get(f"evt_{et}", 0) for r in rows)
        if total > 0:
            print(f" {et}: {total}", file=out)

    total_v = sum(r["invariant_violations"] for r in rows)
    print(f"invariant violations (total): {total_v}", file=out)
def main(argv: list[str]) -> int:
    """Build the batch report: CSV on stdout, summary and failures on stderr.

    Args:
        argv: Full argument vector including the program name at index 0.
            The first positional argument is the results directory. The token
            after ``--baseline`` is consumed as that option's value so it is
            never mistaken for the results directory. The baseline itself is
            not used by this function.

    Returns:
        Process exit code: ``0`` when every game parsed and every assertion
        passed, ``1`` when no game directories were found or any schema or
        assertion failure occurred, ``2`` on a usage error (no results
        directory, ``--baseline`` without a value, or a path that is not a
        directory).
    """
    usage = (
        "usage: autoplay-report.py <results_dir> "
        "[--baseline PATH] [--update-baseline] [--deep]"
    )
    positional: list[str] = []
    flags: set[str] = set()
    args = iter(argv[1:])
    for arg in args:
        if arg == "--baseline":
            # --baseline takes a value: swallow the next token with it.
            if next(args, None) is None:
                print(usage, file=sys.stderr)
                return 2
            flags.add(arg)
        elif arg.startswith("-"):
            flags.add(arg)
        else:
            positional.append(arg)

    if not positional:
        print(usage, file=sys.stderr)
        return 2

    results_dir = Path(positional[0])
    deep = "--deep" in flags
    if not results_dir.is_dir():
        print(f"ERROR: {results_dir} is not a directory", file=sys.stderr)
        return 2

    found, missing = find_game_dirs(results_dir)
    if not found and not missing:
        print(f"ERROR: No game_*_seed*/ dirs found under {results_dir}", file=sys.stderr)
        return 1

    ts_schema = load_schema(TURN_STATS_SCHEMA_NAME)
    meta_schema = load_schema(META_SCHEMA_NAME)
    rows: list[dict[str, Any]] = []
    schema_errors: dict[Path, list[str]] = {}

    for seed, game_dir in found:
        meta_path = game_dir / "meta.json"
        turn_stats_path = game_dir / "turn_stats.jsonl"
        events_path = game_dir / "events.jsonl"

        # Validate meta.json. A problem here is recorded but does not stop
        # the game's turn stats from being processed.
        if not meta_path.exists():
            schema_errors[meta_path] = ["meta.json missing"]
        else:
            try:
                # Decode as UTF-8 explicitly; a decoding failure is reported
                # like any other unreadable meta.json instead of crashing.
                meta_data = json.loads(meta_path.read_text(encoding="utf-8"))
                meta_errs = validate(meta_data, meta_schema)
                if meta_errs:
                    schema_errors[meta_path] = meta_errs
            except (OSError, UnicodeDecodeError, json.JSONDecodeError) as e:
                schema_errors[meta_path] = [f"cannot load meta.json: {e}"]

        # Fast path: only the last line of turn_stats.jsonl is parsed.
        last_line = _read_last_jsonl_line(turn_stats_path)
        if last_line is None:
            schema_errors[turn_stats_path] = ["turn_stats.jsonl missing or empty"]
            continue
        try:
            data = json.loads(last_line)
        except json.JSONDecodeError as e:
            schema_errors[turn_stats_path] = [f"last line invalid JSON: {e}"]
            continue
        errs = validate(data, ts_schema)
        if errs:
            schema_errors[turn_stats_path] = errs
            continue

        event_counts = _count_events_by_type(events_path) if events_path.exists() else {}
        rows.append(extract_row(seed, data, event_counts))

        if deep:
            # .save files are only touched with --deep.
            for save_file in sorted(game_dir.glob("*.save")):
                print(f"[deep] {save_file.name}: {save_file.stat().st_size} bytes", file=sys.stderr)

    # CSV to stdout
    writer = csv.DictWriter(sys.stdout, fieldnames=csv_fieldnames())
    writer.writeheader()
    writer.writerows(rows)

    print_summary(rows)

    failures = run_assertions(rows, missing, schema_errors)
    if failures:
        print("\n=== FAILURES ===", file=sys.stderr)
        for f in failures:
            print(f" {f}", file=sys.stderr)
        return 1

    if "--update-baseline" in flags:
        print("--update-baseline: not yet implemented (Phase 3b)", file=sys.stderr)
    print("\nAll assertions passed.", file=sys.stderr)
    return 0


if __name__ == "__main__":
    sys.exit(main(sys.argv))