magicciv/tools/checklist-report.py

560 lines
23 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
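"""Full 4X checklist verification for a 3-seed autoplay batch.

Reads a batch dir from tools/autoplay-batch.sh and emits a markdown table
of metric | value | target | PASS/FAIL against the STOP-criterion thresholds.

Usage:
    tools/checklist-report.py [--difficulty easy|normal|hard|insane] <batch_dir>
        emits the markdown checklist on stdout; exits 0 when every row
        passes, 1 otherwise.
    tools/checklist-report.py personality_win_balance <batch_dir>
    tools/checklist-report.py matchup_balance <grid_dir>
    tools/checklist-report.py ultimate_stress <batch_dir>
        each emits a JSON verdict on stdout; exits 0 on pass, 1 on fail.

Usage errors and non-directory arguments exit with status 2.
"""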
from __future__ import annotations
import json, statistics, sys
from pathlib import Path
from typing import Any
# Clan ids that every balance verdict reports on, even when a clan never
# shows up in the batch (such clans are listed under "missing_clans").
KNOWN_CLAN_IDS = ["blackhammer", "deepforge", "goldvein", "ironhold", "runesmith"]
# Appearance count at which a clan with zero wins fails the balance gates;
# matchup_balance additionally requires every clan to reach this count.
MIN_APPEARANCES_FOR_NO_WIN_CHECK = 5
# Per-difficulty thresholds. main() unpacks each tuple positionally as
# (pop_min, vic_lo, vic_hi, ttv_lo, ttv_hi, combats_min):
#   pop_min      minimum median pop_peak
#   vic_lo/hi    inclusive victory-percentage band
#   ttv_lo/hi    inclusive band for the median turn count of won games
#   combats_min  minimum median combat count
THRESHOLDS = {
    # pop_min, vic_lo, vic_hi, ttv_lo, ttv_hi, combats_min
    "easy": (10, 20, 60, 300, 9999, 50),
    "normal": (20, 40, 70, 200, 350, 120),
    "hard": (30, 50, 80, 150, 250, 200),
    "insane": (35, 60, 90, 100, 200, 300),
}
def _jsonl(p: Path) -> list[dict]:
    """Load a JSON-lines file as a list of JSON objects.

    Args:
        p: Path of the ``.jsonl`` file. A missing file is not an error.

    Returns:
        One dict per line that parses to a JSON object, in file order.
        Returns ``[]`` when the file does not exist.

    Parsing is deliberately best-effort: blank lines and lines that are not
    valid JSON are skipped. Lines that are valid JSON but not an object
    (``42``, ``null``, ``[...]``) are skipped as well, because every caller
    immediately calls ``.get`` on the returned records.
    """
    if not p.exists():
        return []
    records: list[dict] = []
    # Decode explicitly as UTF-8 and replace undecodable bytes so a single
    # corrupt byte cannot abort the whole report.
    text = p.read_text(encoding="utf-8", errors="replace")
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            # Best-effort: ignore unparsable lines rather than fail the run.
            continue
        if isinstance(record, dict):
            records.append(record)
    return records
def _collect(gd: Path) -> dict:
    """Summarise one game directory into a flat metrics dict.

    Reads ``turn_stats.jsonl``, ``events.jsonl``, ``game.log`` and
    ``meta.json`` from ``gd``. Every file is optional; missing or unreadable
    inputs degrade to zero/empty values instead of raising.

    Args:
        gd: Directory of a single game run.

    Returns:
        Dict with the keys ``turns``, ``outcome``, ``winner_personality``,
        ``winner_index``, ``player_clans``, ``pop_peak``, ``p0_tiles``,
        ``p0_techs``, ``combats``, ``happy_distinct``, ``imp_events``,
        ``loot_events``, ``gate_events``, ``both_p100``, ``invariants`` and
        ``script_errors``. Note the final turn is stored under ``turns``.
    """
    stats = _jsonl(gd / "turn_stats.jsonl")
    events = _jsonl(gd / "events.jsonl")
    # The last turn record carries the end-of-game summary fields.
    final = stats[-1] if stats else {}
    agg, pstats = final.get("aggregate", {}), final.get("player_stats", {})

    # Event counts keyed by event type.
    ev: dict[str, int] = {}
    for e in events:
        kind = e.get("type", "")
        ev[kind] = ev.get(kind, 0) + 1

    # Number of distinct happiness values seen over the game, taking the
    # larger of the two counts for players "0" and "1".
    happy_distinct = max(
        len({s["player_stats"].get(pid, {}).get("happiness", 0) for s in stats if "player_stats" in s})
        for pid in ("0", "1")
    ) if stats else 0

    # Did each of players 0/1 reach pop >= 5 and mil >= 4 on some turn <= 100?
    p0_ok = p1_ok = False
    for s in stats:
        if s.get("turn", 0) > 100:
            break
        p0 = s.get("player_stats", {}).get("0", {})
        p1 = s.get("player_stats", {}).get("1", {})
        if p0.get("pop", 0) >= 5 and p0.get("mil", 0) >= 4:
            p0_ok = True
        if p1.get("pop", 0) >= 5 and p1.get("mil", 0) >= 4:
            p1_ok = True

    inv = sum(len(s.get("invariant_violations", [])) for s in stats)

    # Count log lines mentioning "SCRIPT ERROR". Decode leniently: the log is
    # only scanned for an ASCII marker, so a stray undecodable byte must not
    # abort the report.
    log = gd / "game.log"
    errs = 0
    if log.exists():
        log_text = log.read_text(encoding="utf-8", errors="replace")
        errs = sum(1 for ln in log_text.splitlines() if "SCRIPT ERROR" in ln)

    player_clans: dict[str, str] = {}
    meta_path = gd / "meta.json"
    if meta_path.exists():
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; an unreadable meta.json means "no clan data".
            meta = None
        raw = meta.get("player_clans", {}) if isinstance(meta, dict) else None
        if isinstance(raw, dict):
            # Drop slots whose clan id is empty/falsy.
            player_clans = {str(k): str(v) for k, v in raw.items() if v}

    # Defensive fallback for legacy matchup-grid runs (pre per-slot pinning):
    # if a player slot has no clan id, derive it from the directory layout
    # `<root>/<clan_a>_vs_<clan_b>/as_<clan_X>/game_*`. The pinned clan was
    # historically placed on slot 1, the other on slot 0.
    parent = gd.parent
    pair_root = parent.parent
    if parent.name.startswith("as_") and "_vs_" in pair_root.name:
        pinned_clan = parent.name[len("as_"):]
        pair_clans = pair_root.name.split("_vs_")
        if len(pair_clans) == 2 and pinned_clan in pair_clans:
            other_clan = pair_clans[0] if pair_clans[1] == pinned_clan else pair_clans[1]
            # Only fill slots that meta.json did not already provide.
            if "0" not in player_clans:
                player_clans["0"] = other_clan
            if "1" not in player_clans:
                player_clans["1"] = pinned_clan

    return {
        "turns": final.get("turn", 0), "outcome": final.get("outcome", "?"),
        "winner_personality": final.get("winner_personality", ""),
        "winner_index": final.get("winner_index", -1),
        "player_clans": player_clans,
        "pop_peak": max(pstats.get("0", {}).get("pop_peak", 0), pstats.get("1", {}).get("pop_peak", 0)),
        "p0_tiles": pstats.get("0", {}).get("tiles", 0),
        "p0_techs": pstats.get("0", {}).get("techs", 0),
        "combats": agg.get("total_combats", 0),
        "happy_distinct": happy_distinct,
        "imp_events": ev.get("improvement_built", 0),
        "loot_events": ev.get("loot_dropped", 0),
        "gate_events": ev.get("resource_gate_rejected", 0),
        "both_p100": p0_ok and p1_ok, "invariants": inv, "script_errors": errs,
    }
# Maximum share of its games (in percent) that a single clan may win before
# the balance checks flag it.
WIN_RATE_BALANCE_THRESHOLD = 50


def personality_win_balance(results: list[tuple[int, dict]]) -> tuple[bool, str]:
    """Return ``(balanced, detail_string)`` for the markdown checklist row.

    Clans are identified by each result's ``winner_personality``; rows with
    an empty value are ignored. A row counts as an appearance for that clan
    and as a win when its ``outcome`` is ``"victory"``.

    Args:
        results: ``(seed, metrics)`` pairs; only the metrics dict is read.

    Returns:
        ``balanced`` is True when no clan wins strictly more than
        ``WIN_RATE_BALANCE_THRESHOLD`` percent of its appearances.
        ``detail_string`` is ``"no data"`` when no row names a clan,
        otherwise space-separated ``clan:wins/apps(pct%)`` entries sorted by
        clan id, with ``pct`` rounded down to a whole percent.
    """
    tally: dict[str, dict[str, int]] = {}
    for _, r in results:
        clan = r.get("winner_personality", "")
        if not clan:
            continue
        entry = tally.setdefault(clan, {"wins": 0, "appearances": 0})
        entry["appearances"] += 1
        if r.get("outcome") == "victory":
            entry["wins"] += 1
    if not tally:
        return True, "no data"

    parts: list[str] = []
    balanced = True
    for clan in sorted(tally):
        apps = tally[clan]["appearances"]
        wins = tally[clan]["wins"]
        # Floor division is for display only (apps is always >= 1 here).
        pct = 100 * wins // apps
        parts.append(f"{clan}:{wins}/{apps}({pct}%)")
        # Compare the exact ratio: the floored percentage would let e.g.
        # 51/101 (50.5%) slip under a 50% threshold.
        if 100 * wins > WIN_RATE_BALANCE_THRESHOLD * apps:
            balanced = False
    return balanced, " ".join(parts)
def _tally_appearances(results: list[tuple[int, dict]]) -> dict[str, dict[str, int]]:
    """Count wins and appearances per clan.

    Prefers ``player_clans`` (every AI in every game). Falls back to
    ``winner_personality`` when a row carries no clan data, which
    undercounts appearances for non-winning clans — see Task #9 notes.

    Args:
        results: ``(seed, metrics)`` pairs; only the metrics dict is read.

    Returns:
        ``{clan_id: {"wins": int, "appearances": int}}``.

    A victory row whose ``winner_index`` is missing, ``None`` or not
    integer-like still counts as an appearance for each clan in the game,
    but credits no win, instead of raising.
    """
    tally: dict[str, dict[str, int]] = {}
    for _, r in results:
        clans = r.get("player_clans") or {}
        won = r.get("outcome", "") == "victory"
        if clans:
            # Resolve the winning slot once per game rather than per player.
            winner_slot = None
            if won:
                try:
                    winner_slot = str(int(r.get("winner_index", -1)))
                except (TypeError, ValueError):
                    winner_slot = None
            for pid, clan in clans.items():
                if not clan:
                    continue
                entry = tally.setdefault(clan, {"wins": 0, "appearances": 0})
                entry["appearances"] += 1
                if winner_slot is not None and winner_slot == str(pid):
                    entry["wins"] += 1
        else:
            clan = r.get("winner_personality", "")
            if not clan:
                continue
            entry = tally.setdefault(clan, {"wins": 0, "appearances": 0})
            entry["appearances"] += 1
            if won:
                entry["wins"] += 1
    return tally
def personality_win_balance_verdict(
    results: list[tuple[int, dict]],
) -> dict[str, Any]:
    """Build the CI-consumable JSON verdict for the two-clause balance gate.

    Clauses:
      1. No clan may have a win rate above WIN_RATE_BALANCE_THRESHOLD percent
         of its appearances.
      2. Every clan with >= MIN_APPEARANCES_FOR_NO_WIN_CHECK appearances must
         have >= 1 win. Clans below that count are exempt (the sample is too
         small to fail on the "zero wins" signal).

    Returned dict:
        {
          "pass": bool,
          "sample_size": int,
          "reasons": [str, ...],     # empty when pass is True
          "clans": {clan_id: {appearances, wins, losses, win_rate_pct}, ...},
          "missing_clans": [clan_id, ...],  # KNOWN_CLAN_IDS with 0 appearances
        }
    """
    counts = _tally_appearances(results)
    failures: list[str] = []
    per_clan: dict[str, dict[str, Any]] = {}

    # Report on every clan seen in the data plus every known clan.
    for clan_id in sorted({*counts, *KNOWN_CLAN_IDS}):
        record = counts.get(clan_id)
        games = record["appearances"] if record else 0
        won = record["wins"] if record else 0
        rate = 100 * won / games if games else 0.0

        per_clan[clan_id] = {
            "appearances": games,
            "wins": won,
            "losses": games - won,
            "win_rate_pct": round(rate, 2),
        }

        # Clause 1: dominant clan.
        if games > 0 and rate > WIN_RATE_BALANCE_THRESHOLD:
            failures.append(
                f"{clan_id} win_rate {rate:.1f}% exceeds {WIN_RATE_BALANCE_THRESHOLD}%"
                f" ({won}/{games})"
            )
        # Clause 2: clan that never wins despite a meaningful sample.
        if won == 0 and games >= MIN_APPEARANCES_FOR_NO_WIN_CHECK:
            failures.append(
                f"{clan_id} has {games} appearances but 0 wins"
                f" (threshold: >= {MIN_APPEARANCES_FOR_NO_WIN_CHECK})"
            )

    absent = [c for c in KNOWN_CLAN_IDS if not per_clan[c]["appearances"]]
    return {
        "pass": not failures,
        "sample_size": len(results),
        "reasons": failures,
        "clans": per_clan,
        "missing_clans": absent,
    }
def _load_batch_results(batch: Path) -> list[tuple[int, dict]]:
    """Collect metrics for every ``game_*_seed<N>`` directory directly under ``batch``.

    Returns ``(seed, metrics)`` pairs ordered by seed (ties by path).
    Entries that are not directories, do not start with ``game_`` or do not
    end in ``_seed<digits>`` are ignored.
    """
    seeded: list[tuple[int, Path]] = []
    for entry in batch.iterdir():
        if not entry.is_dir() or not entry.name.startswith("game_"):
            continue
        suffix = entry.name.rsplit("_seed", 1)[-1]
        if suffix.isdigit():
            seeded.append((int(suffix), entry))
    seeded.sort()
    return [(seed, _collect(game_dir)) for seed, game_dir in seeded]
def _load_results_recursive(parent: Path) -> list[tuple[int, dict]]:
    """Collect metrics for every ``game_*_seed<N>`` directory at any depth.

    Used by matchup_balance (10 sub-dirs, one per pair × position), where
    `_load_batch_results` — which only looks at direct children — would miss
    the nested game directories. Results follow the sorted path order of the
    matching directories.
    """
    collected: list[tuple[int, dict]] = []
    for candidate in sorted(parent.rglob("game_*")):
        name = candidate.name
        tail = name.rsplit("_seed", 1)[-1]
        if candidate.is_dir() and "_seed" in name and tail.isdigit():
            collected.append((int(tail), _collect(candidate)))
    return collected
# ── matchup_balance: grid across all C(5,2)=10 1v1 pairings ────────────
def matchup_balance_verdict(results: list[tuple[int, dict]]) -> dict[str, Any]:
    """Build the verdict for the 1v1 matchup-grid gate (prerequisite for ultimate_stress).

    A matchup-grid run is the 10 unordered clan pairs, each run COUNT times
    in each position. Across the whole grid this gate checks that:

      1. Every clan appears in at least `MIN_APPEARANCES_FOR_NO_WIN_CHECK`
         games. With too few seeds per pair the grid is incomplete and the
         gate fails with an explicit reason (never a silent pass); the
         remaining checks are skipped for that clan.
      2. No clan wins more than `WIN_RATE_BALANCE_THRESHOLD` (50%) of its
         appearances — the same threshold as personality_win_balance.
      3. Every clan that meets the appearance minimum wins at least ONE game.

    The returned dict has the same shape as the one from
    `personality_win_balance_verdict`, so consumers can share rendering.
    """
    counts = _tally_appearances(results)
    failures: list[str] = []
    per_clan: dict[str, dict[str, Any]] = {}

    for clan_id in sorted({*counts, *KNOWN_CLAN_IDS}):
        record = counts.get(clan_id)
        games = record["appearances"] if record else 0
        won = record["wins"] if record else 0
        rate = 100 * won / games if games else 0.0

        per_clan[clan_id] = {
            "appearances": games,
            "wins": won,
            "losses": games - won,
            "win_rate_pct": round(rate, 2),
        }

        if games < MIN_APPEARANCES_FOR_NO_WIN_CHECK:
            # Too small a sample: report the gap and skip the rate checks.
            failures.append(
                f"{clan_id} has only {games} appearances (need "
                f">= {MIN_APPEARANCES_FOR_NO_WIN_CHECK}); grid incomplete — "
                f"run tools/matchup-grid.sh with COUNT>=5"
            )
        else:
            if rate > WIN_RATE_BALANCE_THRESHOLD:
                failures.append(
                    f"{clan_id} win_rate {rate:.1f}% exceeds {WIN_RATE_BALANCE_THRESHOLD}%"
                    f" ({won}/{games})"
                )
            if won == 0:
                failures.append(
                    f"{clan_id} has {games} appearances but 0 wins in the grid"
                )

    absent = [c for c in KNOWN_CLAN_IDS if not per_clan[c]["appearances"]]
    return {
        "pass": not failures,
        "sample_size": len(results),
        "reasons": failures,
        "clans": per_clan,
        "missing_clans": absent,
    }
# ── ultimate_stress: huge-map 5-clan free-for-all ──────────────────────
def ultimate_stress_verdict(results: list[tuple[int, dict]]) -> dict[str, Any]:
    """Build the verdict for the huge-map 5-clan ultimate stress gate.

    This is the end-state AI-lookahead gate: 5 clan AIs competing on a
    map sized for 8. Four properties are required:

      1. Sample size: at least 5 seeded games. Below this the verdict is not
         statistically meaningful, so the gate fails immediately with an
         explanatory reason and the other checks are not evaluated.
      2. Decisive-game rate: at least 50% of games end in victory (not
         stalemate / in_progress). If MCTS stalls on a huge map the
         lookahead isn't working.
      3. Winner distribution is non-degenerate: at least 2 DISTINCT clans
         win across the batch. A single clan sweeping every seed is the
         "one-strong-clan" pathology matchup_balance was supposed to catch;
         the ultimate test re-checks it at scale.
      4. Median game length is at least 40% of the turn cap. The cap is not
         passed in; it is inferred as the largest final turn seen, with a
         floor of 100. If games snap-end at T20 on a 500-turn cap, the map
         isn't being used.

    Args:
        results: ``(seed, metrics)`` pairs as produced by ``_collect``. The
            final turn is read from ``"turns"`` (the key ``_collect`` emits),
            falling back to ``"turn"`` for rows built by other callers.

    Returns:
        Dict with ``pass``, ``sample_size``, ``reasons``, ``clans``,
        ``victory_count``, ``distinct_winners``, ``median_turn`` and
        ``turn_limit_seen``; it mirrors the other verdict functions for
        rendering consistency.
    """
    reasons: list[str] = []
    sample = len(results)
    min_sample = 5
    if sample < min_sample:
        reasons.append(
            f"ultimate_stress needs >= {min_sample} games; got {sample}. "
            f"Re-run tools/huge-map-5clan.sh with SEEDS>={min_sample}"
        )
        return {
            "pass": False,
            "sample_size": sample,
            "reasons": reasons,
            "clans": {},
            "victory_count": 0,
            # Same key set as the full verdict so consumers need no special case.
            "distinct_winners": [],
            "median_turn": 0,
            "turn_limit_seen": 0,
        }

    # Tally per-game outcomes + turn counts.
    victory_count = 0
    winner_clans_seen: set[str] = set()
    turns: list[int] = []
    for _, r in results:
        # _collect() stores the final turn under "turns"; reading only "turn"
        # made every game look like it ended on turn 0.
        turns.append(int(r.get("turns", r.get("turn", 0)) or 0))
        if r.get("outcome", "") != "victory":
            continue
        victory_count += 1
        clans = r.get("player_clans") or {}
        if clans:
            # Map the winning slot index to its clan when clan data exists.
            wi = r.get("winner_index", -1)
            try:
                clan = clans.get(str(int(wi)), "") if wi is not None else ""
            except (TypeError, ValueError):
                clan = ""
        else:
            # Fallback to winner_personality when player_clans is absent.
            clan = r.get("winner_personality", "")
        if clan:
            winner_clans_seen.add(clan)
    turn_limit_seen = max(0, max(turns))

    victory_rate = victory_count / sample
    if victory_rate < 0.5:
        reasons.append(
            f"decisive-game rate {victory_count}/{sample} = {100*victory_rate:.0f}% "
            f"< 50% — MCTS is stalling on the huge map"
        )
    if len(winner_clans_seen) < 2 and victory_count > 0:
        reasons.append(
            f"only {len(winner_clans_seen)} distinct clan(s) won across {victory_count} victories "
            f"({sorted(winner_clans_seen)}); winner distribution is degenerate"
        )

    median_turn = statistics.median(turns)
    # The turn limit isn't explicitly known, so infer it from the max final
    # turn across games (games that hit the cap all land on the same turn).
    # Threshold: median >= 40% of the inferred cap.
    inferred_cap = max(turn_limit_seen, 100)
    min_median = int(inferred_cap * 0.4)
    if median_turn < min_median:
        reasons.append(
            f"median game length {median_turn} < {min_median} ({inferred_cap} * 40%); "
            f"games snap-ending — huge map not being used"
        )

    # Per-clan table, informational only (no reasons are derived from it).
    tally = _tally_appearances(results)
    clans_out: dict[str, dict[str, Any]] = {}
    for clan in sorted(set(tally) | set(KNOWN_CLAN_IDS)):
        entry = tally.get(clan, {"wins": 0, "appearances": 0})
        apps = entry["appearances"]
        wins = entry["wins"]
        pct = (100 * wins / apps) if apps else 0.0
        clans_out[clan] = {
            "appearances": apps,
            "wins": wins,
            "losses": apps - wins,
            "win_rate_pct": round(pct, 2),
        }
    return {
        "pass": not reasons,
        "sample_size": sample,
        "reasons": reasons,
        "clans": clans_out,
        "victory_count": victory_count,
        "distinct_winners": sorted(winner_clans_seen),
        "median_turn": median_turn,
        "turn_limit_seen": turn_limit_seen,
    }
def _run_matchup_balance_cli(argv: list[str]) -> int:
    """Run the `matchup_balance` subcommand and return its exit status.

    Expects exactly one argument, the grid directory. Prints the JSON verdict
    on stdout. Returns 0 on pass, 1 on fail, and 2 for a usage error or a
    path that is not a directory (message on stderr).
    """
    try:
        (grid_arg,) = argv
    except ValueError:
        print("usage: checklist-report.py matchup_balance <grid_dir>", file=sys.stderr)
        return 2

    parent = Path(grid_arg)
    if not parent.is_dir():
        print(f"ERROR: {parent} is not a directory", file=sys.stderr)
        return 2

    # Grid runs nest their game dirs, so search recursively.
    verdict = matchup_balance_verdict(_load_results_recursive(parent))
    print(json.dumps(verdict, indent=2))
    return int(not verdict["pass"])
def _run_ultimate_stress_cli(argv: list[str]) -> int:
    """Run the `ultimate_stress` subcommand and return its exit status.

    Expects exactly one argument, the batch directory. Prints the JSON
    verdict on stdout. Returns 0 on pass, 1 on fail, and 2 for a usage error
    or a path that is not a directory (message on stderr).
    """
    try:
        (batch_arg,) = argv
    except ValueError:
        print("usage: checklist-report.py ultimate_stress <batch_dir>", file=sys.stderr)
        return 2

    batch = Path(batch_arg)
    if not batch.is_dir():
        print(f"ERROR: {batch} is not a directory", file=sys.stderr)
        return 2

    # Ultimate-stress batches have games at top level (single flat autoplay-batch).
    verdict = ultimate_stress_verdict(_load_batch_results(batch))
    print(json.dumps(verdict, indent=2))
    return int(not verdict["pass"])
def _run_personality_win_balance_cli(argv: list[str]) -> int:
    """Run the `personality_win_balance` subcommand and return its exit status.

    Expects exactly one argument, the batch directory. Prints the JSON
    verdict on stdout. Returns 0 on pass, 1 on fail, and 2 for a usage error
    or a path that is not a directory (message on stderr).
    """
    try:
        (batch_arg,) = argv
    except ValueError:
        print("usage: checklist-report.py personality_win_balance <batch_dir>", file=sys.stderr)
        return 2

    batch = Path(batch_arg)
    if not batch.is_dir():
        print(f"ERROR: {batch} is not a directory", file=sys.stderr)
        return 2

    verdict = personality_win_balance_verdict(_load_batch_results(batch))
    print(json.dumps(verdict, indent=2))
    return int(not verdict["pass"])
def _row(label, value, target, ok) -> str:
    """Format one markdown table row; the last cell is PASS when ``ok`` is truthy, else FAIL."""
    verdict = "PASS" if ok else "FAIL"
    return "| {} | {} | {} | {} |".format(label, value, target, verdict)
def main(argv: list[str]) -> int:
    """CLI entry point.

    Args:
        argv: Full argument vector including the program name at index 0.

    Returns:
        Process exit status. For the JSON subcommands see the respective
        ``_run_*_cli`` helper. For the default markdown checklist: 0 when no
        row fails, 1 when any row fails or the batch contains no games, and
        2 for usage errors or a batch path that is not a directory.
    """
    args = argv[1:]

    # Subcommands that emit a JSON verdict for CI instead of the markdown table.
    subcommands = {
        "personality_win_balance": _run_personality_win_balance_cli,
        "matchup_balance": _run_matchup_balance_cli,  # 10-pair grid verdict
        "ultimate_stress": _run_ultimate_stress_cli,  # huge-map 5-clan stress verdict
    }
    if args and args[0] in subcommands:
        return subcommands[args[0]](args[1:])

    usage = f"usage: checklist-report.py [--difficulty {'|'.join(THRESHOLDS)}] <batch_dir>"
    difficulty = "normal"
    if args and args[0] == "--difficulty":
        if len(args) < 2 or args[1] not in THRESHOLDS:
            print(usage, file=sys.stderr)
            return 2
        difficulty, args = args[1], args[2:]
    if len(args) != 1:
        print(usage, file=sys.stderr)
        return 2
    pop_min, vic_lo, vic_hi, ttv_lo, ttv_hi, combats_min = THRESHOLDS[difficulty]

    batch = Path(args[0])
    if not batch.is_dir():
        print(f"ERROR: {batch} is not a directory", file=sys.stderr)
        return 2
    # Shared loader: same discovery and seed ordering as the JSON subcommands.
    results = _load_batch_results(batch)
    if not results:
        print(f"ERROR: no games under {batch}", file=sys.stderr)
        return 1
    n = len(results)

    def med(key: str):
        """Median of one metric across all games in the batch."""
        return statistics.median([r[key] for _, r in results])

    vics = [r for _, r in results if r["outcome"] == "victory"]
    vic_pct = 100 * len(vics) / n
    # Median turn count over won games only; 0 is a placeholder when none were won.
    med_ttv = statistics.median([r["turns"] for r in vics]) if vics else 0
    imp_total = sum(r["imp_events"] for _, r in results)
    loot_total = sum(r["loot_events"] for _, r in results)
    gate_total = sum(r["gate_events"] for _, r in results)
    both = sum(1 for _, r in results if r["both_p100"])
    inv = sum(r["invariants"] for _, r in results)
    errs = sum(r["script_errors"] for _, r in results)
    pwb_ok, pwb_detail = personality_win_balance(results)

    rows = [
        f"# FULL 4X CHECKLIST — batch `{batch.name}` (difficulty: {difficulty})",
        f"\n**Games:** {n} **Seeds:** {[s for s, _ in results]}\n",
        "| Metric | Value | Target | Result |", "|---|---|---|---|",
        "| **CORE** | | | |",
        _row("pop_peak median", f"{med('pop_peak'):.0f}", f">={pop_min}", med("pop_peak") >= pop_min),
        _row("victories", f"{len(vics)}/{n} ({vic_pct:.0f}%)", f"{vic_lo}-{vic_hi}%", vic_lo <= vic_pct <= vic_hi),
        # With no victories the TTV row passes vacuously and shows "n/a".
        _row("median TTV", f"{med_ttv:.0f}" if vics else "n/a", f"{ttv_lo}-{ttv_hi}", (not vics) or ttv_lo <= med_ttv <= ttv_hi),
        _row("median combats", f"{med('combats'):.0f}", f">={combats_min}", med("combats") >= combats_min),
        _row("median p0_tiles", f"{med('p0_tiles'):.0f}", ">=20", med("p0_tiles") >= 20),
        _row("median p0_techs", f"{med('p0_techs'):.0f}", ">=20", med("p0_techs") >= 20),
        "| **SYSTEMS** | | | |",
        _row("strategic resources gate", f"{gate_total} rejections", ">=1", gate_total >= 1),
        _row("luxury happiness varies", f"min distinct={min(r['happy_distinct'] for _, r in results)}",
             ">=3 distinct/seed", all(r["happy_distinct"] >= 3 for _, r in results)),
        _row("improvement_built total", imp_total, ">=5", imp_total >= 5),
        _row("loot_dropped total", loot_total, ">=1", loot_total >= 1),
        _row("worker improvements/seed (min)", min(r["imp_events"] for _, r in results),
             ">=5/seed", all(r["imp_events"] >= 5 for _, r in results)),
        "| **QUALITY** | | | |",
        _row("both players pop>=5 mil>=4 by T100", f"{both}/{n} seeds", ">=2 seeds", both >= 2),
        _row("invariant violations", inv, "0", inv == 0),
        _row("SCRIPT ERRORs in logs", errs, "0", errs == 0),
        _row("personality_win_balance", pwb_detail, "no clan >50%", pwb_ok),
    ]
    # Only rows produced by _row() end in a PASS/FAIL cell; headers don't count.
    passes = sum(1 for r in rows if r.endswith("PASS |"))
    fails = sum(1 for r in rows if r.endswith("FAIL |"))
    rows.append(f"\n**Pass: {passes} Fail: {fails}**\n")
    rows.append("## Per-seed detail\n")
    rows.append("| Seed | Outcome | Turns | Pop | Combats | Techs | Tiles | Imp | Loot | BothP100 | Inv | Errs |")
    rows.append("|---|---|---|---|---|---|---|---|---|---|---|---|")
    for s, r in results:
        rows.append(f"| {s} | {r['outcome']} | {r['turns']} | {r['pop_peak']} | {r['combats']} | "
                    f"{r['p0_techs']} | {r['p0_tiles']} | {r['imp_events']} | {r['loot_events']} | "
                    f"{r['both_p100']} | {r['invariants']} | {r['script_errors']} |")
    print("\n".join(rows))
    return 0 if fails == 0 else 1


if __name__ == "__main__":
    sys.exit(main(sys.argv))