feat(autoplay): Add batch autoplay processing, report generator, result schema, and validator

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
autocommit 2026-04-15 07:37:01 -07:00
parent 0bbb4df2ed
commit 2e4ff5e96f
4 changed files with 740 additions and 203 deletions

View file

@ -1,8 +1,17 @@
#!/usr/bin/env bash
# autoplay-batch.sh — Run auto_play N times with different seeds and collect result JSON files.
# autoplay-batch.sh — Run auto_play N times with different seeds and collect per-game output dirs.
#
# Usage: tools/autoplay-batch.sh [count=3] [turn_limit=500] [results_dir=/tmp/autoplay_batch]
#
# Output layout:
# <results_dir>/game_<stamp>_seed<N>/
# meta.json
# turn_stats.jsonl
# events.jsonl
# game.log
# weston.log (local only)
# *.save (per-turn saves, if configured)
#
# Environment:
# AUTOPLAY_HOST — If set (e.g. "lilith@apricot.local"), run each game via SSH using
# /tmp/run_ap3.sh on the remote host and scp results back.
@ -39,9 +48,12 @@ fi
mkdir -p "$RESULTS_DIR"
STAMP="$(date +%Y%m%d_%H%M%S)"
echo "============================================================"
echo "Autoplay Batch: $COUNT games, turn_limit=$TURN_LIMIT"
echo "Results: $RESULTS_DIR"
echo "Stamp: $STAMP"
if [ -n "$AUTOPLAY_HOST" ]; then
echo "Mode: remote SSH ($AUTOPLAY_HOST)"
else
@ -51,7 +63,6 @@ echo "Safety timeout: ${SAFETY_TIMEOUT}s per game"
echo "============================================================"
_kill_stale_procs() {
# Kill stale weston/godot from previous runs (local only)
pkill -f "weston.*godot-headless" 2>/dev/null || true
pkill -f "org.godotengine.Godot" 2>/dev/null || true
sleep 0.5
@ -59,7 +70,7 @@ _kill_stale_procs() {
_run_local() {
local seed="$1"
local seed_dir="$2"
local game_dir="$2"
if ! command -v flatpak >/dev/null 2>&1; then
echo "ERROR: flatpak not installed. Set AUTOPLAY_HOST to run on a remote Linux host." >&2
@ -71,7 +82,7 @@ _run_local() {
echo "[seed $seed] Starting weston (headless)..."
WESTON_SOCKET="godot-headless-$$"
weston --backend=headless --socket="$WESTON_SOCKET" --width=1920 --height=1080 \
>"$seed_dir/weston.log" 2>&1 &
>"$game_dir/weston.log" 2>&1 &
WESTON_PID=$!
sleep 1
@ -84,12 +95,12 @@ _run_local() {
--env=AUTO_PLAY=true \
--env=AUTO_PLAY_SEED="$seed" \
--env=AUTO_PLAY_TURN_LIMIT="$TURN_LIMIT" \
--env=AUTO_PLAY_DIR="$seed_dir" \
--env=AUTO_PLAY_DIR="$game_dir" \
org.godotengine.Godot \
--path "$GAME_DIR" \
--rendering-method gl_compatibility \
--headless \
>"$seed_dir/game.log" 2>&1 || {
>"$game_dir/game.log" 2>&1 || {
local exit_code=$?
echo "[seed $seed] Godot exited with code $exit_code" >&2
}
@ -100,17 +111,19 @@ _run_local() {
_run_remote() {
local seed="$1"
local seed_dir="$2"
local game_dir="$2"
echo "[seed $seed] Running via SSH on $AUTOPLAY_HOST..."
# Build a remote results dir that run_ap3.sh can write to (not /tmp — Flatpak sandbox)
local remote_seed_dir
remote_seed_dir="\$HOME/tmp/autoplay_batch/seed_${seed}"
# Resolve remote $HOME once so we don't fight quoting rules
if [ -z "${REMOTE_HOME:-}" ]; then
REMOTE_HOME="$(ssh "$AUTOPLAY_HOST" 'echo "$HOME"')"
fi
local remote_game_dir="$REMOTE_HOME/tmp/autoplay_batch/game_${STAMP}_seed${seed}"
ssh "$AUTOPLAY_HOST" "
set -euo pipefail
mkdir -p '$remote_seed_dir'
mkdir -p '$remote_game_dir'
if [ ! -f /tmp/run_ap3.sh ]; then
echo 'ERROR: /tmp/run_ap3.sh not found on $AUTOPLAY_HOST' >&2
exit 1
@ -118,14 +131,14 @@ _run_remote() {
AUTO_PLAY=true \
AUTO_PLAY_SEED='$seed' \
AUTO_PLAY_TURN_LIMIT='$TURN_LIMIT' \
AUTO_PLAY_DIR='$remote_seed_dir' \
bash /tmp/run_ap3.sh >'$remote_seed_dir/game.log' 2>&1
AUTO_PLAY_DIR='$remote_game_dir' \
bash /tmp/run_ap3.sh >'$remote_game_dir/game.log' 2>&1
" || {
echo "[seed $seed] SSH run exited with error — see $seed_dir/game.log after scp" >&2
echo "[seed $seed] SSH run exited with error — see $game_dir/game.log after scp" >&2
}
echo "[seed $seed] Fetching results from $AUTOPLAY_HOST..."
scp -r "$AUTOPLAY_HOST:\$HOME/tmp/autoplay_batch/seed_${seed}/." "$seed_dir/" \
scp -r "$AUTOPLAY_HOST:$remote_game_dir/." "$game_dir/" \
>/dev/null 2>&1 || {
echo "WARNING: scp failed for seed $seed — result may be missing" >&2
}
@ -136,26 +149,34 @@ _run_remote() {
FAILED_SEEDS=()
for seed in $(seq 1 "$COUNT"); do
seed_dir="$RESULTS_DIR/seed_${seed}"
mkdir -p "$seed_dir"
game_dir="$RESULTS_DIR/game_${STAMP}_seed${seed}"
mkdir -p "$game_dir"
echo ""
echo "[$(date +%H:%M:%S)] === Game $seed/$COUNT (seed=$seed) ==="
echo "[seed $seed] Output dir: $game_dir"
if [ -n "$AUTOPLAY_HOST" ]; then
_run_remote "$seed" "$seed_dir"
_run_remote "$seed" "$game_dir"
else
_run_local "$seed" "$seed_dir"
_run_local "$seed" "$game_dir"
fi
# Look for timestamped result file (result_<stamp>_seed<N>.json) or legacy (result_<N>.json)
result_file="$(ls -1 "$seed_dir"/result_*_seed${seed}.json 2>/dev/null | tail -n1)"
if [ -z "$result_file" ] && [ -f "$seed_dir/result_${seed}.json" ]; then
result_file="$seed_dir/result_${seed}.json"
fi
if [ -n "$result_file" ] && [ -f "$result_file" ]; then
echo "[seed $seed] OK — result written to $result_file"
# Check for meta.json + non-empty turn_stats.jsonl as canonical success indicators
meta_ok=false
stats_ok=false
[ -f "$game_dir/meta.json" ] && meta_ok=true
[ -f "$game_dir/turn_stats.jsonl" ] && [ -s "$game_dir/turn_stats.jsonl" ] && stats_ok=true
if $meta_ok && $stats_ok; then
line_count="$(wc -l < "$game_dir/turn_stats.jsonl" | tr -d ' ')"
echo "[seed $seed] OK — meta.json present, turn_stats.jsonl has $line_count line(s)"
else
echo "[seed $seed] MISSING result file (no result_*_seed${seed}.json found)" >&2
if ! $meta_ok; then
echo "[seed $seed] MISSING meta.json" >&2
fi
if ! $stats_ok; then
echo "[seed $seed] MISSING or empty turn_stats.jsonl (game may have crashed)" >&2
fi
FAILED_SEEDS+=("$seed")
fi
done
@ -165,13 +186,13 @@ done
echo ""
echo "============================================================"
PRODUCED=$(( COUNT - ${#FAILED_SEEDS[@]} ))
echo "Batch complete: $PRODUCED/$COUNT games produced result.json"
echo "Batch complete: $PRODUCED/$COUNT games produced turn_stats.jsonl"
echo "Results: $RESULTS_DIR"
echo "============================================================"
if [ ${#FAILED_SEEDS[@]} -gt 0 ]; then
echo "ERROR: Missing result.json for seeds: ${FAILED_SEEDS[*]}" >&2
echo " Check game.log in each seed dir for details." >&2
echo "ERROR: No turn_stats.jsonl for seeds: ${FAILED_SEEDS[*]}" >&2
echo " Check game.log in each game dir for details." >&2
exit 1
fi

View file

@ -1,229 +1,389 @@
#!/usr/bin/env python3
"""autoplay-report.py — Aggregate autoplay batch results into a CSV + summary report.
"""
Aggregate auto_play batch results into a CSV + summary + assertions.
Reads all game_<stamp>_seed<N>/ directories under <results_dir>.
Pulls the last line of turn_stats.jsonl as the final game state (fast path).
Counts events from events.jsonl.
Optionally reads .save files with --deep.
Usage:
tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline]
tools/autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]
Reads all <results_dir>/seed_*/result_*.json files.
Emits CSV to stdout, summary to stderr.
Exits non-zero if any assertion fails.
Exits:
0 all games parsed, validated, and assertions passed
1 schema validation failure OR assertion failure OR missing results
2 usage error
stdlib only no pip installs.
"""
from __future__ import annotations
import argparse
import csv
import json
import os
import statistics
import sys
from pathlib import Path
from typing import Any
import importlib.util as _iu
def find_result_files(results_dir: Path) -> list[tuple[int, Path]]:
"""Return (seed, path) pairs for all result_*.json files, sorted by seed.
Matches filenames of the form result_<datetime>_seed<N>.json under
seed_<N>/ subdirectories. Picks the most recent (lexicographic max) if
multiple stamps exist for the same seed."""
results: list[tuple[int, Path]] = []
for seed_dir in sorted(results_dir.glob("seed_*")):
if not seed_dir.is_dir():
continue
seed_str = seed_dir.name.removeprefix("seed_")
if not seed_str.isdigit():
continue
seed = int(seed_str)
candidates = sorted(seed_dir.glob(f"result_*_seed{seed}.json"))
if not candidates:
# Fall back to legacy naming for backward compatibility
legacy = seed_dir / f"result_{seed}.json"
if legacy.exists():
candidates = [legacy]
if not candidates:
continue
results.append((seed, candidates[-1]))
return results
_validate_path = Path(__file__).resolve().parent / "autoplay-validate.py"
_spec = _iu.spec_from_file_location("autoplay_validate", _validate_path)
if _spec is None or _spec.loader is None:
raise ImportError(f"cannot load {_validate_path}")
_mod = _iu.module_from_spec(_spec)
_spec.loader.exec_module(_mod)
load_schema = _mod.load_schema
validate = _mod.validate
TURN_STATS_SCHEMA_NAME = "turn-stats-line"
EVENTS_SCHEMA_NAME = "events-line"
META_SCHEMA_NAME = "meta"
def parse_result(path: Path) -> dict[str, Any]:
with path.open() as f:
return json.load(f)
def extract_row(seed: int, data: dict[str, Any]) -> dict[str, Any]:
"""Extract CSV fields from a result JSON."""
players = data.get("players", [{}, {}])
p0 = players[0] if len(players) > 0 else {}
p1 = players[1] if len(players) > 1 else {}
def pstat(p: dict[str, Any], key: str) -> int:
return int(p.get(key, -1))
return {
"seed": seed,
"outcome": data.get("outcome", ""),
"turns": int(data.get("turns", data.get("final_turn", -1))),
"winner": int(data.get("winner", data.get("winner_index", -1))),
"p0_pop": pstat(p0, "pop"),
"p0_mil": pstat(p0, "mil"),
"p0_cities": pstat(p0, "cities"),
"p0_gold": pstat(p0, "gold"),
"p0_techs": pstat(p0, "techs"),
"p0_combats": pstat(p0, "combats"),
"p1_pop": pstat(p1, "pop"),
"p1_mil": pstat(p1, "mil"),
"p1_cities": pstat(p1, "cities"),
"p1_gold": pstat(p1, "gold"),
"p1_techs": pstat(p1, "techs"),
"p1_combats": pstat(p1, "combats"),
"invariants": int(data.get("invariant_violations", 0)),
}
CSV_FIELDS = [
"seed", "outcome", "turns", "winner",
"p0_pop", "p0_mil", "p0_cities", "p0_gold", "p0_techs", "p0_combats",
"p1_pop", "p1_mil", "p1_cities", "p1_gold", "p1_techs", "p1_combats",
"invariants",
EVENT_TYPES = [
"city_founded", "city_captured", "city_grew", "city_starved",
"tech_researched", "unit_created", "unit_destroyed", "combat_resolved", "victory",
]
VALID_OUTCOMES = {"victory", "max_turns", "defeat"}
def find_game_dirs(results_dir: Path) -> tuple[list[tuple[int, Path]], list[int]]:
"""Find game_<stamp>_seed<N>/ directories. Returns (found, missing_seeds).
For each seed number, picks the most recent directory (lexicographic max on stamp).
"""
by_seed: dict[int, list[Path]] = {}
for d in results_dir.iterdir():
if not d.is_dir():
continue
name = d.name
if not name.startswith("game_"):
continue
# Expected: game_<stamp>_seed<N>
parts = name.rsplit("_seed", 1)
if len(parts) != 2 or not parts[1].isdigit():
continue
seed = int(parts[1])
by_seed.setdefault(seed, []).append(d)
found: list[tuple[int, Path]] = []
for seed in sorted(by_seed):
dirs = sorted(by_seed[seed])
found.append((seed, dirs[-1]))
# We report missing only if there are gaps in the seed sequence
if not found:
return [], []
max_seed = max(s for s, _ in found)
present = {s for s, _ in found}
missing = [s for s in range(1, max_seed + 1) if s not in present]
return found, missing
def run_assertions(rows: list[dict[str, Any]], missing_seeds: list[int]) -> list[str]:
"""Return list of assertion failure messages (empty = all pass)."""
def _read_last_jsonl_line(path: Path) -> str | None:
"""Read the last non-empty line of a JSONL file efficiently."""
try:
text = path.read_text()
except OSError:
return None
for line in reversed(text.splitlines()):
line = line.strip()
if line:
return line
return None
def _count_jsonl_lines(path: Path) -> int:
"""Count non-empty lines in a JSONL file."""
try:
text = path.read_text()
except OSError:
return 0
return sum(1 for l in text.splitlines() if l.strip())
def _count_events_by_type(path: Path) -> dict[str, int]:
"""Read events.jsonl, count occurrences per event type."""
counts: dict[str, int] = {}
try:
text = path.read_text()
except OSError:
return counts
for raw in text.splitlines():
raw = raw.strip()
if not raw:
continue
try:
obj = json.loads(raw)
except json.JSONDecodeError:
continue
t = obj.get("type", "<unknown>")
counts[t] = counts.get(t, 0) + 1
return counts
AGGREGATE_FIELDS = [
"total_combats",
"total_cities_founded",
"total_cities_captured",
"turn_first_combat",
"turn_first_city_captured",
]
PLAYER_FIELDS = [
"pop", "pop_peak", "mil",
"cities", "cities_captured", "cities_lost",
"gold", "gold_peak", "gold_per_turn",
"techs", "tiles", "buildings",
"happiness",
"food_total", "production_total",
"kills", "units_lost",
"turn_first_pop_3", "turn_first_pop_4",
]
def extract_row(
seed: int, data: dict[str, Any], event_counts: dict[str, int]
) -> dict[str, Any]:
# turn-stats-line uses "turn" not "turns_played"
turn = data.get("turn", data.get("turns_played", -1))
total_events = sum(event_counts.values())
row: dict[str, Any] = {
"seed": seed,
"outcome": data["outcome"],
"turns_played": turn,
"winner_index": data["winner_index"],
"victory_type": data["victory_type"],
"wall_clock_sec": round(float(data["wall_clock_sec"]), 2),
"event_count": total_events,
}
for et in EVENT_TYPES:
row[f"evt_{et}"] = event_counts.get(et, 0)
for f in AGGREGATE_FIELDS:
row[f"agg_{f}"] = data["aggregate"][f]
player_stats: dict[str, Any] = data["player_stats"]
for pid in ("0", "1"):
pstat = player_stats.get(pid, {})
for f in PLAYER_FIELDS:
row[f"p{pid}_{f}"] = pstat.get(f, "")
row["invariant_violations"] = len(data["invariant_violations"])
return row
def csv_fieldnames() -> list[str]:
fields = [
"seed", "outcome", "turns_played", "winner_index",
"victory_type", "wall_clock_sec", "event_count",
]
fields += [f"evt_{et}" for et in EVENT_TYPES]
fields += [f"agg_{f}" for f in AGGREGATE_FIELDS]
for pid in ("0", "1"):
fields += [f"p{pid}_{f}" for f in PLAYER_FIELDS]
fields.append("invariant_violations")
return fields
VALID_OUTCOMES = {"victory", "max_turns", "defeat", "in_progress"}
def run_assertions(
rows: list[dict[str, Any]],
missing_seeds: list[int],
schema_errors: dict[Path, list[str]],
) -> list[str]:
failures: list[str] = []
if missing_seeds:
failures.append(
f"Missing result.json for seeds: {missing_seeds}"
"Task 1 (AUTO_PLAY_SEED + JSON writer) may not be complete yet"
)
failures.append(f"Missing game directories for seeds: {missing_seeds}")
for row in rows:
outcome = row["outcome"]
if outcome not in VALID_OUTCOMES:
failures.append(
f"Seed {row['seed']}: invalid outcome '{outcome}' "
f"(expected one of {sorted(VALID_OUTCOMES)})"
)
if schema_errors:
for path, errs in schema_errors.items():
failures.append(f"Schema validation failed for {path}:")
for e in errs[:5]:
failures.append(f" {e}")
if len(errs) > 5:
failures.append(f" ... ({len(errs) - 5} more)")
total_violations = sum(r["invariants"] for r in rows)
if not rows:
failures.append("No valid result rows to analyze.")
return failures
bad_outcomes = [r for r in rows if r["outcome"] not in VALID_OUTCOMES]
if bad_outcomes:
failures.append(f"{len(bad_outcomes)} game(s) had invalid outcome values")
total_violations = sum(r["invariant_violations"] for r in rows)
if total_violations > 0:
per_seed = {r["seed"]: r["invariants"] for r in rows if r["invariants"] > 0}
failures.append(f"Total invariant violations across games: {total_violations}")
max_p0_pop = max((r["p0_pop_peak"] for r in rows if r["p0_pop_peak"] != ""), default=0)
if max_p0_pop < 4:
failures.append(
f"Invariant violations detected: {total_violations} total, "
f"by seed: {per_seed}"
f"No game reached p0_pop_peak >= 4 (max was {max_p0_pop}). "
"Growth system may be broken."
)
if rows:
max_p0_pop = max(r["p0_pop"] for r in rows)
if max_p0_pop < 4:
failures.append(
f"Sanity check failed: no game reached p0 pop >= 4 "
f"(max was {max_p0_pop}). Growth system may be broken."
)
never_combat = [r for r in rows if r["agg_turn_first_combat"] == -1]
if never_combat:
failures.append(
f"{len(never_combat)} game(s) never fought a single combat — "
"AI may be pacifist or unreachable."
)
no_turns = [r for r in rows if r["turns_played"] < 1]
if no_turns:
failures.append(
f"{len(no_turns)} game(s) have turns_played < 1 — "
"game may have crashed before completing a turn."
)
return failures
def median_int(values: list[int]) -> int:
if not values:
def median_int(values: list[int | float]) -> int:
filtered = [v for v in values if isinstance(v, (int, float))]
if not filtered:
return -1
valid = [v for v in values if v >= 0]
if not valid:
return -1
return int(statistics.median(valid))
return int(statistics.median(filtered))
def write_baseline_stub(path: Path) -> None:
stub = {
"_note": "Baseline stub — Phase 3b will populate this with real thresholds.",
"version": 1,
"thresholds": {},
}
path.write_text(json.dumps(stub, indent=2) + "\n")
print(f"Baseline stub written to {path}", file=sys.stderr)
def print_summary(rows: list[dict[str, Any]], out: Any = sys.stderr) -> None:
print("=== autoplay batch report ===", file=out)
print(f"games: {len(rows)}", file=out)
counts: dict[str, int] = {}
for r in rows:
counts[r["outcome"]] = counts.get(r["outcome"], 0) + 1
for k, v in sorted(counts.items()):
pct = 100 * v // len(rows) if rows else 0
print(f" {k}: {v} ({pct}%)", file=out)
if rows:
print(
f"median turns_played: {median_int([r['turns_played'] for r in rows])}",
file=out,
)
print(
f"median p0_pop_peak: {median_int([r['p0_pop_peak'] for r in rows])}",
file=out,
)
print(
f"median p0_gold_peak: {median_int([r['p0_gold_peak'] for r in rows])}",
file=out,
)
print(
f"median agg_total_combats: {median_int([r['agg_total_combats'] for r in rows])}",
file=out,
)
print(
f"median event_count: {median_int([r['event_count'] for r in rows])}",
file=out,
)
print("event counts by type (total across all games):", file=out)
for et in EVENT_TYPES:
total = sum(r.get(f"evt_{et}", 0) for r in rows)
if total > 0:
print(f" {et}: {total}", file=out)
total_v = sum(r["invariant_violations"] for r in rows)
print(f"invariant violations (total): {total_v}", file=out)
def main() -> None:
parser = argparse.ArgumentParser(
description="Aggregate autoplay batch results into CSV + summary."
)
parser.add_argument("results_dir", type=Path, help="Directory containing seed_* subdirs")
parser.add_argument("--baseline", type=Path, default=None, help="Baseline JSON for comparison (not yet implemented)")
parser.add_argument("--update-baseline", action="store_true", help="Write/update baseline file (stub only for now)")
args = parser.parse_args()
def main(argv: list[str]) -> int:
positional: list[str] = []
flags: set[str] = set()
i = 1
while i < len(argv):
a = argv[i]
if a.startswith("-"):
flags.add(a)
else:
positional.append(a)
i += 1
if not positional:
print(
"usage: autoplay-report.py <results_dir> [--baseline PATH] [--update-baseline] [--deep]",
file=sys.stderr,
)
return 2
results_dir = Path(positional[0])
deep = "--deep" in flags
results_dir: Path = args.results_dir
if not results_dir.is_dir():
print(f"ERROR: results_dir not found: {results_dir}", file=sys.stderr)
sys.exit(1)
print(f"ERROR: {results_dir} is not a directory", file=sys.stderr)
return 2
seed_entries = find_result_files(results_dir)
if not seed_entries:
print(f"ERROR: No seed_*/result_*.json files found in {results_dir}", file=sys.stderr)
sys.exit(1)
found, missing = find_game_dirs(results_dir)
if not found and not missing:
print(f"ERROR: No game_*_seed*/ dirs found under {results_dir}", file=sys.stderr)
return 1
ts_schema = load_schema(TURN_STATS_SCHEMA_NAME)
meta_schema = load_schema(META_SCHEMA_NAME)
rows: list[dict[str, Any]] = []
parse_failures: list[str] = []
missing_seeds: list[int] = []
schema_errors: dict[Path, list[str]] = {}
for seed, result_path in seed_entries:
if not result_path.exists():
missing_seeds.append(seed)
for seed, game_dir in found:
meta_path = game_dir / "meta.json"
turn_stats_path = game_dir / "turn_stats.jsonl"
events_path = game_dir / "events.jsonl"
# Validate meta.json
if not meta_path.exists():
schema_errors[meta_path] = ["meta.json missing"]
else:
try:
meta_data = json.loads(meta_path.read_text())
meta_errs = validate(meta_data, meta_schema)
if meta_errs:
schema_errors[meta_path] = meta_errs
except (OSError, json.JSONDecodeError) as e:
schema_errors[meta_path] = [f"cannot load meta.json: {e}"]
# Fast path: read only the last line of turn_stats.jsonl
last_line = _read_last_jsonl_line(turn_stats_path)
if last_line is None:
schema_errors[turn_stats_path] = ["turn_stats.jsonl missing or empty"]
continue
try:
data = parse_result(result_path)
rows.append(extract_row(seed, data))
except (json.JSONDecodeError, KeyError) as exc:
parse_failures.append(f"Seed {seed}: failed to parse {result_path}: {exc}")
data = json.loads(last_line)
except json.JSONDecodeError as e:
schema_errors[turn_stats_path] = [f"last line invalid JSON: {e}"]
continue
# CSV header + rows to stdout
print(",".join(CSV_FIELDS))
for row in rows:
print(",".join(str(row[f]) for f in CSV_FIELDS))
errs = validate(data, ts_schema)
if errs:
schema_errors[turn_stats_path] = errs
continue
# Summary to stderr
n_games = len(rows)
n_victories = sum(1 for r in rows if r["outcome"] == "victory")
n_max_turns = sum(1 for r in rows if r["outcome"] == "max_turns")
turns_list = [r["turns"] for r in rows]
p0_pop_list = [r["p0_pop"] for r in rows]
p0_combats_list = [r["p0_combats"] for r in rows]
total_violations = sum(r["invariants"] for r in rows)
event_counts = _count_events_by_type(events_path) if events_path.exists() else {}
rows.append(extract_row(seed, data, event_counts))
pct = lambda n: f"{round(100 * n / n_games)}%" if n_games > 0 else "n/a"
if deep:
# Read .save files only with --deep
for save_file in sorted(game_dir.glob("*.save")):
print(f"[deep] {save_file.name}: {save_file.stat().st_size} bytes", file=sys.stderr)
print("", file=sys.stderr)
print("=== autoplay batch report ===", file=sys.stderr)
print(f"games: {n_games}", file=sys.stderr)
print(f"victories: {n_victories} ({pct(n_victories)})", file=sys.stderr)
print(f"max_turns: {n_max_turns} ({pct(n_max_turns)})", file=sys.stderr)
print(f"median turns: {median_int(turns_list)}", file=sys.stderr)
print(f"median p0_pop_final: {median_int(p0_pop_list)}", file=sys.stderr)
print(f"median p0_combats: {median_int(p0_combats_list)}", file=sys.stderr)
print(f"invariant violations (total): {total_violations}", file=sys.stderr)
if missing_seeds:
print(f"missing result.json: seeds {missing_seeds}", file=sys.stderr)
# CSV to stdout
writer = csv.DictWriter(sys.stdout, fieldnames=csv_fieldnames())
writer.writeheader()
for r in rows:
writer.writerow(r)
# Assertions
all_failures = parse_failures + run_assertions(rows, missing_seeds)
print_summary(rows)
failures = run_assertions(rows, missing, schema_errors)
if failures:
print("\n=== FAILURES ===", file=sys.stderr)
for f in failures:
print(f" {f}", file=sys.stderr)
return 1
if args.update_baseline:
baseline_path = args.baseline or results_dir / "baseline.json"
write_baseline_stub(baseline_path)
if "--update-baseline" in flags:
print("--update-baseline: not yet implemented (Phase 3b)", file=sys.stderr)
if all_failures:
print("", file=sys.stderr)
print("=== ASSERTION FAILURES ===", file=sys.stderr)
for msg in all_failures:
print(f" FAIL: {msg}", file=sys.stderr)
sys.exit(1)
print("", file=sys.stderr)
print("All assertions passed.", file=sys.stderr)
print("\nAll assertions passed.", file=sys.stderr)
return 0
if __name__ == "__main__":
main()
sys.exit(main(sys.argv))

View file

@ -0,0 +1,98 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://magic-civilization/schemas/autoplay-result.json",
"title": "AutoPlay Result",
"description": "Per-game result file emitted by src/game/engine/scenes/tests/auto_play.gd every turn and at exit.",
"type": "object",
"additionalProperties": false,
"required": [
"seed",
"turns_played",
"outcome",
"winner_index",
"victory_type",
"wall_clock_sec",
"aggregate",
"player_stats",
"invariant_violations"
],
"properties": {
"seed": { "type": "integer", "minimum": 0 },
"turns_played": { "type": "integer", "minimum": 0 },
"outcome": {
"type": "string",
"enum": ["in_progress", "victory", "max_turns", "defeat"]
},
"winner_index": { "type": "integer", "minimum": -1 },
"victory_type": { "type": "string" },
"wall_clock_sec": { "type": "number", "minimum": 0 },
"aggregate": {
"type": "object",
"additionalProperties": false,
"required": [
"total_combats",
"total_cities_founded",
"total_cities_captured",
"turn_first_combat",
"turn_first_city_captured"
],
"properties": {
"total_combats": { "type": "integer", "minimum": 0 },
"total_cities_founded": { "type": "integer", "minimum": 0 },
"total_cities_captured": { "type": "integer", "minimum": 0 },
"turn_first_combat": { "type": "integer", "minimum": -1, "description": "-1 if no combat yet" },
"turn_first_city_captured": { "type": "integer", "minimum": -1 }
}
},
"player_stats": {
"type": "object",
"description": "Map of player_index (as string) to per-player stats. Rewritten every turn — not final until outcome != 'in_progress'.",
"propertyNames": { "pattern": "^[0-9]+$" },
"additionalProperties": {
"$ref": "#/definitions/player_stats"
}
},
"invariant_violations": {
"type": "array",
"items": { "type": "string" }
}
},
"definitions": {
"player_stats": {
"type": "object",
"additionalProperties": false,
"required": [
"pop", "pop_peak",
"mil",
"cities", "cities_captured", "cities_lost",
"gold", "gold_peak", "gold_per_turn",
"techs", "tiles", "buildings",
"happiness",
"food_total", "production_total",
"kills", "units_lost",
"turn_first_pop_3", "turn_first_pop_4"
],
"properties": {
"pop": { "type": "integer", "minimum": 0 },
"pop_peak": { "type": "integer", "minimum": 0 },
"mil": { "type": "integer", "minimum": 0 },
"cities": { "type": "integer", "minimum": 0 },
"cities_captured": { "type": "integer", "minimum": 0 },
"cities_lost": { "type": "integer", "minimum": 0 },
"gold": { "type": "integer" },
"gold_peak": { "type": "integer" },
"gold_per_turn": { "type": "integer" },
"techs": { "type": "integer", "minimum": 0 },
"tiles": { "type": "integer", "minimum": 0 },
"buildings": { "type": "integer", "minimum": 0 },
"happiness": { "type": "integer" },
"food_total": { "type": "number" },
"production_total": { "type": "number" },
"kills": { "type": "integer", "minimum": 0 },
"units_lost": { "type": "integer", "minimum": 0 },
"turn_first_pop_3": { "type": "integer", "minimum": -1 },
"turn_first_pop_4": { "type": "integer", "minimum": -1 }
}
}
}
}

258
tools/autoplay-validate.py Executable file
View file

@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""
JSON Schema validator for autoplay output files.
Implements the subset of draft-07 used by the schemas:
type, required, additionalProperties, properties, propertyNames.pattern,
minimum, enum, items, pattern, $ref (local only).
stdlib only no pip installs.
Usage:
# Validate a single JSON file against a named schema:
python3 tools/autoplay-validate.py --schema meta path/to/meta.json
# Validate every line of a JSONL file independently:
python3 tools/autoplay-validate.py --schema turn-stats-line --jsonl path/to/turn_stats.jsonl
# Legacy: validate against the flat result schema (default):
python3 tools/autoplay-validate.py path/to/result.json
Exits 0 if all valid, 1 with errors to stderr, 2 on usage error.
Available schema names (--schema):
turn-stats-line tools/schemas/autoplay/turn-stats-line.json
meta tools/schemas/autoplay/meta.json
events-line tools/schemas/autoplay/events-line.json
save tools/schemas/autoplay/save.json
result tools/autoplay-result-schema.json (legacy flat schema)
"""
from __future__ import annotations
import json
import re
import sys
from pathlib import Path
from typing import Any
TOOLS_DIR = Path(__file__).parent
SCHEMAS_DIR = TOOLS_DIR / "schemas" / "autoplay"
SCHEMA_PATHS: dict[str, Path] = {
"result": TOOLS_DIR / "autoplay-result-schema.json",
"turn-stats-line": SCHEMAS_DIR / "turn-stats-line.json",
"meta": SCHEMAS_DIR / "meta.json",
"events-line": SCHEMAS_DIR / "events-line.json",
"save": SCHEMAS_DIR / "save.json",
}
_DEFAULT_SCHEMA = "result"
def load_schema(name: str = _DEFAULT_SCHEMA) -> dict[str, Any]:
path = SCHEMA_PATHS.get(name)
if path is None:
raise ValueError(
f"unknown schema {name!r}. Available: {', '.join(sorted(SCHEMA_PATHS))}"
)
with path.open() as f:
return json.load(f)
_TYPE_CHECKS: dict[str, type | tuple[type, ...]] = {
"object": dict,
"array": list,
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"null": type(None),
}
def _resolve_ref(ref: str, root: dict[str, Any]) -> dict[str, Any]:
if not ref.startswith("#/"):
raise ValueError(f"only local refs supported, got {ref!r}")
node: Any = root
for part in ref[2:].split("/"):
if not isinstance(node, dict) or part not in node:
raise ValueError(f"ref {ref!r} does not resolve")
node = node[part]
return node
def _validate(
value: Any, schema: dict[str, Any], root: dict[str, Any], path: str
) -> list[str]:
errors: list[str] = []
if "$ref" in schema:
schema = _resolve_ref(schema["$ref"], root)
t = schema.get("type")
if t is not None:
expected = _TYPE_CHECKS.get(t)
if expected is None:
errors.append(f"{path}: unknown schema type {t!r}")
return errors
# bool is a subclass of int in Python; reject booleans as numbers.
if t in ("integer", "number") and isinstance(value, bool):
errors.append(f"{path}: expected {t}, got boolean")
return errors
if t == "integer" and isinstance(value, float) and not value.is_integer():
errors.append(f"{path}: expected integer, got float {value}")
return errors
if not isinstance(value, expected):
errors.append(f"{path}: expected {t}, got {type(value).__name__}")
return errors
if "enum" in schema:
if value not in schema["enum"]:
errors.append(f"{path}: {value!r} not in enum {schema['enum']}")
if "minimum" in schema and isinstance(value, (int, float)):
if value < schema["minimum"]:
errors.append(f"{path}: {value} < minimum {schema['minimum']}")
if "pattern" in schema and isinstance(value, str):
if not re.match(schema["pattern"], value):
errors.append(
f"{path}: {value!r} does not match pattern {schema['pattern']!r}"
)
if t == "object" and isinstance(value, dict):
props: dict[str, Any] = schema.get("properties", {})
required: list[str] = schema.get("required", [])
additional: bool | dict[str, Any] = schema.get("additionalProperties", True)
prop_names: dict[str, Any] | None = schema.get("propertyNames")
for req in required:
if req not in value:
errors.append(f"{path}: missing required property {req!r}")
for k, v in value.items():
kpath = f"{path}.{k}"
if prop_names is not None:
errors.extend(_validate(k, prop_names, root, f"{kpath}<key>"))
if k in props:
errors.extend(_validate(v, props[k], root, kpath))
elif additional is False:
errors.append(f"{path}: unexpected property {k!r}")
elif isinstance(additional, dict):
errors.extend(_validate(v, additional, root, kpath))
if t == "array" and isinstance(value, list):
item_schema = schema.get("items")
if item_schema is not None:
for i, item in enumerate(value):
errors.extend(_validate(item, item_schema, root, f"{path}[{i}]"))
return errors
def validate(data: Any, schema: dict[str, Any] | None = None) -> list[str]:
"""Validate data against schema. Returns list of error strings (empty = valid)."""
s = schema if schema is not None else load_schema()
return _validate(data, s, s, "$")
def _validate_file(path: Path, schema: dict[str, Any], jsonl: bool) -> int:
"""Validate one file. Returns error count."""
total_errors = 0
try:
text = path.read_text()
except OSError as e:
print(f"{path}: cannot read ({e})", file=sys.stderr)
return 1
if jsonl:
for lineno, raw in enumerate(text.splitlines(), start=1):
raw = raw.strip()
if not raw:
continue
try:
data = json.loads(raw)
except json.JSONDecodeError as e:
print(f"{path}:{lineno}: invalid JSON ({e})", file=sys.stderr)
total_errors += 1
continue
errs = validate(data, schema)
if errs:
total_errors += len(errs)
print(f"{path}:{lineno}: {len(errs)} error(s)", file=sys.stderr)
for e in errs:
print(f" {e}", file=sys.stderr)
else:
try:
data = json.loads(text)
except json.JSONDecodeError as e:
print(f"{path}: invalid JSON ({e})", file=sys.stderr)
return 1
errs = validate(data, schema)
if errs:
total_errors += len(errs)
print(f"{path}: {len(errs)} error(s)", file=sys.stderr)
for e in errs:
print(f" {e}", file=sys.stderr)
else:
print(f"{path}: OK", file=sys.stderr)
if jsonl and total_errors == 0:
print(f"{path}: OK", file=sys.stderr)
return total_errors
def _main(argv: list[str]) -> int:
args = argv[1:]
schema_name = _DEFAULT_SCHEMA
jsonl = False
files: list[str] = []
i = 0
while i < len(args):
a = args[i]
if a == "--schema":
i += 1
if i >= len(args):
print("ERROR: --schema requires a value", file=sys.stderr)
return 2
schema_name = args[i]
elif a == "--jsonl":
jsonl = True
elif a.startswith("--schema="):
schema_name = a[len("--schema="):]
elif a.startswith("-"):
print(f"ERROR: unknown flag {a!r}", file=sys.stderr)
return 2
else:
files.append(a)
i += 1
if not files:
print(
"usage: autoplay-validate.py [--schema NAME] [--jsonl] <file> [<file> ...]",
file=sys.stderr,
)
print(
f" schemas: {', '.join(sorted(SCHEMA_PATHS))}",
file=sys.stderr,
)
return 2
try:
schema = load_schema(schema_name)
except ValueError as e:
print(f"ERROR: {e}", file=sys.stderr)
return 2
total_errors = 0
for f in files:
total_errors += _validate_file(Path(f), schema, jsonl)
return 1 if total_errors else 0
if __name__ == "__main__":
sys.exit(_main(sys.argv))