438 lines
14 KiB
Python
Executable file
438 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Aggregate auto_play batch results into a CSV + summary + assertions.
|
|
|
|
Reads all game_<stamp>_seed<N>/ directories under <results_dir>.
|
|
Pulls the last line of turn_stats.jsonl as the final game state (fast path).
|
|
Counts events from events.jsonl.
|
|
Optionally reads .save files with --deep.
|
|
|
|
Usage:
|
|
tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]
|
|
|
|
Exits:
|
|
0 all games parsed, validated, and assertions passed
|
|
1 schema validation failure OR assertion failure OR missing results
|
|
2 usage error
|
|
|
|
stdlib only — no pip installs.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import json
|
|
import statistics
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import importlib.util as _iu
|
|
|
|
_validate_path = Path(__file__).resolve().parent / "autoplay-validate.py"
|
|
_spec = _iu.spec_from_file_location("autoplay_validate", _validate_path)
|
|
if _spec is None or _spec.loader is None:
|
|
raise ImportError(f"cannot load {_validate_path}")
|
|
_mod = _iu.module_from_spec(_spec)
|
|
_spec.loader.exec_module(_mod)
|
|
load_schema = _mod.load_schema
|
|
validate = _mod.validate
|
|
|
|
TURN_STATS_SCHEMA_NAME = "turn-stats-line"
|
|
EVENTS_SCHEMA_NAME = "events-line"
|
|
META_SCHEMA_NAME = "meta"
|
|
|
|
EVENT_TYPES = [
|
|
"city_founded", "city_captured", "city_grew", "city_starved",
|
|
"tech_researched", "unit_created", "unit_destroyed", "combat_resolved", "victory",
|
|
]
|
|
|
|
|
|
def find_game_dirs(results_dir: Path) -> tuple[list[tuple[int, Path]], list[int]]:
|
|
"""Find game_<stamp>_seed<N>/ directories. Returns (found, missing_seeds).
|
|
|
|
For each seed number, picks the most recent directory (lexicographic max on stamp).
|
|
"""
|
|
by_seed: dict[int, list[Path]] = {}
|
|
for d in results_dir.iterdir():
|
|
if not d.is_dir():
|
|
continue
|
|
name = d.name
|
|
if not name.startswith("game_"):
|
|
continue
|
|
# Expected: game_<stamp>_seed<N>
|
|
parts = name.rsplit("_seed", 1)
|
|
if len(parts) != 2 or not parts[1].isdigit():
|
|
continue
|
|
seed = int(parts[1])
|
|
by_seed.setdefault(seed, []).append(d)
|
|
|
|
found: list[tuple[int, Path]] = []
|
|
for seed in sorted(by_seed):
|
|
dirs = sorted(by_seed[seed])
|
|
found.append((seed, dirs[-1]))
|
|
|
|
# We report missing only if there are gaps in the seed sequence
|
|
if not found:
|
|
return [], []
|
|
max_seed = max(s for s, _ in found)
|
|
present = {s for s, _ in found}
|
|
missing = [s for s in range(1, max_seed + 1) if s not in present]
|
|
return found, missing
|
|
|
|
|
|
def _read_last_jsonl_line(path: Path) -> str | None:
|
|
"""Read the last non-empty line of a JSONL file efficiently."""
|
|
try:
|
|
text = path.read_text()
|
|
except OSError:
|
|
return None
|
|
for line in reversed(text.splitlines()):
|
|
line = line.strip()
|
|
if line:
|
|
return line
|
|
return None
|
|
|
|
|
|
def _count_jsonl_lines(path: Path) -> int:
|
|
"""Count non-empty lines in a JSONL file."""
|
|
try:
|
|
text = path.read_text()
|
|
except OSError:
|
|
return 0
|
|
return sum(1 for l in text.splitlines() if l.strip())
|
|
|
|
|
|
def _count_events_by_type(path: Path) -> dict[str, int]:
|
|
"""Read events.jsonl, count occurrences per event type."""
|
|
counts: dict[str, int] = {}
|
|
try:
|
|
text = path.read_text()
|
|
except OSError:
|
|
return counts
|
|
for raw in text.splitlines():
|
|
raw = raw.strip()
|
|
if not raw:
|
|
continue
|
|
try:
|
|
obj = json.loads(raw)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
t = obj.get("type", "<unknown>")
|
|
counts[t] = counts.get(t, 0) + 1
|
|
return counts
|
|
|
|
|
|
KNOWN_CLAN_IDS = ["blackhammer", "deepforge", "goldvein", "ironhold", "runesmith"]
|
|
|
|
AGGREGATE_FIELDS = [
|
|
"total_combats",
|
|
"total_cities_founded",
|
|
"total_cities_captured",
|
|
"turn_first_combat",
|
|
"turn_first_city_captured",
|
|
]
|
|
|
|
PLAYER_FIELDS = [
|
|
"pop", "pop_peak", "mil",
|
|
"cities", "cities_captured", "cities_lost",
|
|
"gold", "gold_peak", "gold_per_turn",
|
|
"techs", "tiles", "buildings",
|
|
"happiness",
|
|
"food_total", "production_total",
|
|
"kills", "units_lost",
|
|
"turn_first_pop_3", "turn_first_pop_4",
|
|
]
|
|
|
|
|
|
def extract_row(
|
|
seed: int, data: dict[str, Any], event_counts: dict[str, int]
|
|
) -> dict[str, Any]:
|
|
# turn-stats-line uses "turn" not "turns_played"
|
|
turn = data.get("turn", data.get("turns_played", -1))
|
|
total_events = sum(event_counts.values())
|
|
row: dict[str, Any] = {
|
|
"seed": seed,
|
|
"outcome": data["outcome"],
|
|
"turns_played": turn,
|
|
"winner_index": data["winner_index"],
|
|
"winner_personality": data.get("winner_personality", ""),
|
|
"victory_type": data["victory_type"],
|
|
"wall_clock_sec": round(float(data["wall_clock_sec"]), 2),
|
|
"event_count": total_events,
|
|
}
|
|
for et in EVENT_TYPES:
|
|
row[f"evt_{et}"] = event_counts.get(et, 0)
|
|
for f in AGGREGATE_FIELDS:
|
|
row[f"agg_{f}"] = data["aggregate"][f]
|
|
player_stats: dict[str, Any] = data["player_stats"]
|
|
for pid in ("0", "1"):
|
|
pstat = player_stats.get(pid, {})
|
|
for f in PLAYER_FIELDS:
|
|
row[f"p{pid}_{f}"] = pstat.get(f, "")
|
|
row["invariant_violations"] = len(data["invariant_violations"])
|
|
return row
|
|
|
|
|
|
def csv_fieldnames() -> list[str]:
|
|
fields = [
|
|
"seed", "outcome", "turns_played", "winner_index", "winner_personality",
|
|
"victory_type", "wall_clock_sec", "event_count",
|
|
]
|
|
fields += [f"evt_{et}" for et in EVENT_TYPES]
|
|
fields += [f"agg_{f}" for f in AGGREGATE_FIELDS]
|
|
for pid in ("0", "1"):
|
|
fields += [f"p{pid}_{f}" for f in PLAYER_FIELDS]
|
|
fields.append("invariant_violations")
|
|
return fields
|
|
|
|
|
|
VALID_OUTCOMES = {"victory", "max_turns", "defeat", "in_progress"}
|
|
|
|
|
|
def run_assertions(
|
|
rows: list[dict[str, Any]],
|
|
missing_seeds: list[int],
|
|
schema_errors: dict[Path, list[str]],
|
|
) -> list[str]:
|
|
failures: list[str] = []
|
|
|
|
if missing_seeds:
|
|
failures.append(f"Missing game directories for seeds: {missing_seeds}")
|
|
|
|
if schema_errors:
|
|
for path, errs in schema_errors.items():
|
|
failures.append(f"Schema validation failed for {path}:")
|
|
for e in errs[:5]:
|
|
failures.append(f" {e}")
|
|
if len(errs) > 5:
|
|
failures.append(f" ... ({len(errs) - 5} more)")
|
|
|
|
if not rows:
|
|
failures.append("No valid result rows to analyze.")
|
|
return failures
|
|
|
|
bad_outcomes = [r for r in rows if r["outcome"] not in VALID_OUTCOMES]
|
|
if bad_outcomes:
|
|
failures.append(f"{len(bad_outcomes)} game(s) had invalid outcome values")
|
|
|
|
total_violations = sum(r["invariant_violations"] for r in rows)
|
|
if total_violations > 0:
|
|
failures.append(f"Total invariant violations across games: {total_violations}")
|
|
|
|
max_p0_pop = max((r["p0_pop_peak"] for r in rows if r["p0_pop_peak"] != ""), default=0)
|
|
if max_p0_pop < 4:
|
|
failures.append(
|
|
f"No game reached p0_pop_peak >= 4 (max was {max_p0_pop}). "
|
|
"Growth system may be broken."
|
|
)
|
|
|
|
never_combat = [r for r in rows if r["agg_turn_first_combat"] == -1]
|
|
if never_combat:
|
|
failures.append(
|
|
f"{len(never_combat)} game(s) never fought a single combat — "
|
|
"AI may be pacifist or unreachable."
|
|
)
|
|
|
|
no_turns = [r for r in rows if r["turns_played"] < 1]
|
|
if no_turns:
|
|
failures.append(
|
|
f"{len(no_turns)} game(s) have turns_played < 1 — "
|
|
"game may have crashed before completing a turn."
|
|
)
|
|
|
|
return failures
|
|
|
|
|
|
def median_int(values: list[int | float]) -> int:
|
|
filtered = [v for v in values if isinstance(v, (int, float))]
|
|
if not filtered:
|
|
return -1
|
|
return int(statistics.median(filtered))
|
|
|
|
|
|
def build_personality_win_table(rows: list[dict[str, Any]]) -> dict[str, dict[str, int]]:
|
|
"""Return per-clan stats: clan_id → {wins, appearances, losses}.
|
|
|
|
appearances = number of games in which this clan was the AI opponent.
|
|
wins = games won by this clan.
|
|
losses = appearances - wins.
|
|
Uses winner_personality from each row's final turn-stats line.
|
|
Human player (empty clan_id / winner_index 0 with no personality) is
|
|
bucketed under the empty string key and excluded from the balance check.
|
|
"""
|
|
stats: dict[str, dict[str, int]] = {}
|
|
for row in rows:
|
|
personality = row.get("winner_personality", "")
|
|
# Count only AI opponents (non-empty clan)
|
|
if not personality:
|
|
continue
|
|
if personality not in stats:
|
|
stats[personality] = {"wins": 0, "appearances": 0}
|
|
if row["outcome"] == "victory":
|
|
stats[personality]["wins"] += 1
|
|
stats[personality]["appearances"] += 1
|
|
for clan_stats in stats.values():
|
|
clan_stats["losses"] = clan_stats["appearances"] - clan_stats["wins"]
|
|
return stats
|
|
|
|
|
|
def print_personality_summary(
|
|
rows: list[dict[str, Any]], out: Any = sys.stderr
|
|
) -> None:
|
|
table = build_personality_win_table(rows)
|
|
if not table:
|
|
print("personality win-rate: no data (winner_personality missing from results)", file=out)
|
|
return
|
|
total_games = len(rows)
|
|
print("personality win-rate:", file=out)
|
|
print(f" {'clan':<14} {'wins':>5} {'apps':>5} {'win%':>6}", file=out)
|
|
for clan in sorted(table):
|
|
entry = table[clan]
|
|
apps = entry["appearances"]
|
|
wins = entry["wins"]
|
|
pct = 100 * wins // apps if apps else 0
|
|
flag = " <-- IMBALANCED (>50%)" if pct > 50 else ""
|
|
print(f" {clan:<14} {wins:>5} {apps:>5} {pct:>5}%{flag}", file=out)
|
|
|
|
|
|
def print_summary(rows: list[dict[str, Any]], out: Any = sys.stderr) -> None:
|
|
print("=== autoplay batch report ===", file=out)
|
|
print(f"games: {len(rows)}", file=out)
|
|
counts: dict[str, int] = {}
|
|
for r in rows:
|
|
counts[r["outcome"]] = counts.get(r["outcome"], 0) + 1
|
|
for k, v in sorted(counts.items()):
|
|
pct = 100 * v // len(rows) if rows else 0
|
|
print(f" {k}: {v} ({pct}%)", file=out)
|
|
if rows:
|
|
print(
|
|
f"median turns_played: {median_int([r['turns_played'] for r in rows])}",
|
|
file=out,
|
|
)
|
|
print(
|
|
f"median p0_pop_peak: {median_int([r['p0_pop_peak'] for r in rows])}",
|
|
file=out,
|
|
)
|
|
print(
|
|
f"median p0_gold_peak: {median_int([r['p0_gold_peak'] for r in rows])}",
|
|
file=out,
|
|
)
|
|
print(
|
|
f"median agg_total_combats: {median_int([r['agg_total_combats'] for r in rows])}",
|
|
file=out,
|
|
)
|
|
print(
|
|
f"median event_count: {median_int([r['event_count'] for r in rows])}",
|
|
file=out,
|
|
)
|
|
print("event counts by type (total across all games):", file=out)
|
|
for et in EVENT_TYPES:
|
|
total = sum(r.get(f"evt_{et}", 0) for r in rows)
|
|
if total > 0:
|
|
print(f" {et}: {total}", file=out)
|
|
total_v = sum(r["invariant_violations"] for r in rows)
|
|
print(f"invariant violations (total): {total_v}", file=out)
|
|
print_personality_summary(rows, out=out)
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
positional: list[str] = []
|
|
flags: set[str] = set()
|
|
i = 1
|
|
while i < len(argv):
|
|
a = argv[i]
|
|
if a.startswith("-"):
|
|
flags.add(a)
|
|
else:
|
|
positional.append(a)
|
|
i += 1
|
|
|
|
if not positional:
|
|
print(
|
|
"usage: autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]",
|
|
file=sys.stderr,
|
|
)
|
|
return 2
|
|
|
|
results_dir = Path(positional[0])
|
|
deep = "--deep" in flags
|
|
|
|
if not results_dir.is_dir():
|
|
print(f"ERROR: {results_dir} is not a directory", file=sys.stderr)
|
|
return 2
|
|
|
|
found, missing = find_game_dirs(results_dir)
|
|
if not found and not missing:
|
|
print(f"ERROR: No game_*_seed*/ dirs found under {results_dir}", file=sys.stderr)
|
|
return 1
|
|
|
|
ts_schema = load_schema(TURN_STATS_SCHEMA_NAME)
|
|
meta_schema = load_schema(META_SCHEMA_NAME)
|
|
rows: list[dict[str, Any]] = []
|
|
schema_errors: dict[Path, list[str]] = {}
|
|
|
|
for seed, game_dir in found:
|
|
meta_path = game_dir / "meta.json"
|
|
turn_stats_path = game_dir / "turn_stats.jsonl"
|
|
events_path = game_dir / "events.jsonl"
|
|
|
|
# Validate meta.json
|
|
if not meta_path.exists():
|
|
schema_errors[meta_path] = ["meta.json missing"]
|
|
else:
|
|
try:
|
|
meta_data = json.loads(meta_path.read_text())
|
|
meta_errs = validate(meta_data, meta_schema)
|
|
if meta_errs:
|
|
schema_errors[meta_path] = meta_errs
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
schema_errors[meta_path] = [f"cannot load meta.json: {e}"]
|
|
|
|
# Fast path: read only the last line of turn_stats.jsonl
|
|
last_line = _read_last_jsonl_line(turn_stats_path)
|
|
if last_line is None:
|
|
schema_errors[turn_stats_path] = ["turn_stats.jsonl missing or empty"]
|
|
continue
|
|
|
|
try:
|
|
data = json.loads(last_line)
|
|
except json.JSONDecodeError as e:
|
|
schema_errors[turn_stats_path] = [f"last line invalid JSON: {e}"]
|
|
continue
|
|
|
|
errs = validate(data, ts_schema)
|
|
if errs:
|
|
schema_errors[turn_stats_path] = errs
|
|
continue
|
|
|
|
event_counts = _count_events_by_type(events_path) if events_path.exists() else {}
|
|
rows.append(extract_row(seed, data, event_counts))
|
|
|
|
if deep:
|
|
# Read .save files only with --deep
|
|
for save_file in sorted(game_dir.glob("*.save")):
|
|
print(f"[deep] {save_file.name}: {save_file.stat().st_size} bytes", file=sys.stderr)
|
|
|
|
# CSV to stdout
|
|
writer = csv.DictWriter(sys.stdout, fieldnames=csv_fieldnames())
|
|
writer.writeheader()
|
|
for r in rows:
|
|
writer.writerow(r)
|
|
|
|
print_summary(rows)
|
|
failures = run_assertions(rows, missing, schema_errors)
|
|
if failures:
|
|
print("\n=== FAILURES ===", file=sys.stderr)
|
|
for f in failures:
|
|
print(f" {f}", file=sys.stderr)
|
|
return 1
|
|
|
|
if "--update-baseline" in flags:
|
|
print("--update-baseline: not yet implemented (Phase 3b)", file=sys.stderr)
|
|
|
|
print("\nAll assertions passed.", file=sys.stderr)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main(sys.argv))
|