magicciv/tools/autoplay-report.py
Natalie 2607491a1b feat(@projects/@magic-civilization): add weather/climate event tracking
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-04-17 23:40:04 -07:00

833 lines
31 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Aggregate auto_play batch results into a CSV + summary + assertions.
Reads all game_<stamp>_seed<N>/ directories under <results_dir>.
Pulls the last line of turn_stats.jsonl as the final game state (fast path).
Counts events from events.jsonl.
Optionally reads .save files with --deep.
Usage:
tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]
Exits:
0 all games parsed, validated, and assertions passed
1 schema validation failure OR assertion failure OR missing results
2 usage error
stdlib only — no pip installs.
"""
from __future__ import annotations
import csv
import json
import statistics
import sys
from pathlib import Path
from typing import Any
import importlib.util as _iu
_validate_path = Path(__file__).resolve().parent / "autoplay-validate.py"
_spec = _iu.spec_from_file_location("autoplay_validate", _validate_path)
if _spec is None or _spec.loader is None:
raise ImportError(f"cannot load {_validate_path}")
_mod = _iu.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
load_schema = _mod.load_schema
validate = _mod.validate
TURN_STATS_SCHEMA_NAME = "turn-stats-line"
EVENTS_SCHEMA_NAME = "events-line"
META_SCHEMA_NAME = "meta"
EVENT_TYPES = [
"city_founded", "city_captured", "city_grew", "city_starved",
"tech_researched", "unit_created", "unit_destroyed", "combat_resolved", "victory",
"weather_event", "climate_effect",
]
def find_game_dirs(results_dir: Path) -> tuple[list[tuple[int, Path]], list[int]]:
"""Find game_<stamp>_seed<N>/ directories. Returns (found, missing_seeds).
For each seed number, picks the most recent directory (lexicographic max on stamp).
"""
by_seed: dict[int, list[Path]] = {}
for d in results_dir.iterdir():
if not d.is_dir():
continue
name = d.name
if not name.startswith("game_"):
continue
# Expected: game_<stamp>_seed<N>
parts = name.rsplit("_seed", 1)
if len(parts) != 2 or not parts[1].isdigit():
continue
seed = int(parts[1])
by_seed.setdefault(seed, []).append(d)
found: list[tuple[int, Path]] = []
for seed in sorted(by_seed):
dirs = sorted(by_seed[seed])
found.append((seed, dirs[-1]))
# We report missing only if there are gaps in the seed sequence
if not found:
return [], []
max_seed = max(s for s, _ in found)
present = {s for s, _ in found}
missing = [s for s in range(1, max_seed + 1) if s not in present]
return found, missing
def _read_last_jsonl_line(path: Path) -> str | None:
"""Read the last non-empty line of a JSONL file efficiently."""
try:
text = path.read_text()
except OSError:
return None
for line in reversed(text.splitlines()):
line = line.strip()
if line:
return line
return None
def _count_jsonl_lines(path: Path) -> int:
"""Count non-empty lines in a JSONL file."""
try:
text = path.read_text()
except OSError:
return 0
return sum(1 for l in text.splitlines() if l.strip())
def _count_events_by_type(path: Path) -> dict[str, int]:
"""Read events.jsonl, count occurrences per event type."""
counts: dict[str, int] = {}
try:
text = path.read_text()
except OSError:
return counts
for raw in text.splitlines():
raw = raw.strip()
if not raw:
continue
try:
obj = json.loads(raw)
except json.JSONDecodeError:
continue
t = obj.get("type", "<unknown>")
counts[t] = counts.get(t, 0) + 1
return counts
def _load_player_clans(meta_path: Path) -> dict[str, str]:
"""Read meta.json and return the {player_index_str: clan_id} mapping.
Returns an empty dict when meta.json is missing, unparseable, lacks the
field, or records no clan entries. Older batches (pre-`player_clans`)
hit this path cleanly — callers must tolerate missing data.
"""
try:
data = json.loads(meta_path.read_text())
except (OSError, json.JSONDecodeError):
return {}
raw = data.get("player_clans", {})
if not isinstance(raw, dict):
return {}
return {str(k): str(v) for k, v in raw.items() if v}
def _stats_at_turn(
turn_stats_path: Path, target_turns: tuple[int, ...]
) -> dict[int, dict[str, Any]]:
"""Return {turn: turn_stats_line} for each target turn present in the file.
Uses the last snapshot at-or-before each target (so if the game ended at
T216 we still get a T200 reading, and T300 falls through to the final line
only when the game ran to T300). Missing turns are simply absent from the
result — callers treat absence as "no data for this snapshot".
"""
if not turn_stats_path.exists():
return {}
try:
text = turn_stats_path.read_text()
except OSError:
return {}
# Walk forward, tracking the most-recent line per bucket.
by_turn: dict[int, dict[str, Any]] = {}
remaining = sorted(target_turns)
# Parse once, keep only what we need (turn + player_stats) to bound memory.
for raw in text.splitlines():
raw = raw.strip()
if not raw:
continue
try:
line = json.loads(raw)
except json.JSONDecodeError:
continue
t = int(line.get("turn", -1))
if t < 0:
continue
for target in remaining:
if t <= target:
by_turn[target] = line
return by_turn
KNOWN_CLAN_IDS = ["blackhammer", "deepforge", "goldvein", "ironhold", "runesmith"]
# Per-clan table samples per-player stats at these turn marks.
PER_CLAN_SNAPSHOT_TURNS = (100, 200, 300)
AGGREGATE_FIELDS = [
"total_combats",
"total_cities_founded",
"total_cities_captured",
"turn_first_combat",
"turn_first_city_captured",
]
PLAYER_FIELDS = [
"pop", "pop_peak", "mil",
"cities", "cities_captured", "cities_lost",
"gold", "gold_peak", "gold_per_turn",
"techs", "tiles", "buildings",
"happiness",
"food_total", "production_total",
"kills", "units_lost",
"turn_first_pop_3", "turn_first_pop_4",
"tier_peak", "peak_unit_tier", "wonder_count",
]
# Sentinel the reporter emits when a quality-metric field is absent from
# the incoming jsonl (i.e. the batch pre-dates p0-25 instrumentation). Keeps
# the CSV/stdout pipeline non-crashing when ingesting historical data.
QUALITY_METRIC_ABSENT = -1
QUALITY_METRIC_FIELDS = ("tier_peak", "peak_unit_tier", "wonder_count")
def extract_row(
seed: int,
data: dict[str, Any],
event_counts: dict[str, int],
player_clans: dict[str, str] | None = None,
snapshots: dict[int, dict[str, Any]] | None = None,
) -> dict[str, Any]:
# turn-stats-line uses "turn" not "turns_played"
turn = data.get("turn", data.get("turns_played", -1))
total_events = sum(event_counts.values())
row: dict[str, Any] = {
"seed": seed,
"outcome": data["outcome"],
"turns_played": turn,
"winner_index": data["winner_index"],
"winner_personality": data.get("winner_personality", ""),
"victory_type": data["victory_type"],
"wall_clock_sec": round(float(data["wall_clock_sec"]), 2),
"event_count": total_events,
}
for et in EVENT_TYPES:
row[f"evt_{et}"] = event_counts.get(et, 0)
for f in AGGREGATE_FIELDS:
row[f"agg_{f}"] = data["aggregate"][f]
player_stats: dict[str, Any] = data["player_stats"]
for pid in ("0", "1"):
pstat = player_stats.get(pid, {})
for f in PLAYER_FIELDS:
if f in QUALITY_METRIC_FIELDS:
# Backward compat: pre-p0-25 batches omit these fields. Use
# the absent sentinel so downstream medians filter them out
# rather than blend 0 into the distribution.
row[f"p{pid}_{f}"] = pstat.get(f, QUALITY_METRIC_ABSENT)
else:
row[f"p{pid}_{f}"] = pstat.get(f, "")
row["invariant_violations"] = len(data["invariant_violations"])
# Out-of-band fields for the per-clan aggregator. Not emitted to CSV.
row["_player_clans"] = dict(player_clans) if player_clans else {}
row["_snapshots"] = snapshots or {}
# Canopy telemetry (p0-35). Absent on pre-p0-35 batches — surfaced as
# None so the summary pipeline can skip those rows cleanly.
ecology = data.get("ecology") or {}
row["_canopy_mean_final"] = ecology.get("flora_canopy_mean")
row["_canopy_delta_final"] = ecology.get("flora_canopy_delta")
agg = data.get("aggregate", {})
row["_total_weather_events"] = agg.get("total_weather_events")
return row
def csv_fieldnames() -> list[str]:
fields = [
"seed", "outcome", "turns_played", "winner_index", "winner_personality",
"victory_type", "wall_clock_sec", "event_count",
]
fields += [f"evt_{et}" for et in EVENT_TYPES]
fields += [f"agg_{f}" for f in AGGREGATE_FIELDS]
for pid in ("0", "1"):
fields += [f"p{pid}_{f}" for f in PLAYER_FIELDS]
fields.append("invariant_violations")
return fields
VALID_OUTCOMES = {"victory", "max_turns", "defeat", "in_progress"}
def run_assertions(
rows: list[dict[str, Any]],
missing_seeds: list[int],
schema_errors: dict[Path, list[str]],
) -> list[str]:
failures: list[str] = []
if missing_seeds:
failures.append(f"Missing game directories for seeds: {missing_seeds}")
if schema_errors:
for path, errs in schema_errors.items():
failures.append(f"Schema validation failed for {path}:")
for e in errs[:5]:
failures.append(f" {e}")
if len(errs) > 5:
failures.append(f" ... ({len(errs) - 5} more)")
if not rows:
failures.append("No valid result rows to analyze.")
return failures
bad_outcomes = [r for r in rows if r["outcome"] not in VALID_OUTCOMES]
if bad_outcomes:
failures.append(f"{len(bad_outcomes)} game(s) had invalid outcome values")
total_violations = sum(r["invariant_violations"] for r in rows)
if total_violations > 0:
failures.append(f"Total invariant violations across games: {total_violations}")
max_p0_pop = max((r["p0_pop_peak"] for r in rows if r["p0_pop_peak"] != ""), default=0)
if max_p0_pop < 4:
failures.append(
f"No game reached p0_pop_peak >= 4 (max was {max_p0_pop}). "
"Growth system may be broken."
)
never_combat = [r for r in rows if r["agg_turn_first_combat"] == -1]
if never_combat:
failures.append(
f"{len(never_combat)} game(s) never fought a single combat — "
"AI may be pacifist or unreachable."
)
no_turns = [r for r in rows if r["turns_played"] < 1]
if no_turns:
failures.append(
f"{len(no_turns)} game(s) have turns_played < 1 — "
"game may have crashed before completing a turn."
)
return failures
def median_int(values: list[int | float]) -> int:
filtered = [v for v in values if isinstance(v, (int, float))]
if not filtered:
return -1
return int(statistics.median(filtered))
def build_personality_win_table(rows: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
"""Return per-clan stats: clan_id → {wins, appearances, losses}.
appearances = number of games in which this clan was the AI opponent.
wins = games won by this clan.
losses = appearances - wins.
Uses winner_personality from each row's final turn-stats line.
Human player (empty clan_id / winner_index 0 with no personality) is
bucketed under the empty string key and excluded from the balance check.
"""
stats: dict[str, dict[str, int]] = {}
for row in rows:
personality = row.get("winner_personality", "")
# Count only AI opponents (non-empty clan)
if not personality:
continue
if personality not in stats:
stats[personality] = {"wins": 0, "appearances": 0}
if row["outcome"] == "victory":
stats[personality]["wins"] += 1
stats[personality]["appearances"] += 1
for clan_stats in stats.values():
clan_stats["losses"] = clan_stats["appearances"] - clan_stats["wins"]
return stats
def _safe_median(values: list[Any]) -> float | None:
nums = [float(v) for v in values if isinstance(v, (int, float))]
return statistics.median(nums) if nums else None
def build_per_clan_stats(rows: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
"""Aggregate per-clan metrics across a batch.
Data sources, in priority order:
- `_player_clans` from meta.json → authoritative: every AI appearance is
counted regardless of whether that clan won the game.
- `winner_personality` fallback when `_player_clans` is empty for a row —
credits the winning clan with an appearance + a win. Non-winning clans
remain unseen under this fallback (older batches undercount apps).
For each clan we record:
appearances, wins, win_rate, losses,
median_turns_to_victory (only games this clan won),
median_pop_peak, median_unit_count_{100,200,300}, median_gold_{100,200,300},
median_building_count
All "median_*" fields are computed across the clan's appearances; `None`
when no sample exists. Unit/gold snapshots pull from the per-turn lines
recorded in `_snapshots`. Building count uses each game's final line.
Every clan in KNOWN_CLAN_IDS appears in the result, even with zero data.
"""
acc: dict[str, dict[str, list[Any]]] = {
cid: {
"appearances": [],
"wins": [],
"turns_to_victory": [],
"pop_peak": [],
"buildings": [],
"unit_100": [], "unit_200": [], "unit_300": [],
"gold_100": [], "gold_200": [], "gold_300": [],
}
for cid in KNOWN_CLAN_IDS
}
def _ensure(clan: str) -> None:
if clan not in acc:
acc[clan] = {
"appearances": [],
"wins": [],
"turns_to_victory": [],
"pop_peak": [],
"buildings": [],
"unit_100": [], "unit_200": [], "unit_300": [],
"gold_100": [], "gold_200": [], "gold_300": [],
}
for row in rows:
clans_in_row: dict[str, str] = row.get("_player_clans") or {}
winner_idx = row.get("winner_index", -1)
winner_clan = row.get("winner_personality", "")
outcome = row.get("outcome", "")
snapshots: dict[int, dict[str, Any]] = row.get("_snapshots") or {}
if clans_in_row:
# Authoritative path: one record per AI player in this game.
iter_items = clans_in_row.items()
elif winner_clan:
# Fallback: we only know about the winner. Synthesize a single
# record keyed at the winner's player index so per-player stats
# pull from the same slot in the final turn line.
iter_items = [(str(int(winner_idx)), winner_clan)]
else:
continue
for pid_str, clan in iter_items:
if not clan:
continue
_ensure(clan)
bucket = acc[clan]
bucket["appearances"].append(1)
is_winner = (
outcome == "victory"
and str(int(winner_idx)) == str(pid_str)
)
if is_winner:
bucket["wins"].append(1)
ttv = row.get("turns_played", -1)
if isinstance(ttv, (int, float)) and ttv >= 0:
bucket["turns_to_victory"].append(float(ttv))
# Per-player stats from the final turn line (already on row).
pop_peak = row.get(f"p{pid_str}_pop_peak")
buildings = row.get(f"p{pid_str}_buildings")
if isinstance(pop_peak, (int, float)):
bucket["pop_peak"].append(float(pop_peak))
if isinstance(buildings, (int, float)):
bucket["buildings"].append(float(buildings))
# Per-turn snapshots for T100/T200/T300.
for t in PER_CLAN_SNAPSHOT_TURNS:
snap = snapshots.get(t)
if not snap:
continue
pstat = snap.get("player_stats", {}).get(str(pid_str), {})
mil = pstat.get("mil")
gold = pstat.get("gold")
if isinstance(mil, (int, float)):
bucket[f"unit_{t}"].append(float(mil))
if isinstance(gold, (int, float)):
bucket[f"gold_{t}"].append(float(gold))
out: dict[str, dict[str, Any]] = {}
for clan, samples in acc.items():
apps = len(samples["appearances"])
wins = len(samples["wins"])
out[clan] = {
"appearances": apps,
"wins": wins,
"losses": apps - wins,
"win_rate": (wins / apps) if apps else None,
"median_turns_to_victory": _safe_median(samples["turns_to_victory"]),
"median_pop_peak": _safe_median(samples["pop_peak"]),
"median_building_count": _safe_median(samples["buildings"]),
"median_unit_count_100": _safe_median(samples["unit_100"]),
"median_unit_count_200": _safe_median(samples["unit_200"]),
"median_unit_count_300": _safe_median(samples["unit_300"]),
"median_gold_100": _safe_median(samples["gold_100"]),
"median_gold_200": _safe_median(samples["gold_200"]),
"median_gold_300": _safe_median(samples["gold_300"]),
}
return out
def _fmt(v: Any) -> str:
if v is None:
return ""
if isinstance(v, float):
return f"{v:.0f}" if v.is_integer() or v >= 10 else f"{v:.1f}"
return str(v)
def render_per_clan_table(
rows: list[dict[str, Any]], out: Any = sys.stderr
) -> dict[str, dict[str, Any]]:
"""Print the per-clan metrics table and return the underlying dict."""
stats = build_per_clan_stats(rows)
print("per-clan stats:", file=out)
header = (
f" {'clan':<12} "
f"{'apps':>5} {'wins':>5} {'win%':>6} "
f"{'ttv':>5} {'pop':>5} {'bldg':>5} "
f"{'u100':>5} {'u200':>5} {'u300':>5} "
f"{'g100':>6} {'g200':>6} {'g300':>6}"
)
print(header, file=out)
any_data = False
for clan in sorted(stats):
s = stats[clan]
if s["appearances"] == 0 and clan not in KNOWN_CLAN_IDS:
continue
if s["appearances"] > 0:
any_data = True
wr = s["win_rate"]
wr_str = f"{int(round(wr * 100))}%" if wr is not None else ""
print(
f" {clan:<12} "
f"{s['appearances']:>5} {s['wins']:>5} {wr_str:>6} "
f"{_fmt(s['median_turns_to_victory']):>5} "
f"{_fmt(s['median_pop_peak']):>5} "
f"{_fmt(s['median_building_count']):>5} "
f"{_fmt(s['median_unit_count_100']):>5} "
f"{_fmt(s['median_unit_count_200']):>5} "
f"{_fmt(s['median_unit_count_300']):>5} "
f"{_fmt(s['median_gold_100']):>6} "
f"{_fmt(s['median_gold_200']):>6} "
f"{_fmt(s['median_gold_300']):>6}",
file=out,
)
if not any_data:
print(" (no clan data — meta.player_clans empty, no winning AIs)", file=out)
return stats
def print_personality_summary(
rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
table = build_personality_win_table(rows)
if not table:
print("personality win-rate: no data (winner_personality missing from results)", file=out)
return
total_games = len(rows)
print("personality win-rate:", file=out)
print(f" {'clan':<14} {'wins':>5} {'apps':>5} {'win%':>6}", file=out)
for clan in sorted(table):
entry = table[clan]
apps = entry["appearances"]
wins = entry["wins"]
pct = 100 * wins // apps if apps else 0
flag = " <-- IMBALANCED (>50%)" if pct > 50 else ""
print(f" {clan:<14} {wins:>5} {apps:>5} {pct:>5}%{flag}", file=out)
def build_quality_metrics(rows: list[dict[str, Any]]) -> dict[str, float | None]:
"""Aggregate state-at-end quality metrics across a batch (p0-25).
Returns medians for:
median_winner_tier_peak, median_loser_tier_peak, median_tier_peak_gap,
median_peak_unit_tier (all players), median_wonder_count_per_player.
All values are `None` when no sample exists (e.g. all rows are pre-p0-25
batches where fields are absent — sentinel `QUALITY_METRIC_ABSENT` is
filtered out). The `_gap` is computed per-game (winner_tier - loser_tier
for that specific game) then the batch median taken, so it reflects typical
within-game spread rather than the delta of independent medians.
"""
winner_tiers: list[float] = []
loser_tiers: list[float] = []
per_game_gaps: list[float] = []
unit_tiers: list[float] = []
wonder_counts: list[float] = []
for row in rows:
winner_idx = row.get("winner_index", -1)
p0_tp = row.get("p0_tier_peak", QUALITY_METRIC_ABSENT)
p1_tp = row.get("p1_tier_peak", QUALITY_METRIC_ABSENT)
p0_ok = isinstance(p0_tp, (int, float)) and p0_tp != QUALITY_METRIC_ABSENT
p1_ok = isinstance(p1_tp, (int, float)) and p1_tp != QUALITY_METRIC_ABSENT
# Winner/loser tier_peak + per-game gap (only when both sides recorded).
if winner_idx in (0, 1) and p0_ok and p1_ok:
if winner_idx == 0:
winner_tiers.append(float(p0_tp))
loser_tiers.append(float(p1_tp))
per_game_gaps.append(float(p0_tp) - float(p1_tp))
else:
winner_tiers.append(float(p1_tp))
loser_tiers.append(float(p0_tp))
per_game_gaps.append(float(p1_tp) - float(p0_tp))
for pid in ("0", "1"):
put = row.get(f"p{pid}_peak_unit_tier", QUALITY_METRIC_ABSENT)
if isinstance(put, (int, float)) and put != QUALITY_METRIC_ABSENT:
unit_tiers.append(float(put))
wc = row.get(f"p{pid}_wonder_count", QUALITY_METRIC_ABSENT)
if isinstance(wc, (int, float)) and wc != QUALITY_METRIC_ABSENT:
wonder_counts.append(float(wc))
return {
"median_winner_tier_peak": _safe_median(winner_tiers),
"median_loser_tier_peak": _safe_median(loser_tiers),
"median_tier_peak_gap": _safe_median(per_game_gaps),
"median_peak_unit_tier": _safe_median(unit_tiers),
"median_wonder_count_per_player": _safe_median(wonder_counts),
}
def print_quality_metrics(
rows: list[dict[str, Any]], out: Any = sys.stderr
) -> dict[str, float | None]:
"""Print the state-at-end quality metrics block and return the dict."""
q = build_quality_metrics(rows)
print("state-at-end quality metrics (p0-25):", file=out)
any_data = any(v is not None for v in q.values())
if not any_data:
print(" (no data — batch pre-dates p0-25 instrumentation)", file=out)
return q
print(f" median_winner_tier_peak: {_fmt(q['median_winner_tier_peak'])}", file=out)
print(f" median_loser_tier_peak: {_fmt(q['median_loser_tier_peak'])}", file=out)
print(f" median_tier_peak_gap: {_fmt(q['median_tier_peak_gap'])}", file=out)
print(f" median_peak_unit_tier: {_fmt(q['median_peak_unit_tier'])}", file=out)
print(f" median_wonder_count_per_player: {_fmt(q['median_wonder_count_per_player'])}", file=out)
return q
def print_canopy_summary(
rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
"""Report canopy trend across the batch (p0-35).
Sources each game's final `ecology.flora_canopy_mean` / `flora_canopy_delta`
from the last turn_stats line. Historical batches pre-p0-35 have None for
these fields — we skip those rows rather than mixing 0.0 into the median.
"""
means = [r["_canopy_mean_final"] for r in rows
if isinstance(r.get("_canopy_mean_final"), (int, float))]
deltas = [r["_canopy_delta_final"] for r in rows
if isinstance(r.get("_canopy_delta_final"), (int, float))]
print("flora canopy (p0-35):", file=out)
if not means:
print(" (no data — batch pre-dates p0-35 instrumentation)", file=out)
return
print(f" median final canopy mean: {_fmt(statistics.median(means))}", file=out)
nonzero_deltas = [d for d in deltas if d != 0.0]
print(
f" games with evolving canopy: {len(nonzero_deltas)} / {len(rows)} "
f"(non-zero final delta)", file=out
)
if deltas:
print(f" median final |delta|: {_fmt(statistics.median([abs(d) for d in deltas]))}", file=out)
def print_weather_summary(
rows: list[dict[str, Any]], out: Any = sys.stderr
) -> None:
"""Report weather-event coverage across the batch (p0-36)."""
totals = [r["_total_weather_events"] for r in rows
if isinstance(r.get("_total_weather_events"), (int, float))]
evt_counts = [r.get("evt_weather_event", 0) for r in rows]
print("weather events (p0-36):", file=out)
if not totals:
print(" (no data — batch pre-dates p0-36 instrumentation)", file=out)
return
games_with_any = sum(1 for t in totals if t > 0)
print(
f" games with >=1 weather_event: {games_with_any} / {len(rows)}",
file=out
)
if totals:
print(f" median total_weather_events: {_fmt(statistics.median(totals))}", file=out)
total_evts = sum(evt_counts)
print(f" total weather_event records (events.jsonl): {total_evts}", file=out)
def print_summary(rows: list[dict[str, Any]], out: Any = sys.stderr) -> None:
print("=== autoplay batch report ===", file=out)
print(f"games: {len(rows)}", file=out)
counts: dict[str, int] = {}
for r in rows:
counts[r["outcome"]] = counts.get(r["outcome"], 0) + 1
for k, v in sorted(counts.items()):
pct = 100 * v // len(rows) if rows else 0
print(f" {k}: {v} ({pct}%)", file=out)
if rows:
print(
f"median turns_played: {median_int([r['turns_played'] for r in rows])}",
file=out,
)
print(
f"median p0_pop_peak: {median_int([r['p0_pop_peak'] for r in rows])}",
file=out,
)
print(
f"median p0_gold_peak: {median_int([r['p0_gold_peak'] for r in rows])}",
file=out,
)
print(
f"median agg_total_combats: {median_int([r['agg_total_combats'] for r in rows])}",
file=out,
)
print(
f"median event_count: {median_int([r['event_count'] for r in rows])}",
file=out,
)
print("event counts by type (total across all games):", file=out)
for et in EVENT_TYPES:
total = sum(r.get(f"evt_{et}", 0) for r in rows)
if total > 0:
print(f" {et}: {total}", file=out)
total_v = sum(r["invariant_violations"] for r in rows)
print(f"invariant violations (total): {total_v}", file=out)
print_quality_metrics(rows, out=out)
print_canopy_summary(rows, out=out)
print_weather_summary(rows, out=out)
print_personality_summary(rows, out=out)
render_per_clan_table(rows, out=out)
def main(argv: list[str]) -> int:
positional: list[str] = []
flags: set[str] = set()
i = 1
while i < len(argv):
a = argv[i]
if a.startswith("-"):
flags.add(a)
else:
positional.append(a)
i += 1
if not positional:
print(
"usage: autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]",
file=sys.stderr,
)
return 2
results_dir = Path(positional[0])
deep = "--deep" in flags
if not results_dir.is_dir():
print(f"ERROR: {results_dir} is not a directory", file=sys.stderr)
return 2
found, missing = find_game_dirs(results_dir)
if not found and not missing:
print(f"ERROR: No game_*_seed*/ dirs found under {results_dir}", file=sys.stderr)
return 1
ts_schema = load_schema(TURN_STATS_SCHEMA_NAME)
meta_schema = load_schema(META_SCHEMA_NAME)
rows: list[dict[str, Any]] = []
schema_errors: dict[Path, list[str]] = {}
for seed, game_dir in found:
meta_path = game_dir / "meta.json"
turn_stats_path = game_dir / "turn_stats.jsonl"
events_path = game_dir / "events.jsonl"
# Validate meta.json
if not meta_path.exists():
schema_errors[meta_path] = ["meta.json missing"]
else:
try:
meta_data = json.loads(meta_path.read_text())
meta_errs = validate(meta_data, meta_schema)
if meta_errs:
schema_errors[meta_path] = meta_errs
except (OSError, json.JSONDecodeError) as e:
schema_errors[meta_path] = [f"cannot load meta.json: {e}"]
# Fast path: read only the last line of turn_stats.jsonl
last_line = _read_last_jsonl_line(turn_stats_path)
if last_line is None:
schema_errors[turn_stats_path] = ["turn_stats.jsonl missing or empty"]
continue
try:
data = json.loads(last_line)
except json.JSONDecodeError as e:
schema_errors[turn_stats_path] = [f"last line invalid JSON: {e}"]
continue
errs = validate(data, ts_schema)
if errs:
schema_errors[turn_stats_path] = errs
continue
event_counts = _count_events_by_type(events_path) if events_path.exists() else {}
player_clans = _load_player_clans(meta_path) if meta_path.exists() else {}
snapshots = _stats_at_turn(turn_stats_path, PER_CLAN_SNAPSHOT_TURNS)
rows.append(
extract_row(seed, data, event_counts, player_clans, snapshots)
)
if deep:
# Read .save files only with --deep
for save_file in sorted(game_dir.glob("*.save")):
print(f"[deep] {save_file.name}: {save_file.stat().st_size} bytes", file=sys.stderr)
# CSV to stdout — ignore out-of-band `_*` fields used by the per-clan
# aggregator so the CSV schema stays stable.
writer = csv.DictWriter(
sys.stdout, fieldnames=csv_fieldnames(), extrasaction="ignore"
)
writer.writeheader()
for r in rows:
writer.writerow(r)
print_summary(rows)
failures = run_assertions(rows, missing, schema_errors)
if failures:
print("\n=== FAILURES ===", file=sys.stderr)
for f in failures:
print(f" {f}", file=sys.stderr)
return 1
if "--update-baseline" in flags:
print("--update-baseline: not yet implemented (Phase 3b)", file=sys.stderr)
print("\nAll assertions passed.", file=sys.stderr)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv))