magicciv/tooling/rl_self_play/harness_client.py

231 lines
9.1 KiB
Python

"""Reusable JSON-Lines client for `scripts/player-api-server.sh`.
One client = one subprocess = one game. The Gymnasium env in
`magic_civ_env.py` owns one of these per `reset()` cycle, and the
evaluator owns one per evaluation episode. Both share the same
protocol; pulling it out here lets them stay in sync with the wire
contract documented in `src/game/engine/docs/PLAYER_API.md`.
Strong types throughout — no string-typed errors leaking up from the
harness. Anything off-protocol raises `HarnessError`.
"""
from __future__ import annotations
import json
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
REPO_ROOT = Path(__file__).resolve().parents[2]
HARNESS_SCRIPT = REPO_ROOT.joinpath("scripts", "player-api-server.sh")
# Ceiling on how many stdout lines a single request may consume while we
# look for the response carrying its `id`. Responses can be preceded by any
# number of id-less async notifications (`unit_moved`, `turn_started`, ...),
# so the ceiling is deliberately generous. Note that it bounds the *number of
# lines*, not wall-clock time: a harness that stops writing entirely will
# still block the blocking `readline()` in `HarnessClient._send`.
MAX_LINES_PER_RESPONSE = 5_000
class HarnessError(RuntimeError):
    """Signals that the harness broke the wire protocol or died unexpectedly.

    This is the single exception type the client surfaces for harness-side
    failures, so callers only need to catch one thing.
    """
@dataclass(frozen=True, slots=True)
class HarnessConfig:
"""Per-episode harness configuration. Mirrors the env-var contract in
`scripts/player-api-server.sh` so callers can override any axis without
knowing the env-var spelling."""
seed: int = 42
players: int = 2
player_slot: int = 0
# Stage 4 (multi-slot adapter) — externally-driven slots when this
# process is driving more than one slot (e.g. 5 learned slots in a
# 5v5 FFA). When this tuple has >1 entry, every wire `view` / `act`
# call MUST include a `slot` field naming which slot it targets;
# `HarnessClient.view`/`act`/`end_turn` accept an optional `slot`
# kwarg for this. Defaults to `(player_slot,)` so single-slot
# callers keep the existing wire shape unchanged.
player_slots: tuple[int, ...] = ()
map_size: str = "duel"
map_type: str = "continents"
omniscient: bool = False
timeout_sec: int = 60
# When set, the simulator's TurnProcessor uses a VictoryConfig that
# matches this mode (see mc-player-api/src/dispatch.rs
# `victory_config_from_env`). Default "domination" so RL episodes can
# terminate via capital capture / LastSurvivor — without this the
# simulator falls back to a city-count check that almost never fires
# in 1v1 duel play.
victory_mode: str = "domination"
# Stage 4 — per-AI-slot controller registry id, comma-joined in slot
# order over AI slots (i.e. excluding `player_slot`). Empty = every
# AI slot defaults to `"scripted:default"` (the MCTS+heuristic).
# Set this to mix learned + scripted opponents in one game, e.g.
# `("learned:duel-v1b", "", "")` puts learned on the first AI slot.
player_controllers: tuple[str, ...] = ()
@property
def effective_player_slots(self) -> tuple[int, ...]:
"""Resolve the back-compat fallback: empty tuple → `(player_slot,)`."""
return self.player_slots if self.player_slots else (self.player_slot,)
def to_env(self) -> dict[str, str]:
slots = self.effective_player_slots
env: dict[str, str] = {
"CP_SEED": str(self.seed),
"CP_PLAYERS": str(self.players),
"CP_PLAYER_SLOT": str(slots[0]),
"CP_PLAYER_SLOTS": ",".join(str(s) for s in slots),
"CP_MAP_SIZE": self.map_size,
"CP_MAP_TYPE": self.map_type,
"CP_OMNISCIENT": "1" if self.omniscient else "0",
"CP_TIMEOUT_SEC": str(self.timeout_sec),
"CP_VICTORY_MODE": self.victory_mode,
}
if self.player_controllers:
env["CP_PLAYER_CONTROLLERS"] = ",".join(self.player_controllers)
return env
class HarnessClient:
    """One running harness subprocess, i.e. one game.

    Cheap to construct (sub-second on macOS once Godot's class cache is
    warm); destroy + recreate on each Gym `reset()` so episodes have
    independent simulator state. Use it as a context manager (or call
    `shutdown()` explicitly) so the subprocess is always reaped.
    """

    def __init__(self, config: HarnessConfig | None = None) -> None:
        self._config = config if config is not None else HarnessConfig()
        env = {**os.environ, **self._config.to_env()}
        self._proc = subprocess.Popen(
            ["bash", str(HARNESS_SCRIPT)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(REPO_ROOT),
            text=True,
            # The wire format is JSON, i.e. UTF-8, independent of the host
            # locale. Undecodable bytes become U+FFFD instead of raising
            # UnicodeDecodeError out of `readline()`.
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env=env,
        )
        self._next_id = 1
        # True once no further requests may be sent: stdout hit EOF, stdin
        # broke, or shutdown has begun.
        self._closed = False
        # True once `shutdown()` has run, making repeated calls no-ops.
        self._shutdown_done = False
        # Async notifications that arrived while we were waiting for a
        # correlated response. The wire spec sends notifications without
        # an `id`; they carry the same Event payload as response.events
        # but fire for things outside our request (AI turns, opponent
        # eliminations, GameOver while we're mid-act). Drained by
        # `drain_notifications()` after each act/view.
        self._pending_notifications: list[dict[str, Any]] = []

    @property
    def config(self) -> HarnessConfig:
        """The configuration this harness was launched with."""
        return self._config

    @staticmethod
    def _request(kind: str, slot: int | None, **fields: Any) -> dict[str, Any]:
        """Build a request object, adding `slot` only when one is given."""
        msg: dict[str, Any] = {"type": kind, **fields}
        if slot is not None:
            msg["slot"] = slot
        return msg

    @staticmethod
    def _describe_error(err: object) -> str:
        """Format a wire `error` field as `code: message`.

        Falls back to `str(err)` when the field is not a JSON object, so a
        malformed error never masks the real failure with AttributeError.
        """
        if isinstance(err, dict):
            return f"{err.get('code')}: {err.get('message')}"
        return str(err)

    def _send(self, msg: dict[str, Any]) -> dict[str, Any]:
        """Send one request and return the response whose `id` matches.

        A fresh request id is attached to a copy of `msg`, so the caller's
        dict is left untouched. Id-less lines read meanwhile are buffered as
        notifications. Non-JSON lines and JSON values that are not objects
        are skipped as stray output.

        Raises:
            HarnessError: the client is closed, stdin is broken, stdout hit
                EOF, or no matching response appeared within
                MAX_LINES_PER_RESPONSE lines.

        NOTE: `readline()` blocks; there is no wall-clock timeout here.
        """
        if self._closed:
            raise HarnessError("harness already closed")
        request_id = self._next_id
        self._next_id += 1
        payload = {**msg, "id": request_id}
        stdin, stdout = self._proc.stdin, self._proc.stdout
        assert stdin is not None and stdout is not None
        try:
            stdin.write(json.dumps(payload) + "\n")
            stdin.flush()
        except OSError as err:
            # BrokenPipeError et al.: the harness has already gone away.
            self._closed = True
            raise HarnessError(
                f"harness stdin closed while sending id={request_id}"
            ) from err
        for _ in range(MAX_LINES_PER_RESPONSE):
            line = stdout.readline()
            if not line:
                self._closed = True
                raise HarnessError(
                    f"harness stdout EOF while waiting for id={request_id}"
                )
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Stray stderr on stdout — skip and keep reading.
                continue
            if not isinstance(obj, dict):
                # A bare JSON scalar/array (e.g. a stray "0" log line) is
                # not a protocol frame.
                continue
            if obj.get("id") == request_id:
                return obj
            if "id" not in obj:
                # Async notification — buffer for the next drain.
                self._pending_notifications.append(obj)
        raise HarnessError(
            f"no correlated response for id={request_id} within {MAX_LINES_PER_RESPONSE} lines"
        )

    def view(self, slot: int | None = None) -> dict[str, Any]:
        """Return the current `view` payload, optionally for a specific `slot`.

        Raises:
            HarnessError: the harness reported failure or omitted `view`.
        """
        r = self._send(self._request("view", slot))
        if not r.get("ok"):
            raise HarnessError(
                f"view(slot={slot}) failed: "
                f"{self._describe_error(r.get('error', {}))}"
            )
        if "view" not in r:
            raise HarnessError("view response is missing the 'view' field")
        return r["view"]

    def act(
        self, action: dict[str, Any], slot: int | None = None
    ) -> dict[str, Any]:
        """Apply `action` (optionally as `slot`) and return the full response.

        Raises:
            HarnessError: the harness rejected the action.
        """
        r = self._send(self._request("act", slot, action=action))
        if not r.get("ok"):
            raise HarnessError(
                f"act({action.get('type')!r}, slot={slot}) failed: "
                f"{self._describe_error(r.get('error', {}))}"
            )
        return r

    def end_turn(self, slot: int | None = None) -> dict[str, Any]:
        """Shorthand for `act({"type": "end_turn"}, slot=slot)`."""
        return self.act({"type": "end_turn"}, slot=slot)

    def suggest(self, slot: int | None = None) -> list[dict[str, Any]]:
        """Ask the harness what the scripted controller would play for
        `slot` this turn, WITHOUT applying anything or advancing the turn.

        Backs the behavioural-cloning recorder (`record_expert.py`): the
        returned `actions` list uses the same `PlayerAction` JSON shape
        `act()` accepts, so the recorder replays each action straight
        back. Read-only on the wire — two `suggest` calls in a row return
        identical results and leave `view()` unchanged.

        Raises:
            HarnessError: the harness reported failure, or `actions` is
                present but not a list.
        """
        r = self._send(self._request("suggest", slot))
        if not r.get("ok"):
            raise HarnessError(
                f"suggest(slot={slot}) failed: "
                f"{self._describe_error(r.get('error', {}))}"
            )
        actions = r.get("actions", [])
        if not isinstance(actions, list):
            raise HarnessError(
                f"suggest(slot={slot}) returned non-list actions: {actions!r}"
            )
        return list(actions)

    def drain_notifications(self) -> list[dict[str, Any]]:
        """Pop and return all async notifications that arrived since the
        last drain. Each entry is the raw wire object; callers typically
        extract `Notification::Event` payloads by looking at the `type`
        field and the carried event tag."""
        drained = self._pending_notifications
        self._pending_notifications = []
        return drained

    def shutdown(self) -> None:
        """Stop the harness and reap the subprocess. Safe to call repeatedly.

        Sends a best-effort `shutdown` request while the connection is
        still usable. Then it closes stdin so the child sees EOF, waits up
        to 5 s, kills on timeout, and finally closes stdout.
        """
        if self._shutdown_done:
            return
        self._shutdown_done = True
        if not self._closed:
            # Must send BEFORE flagging closed: `_send` refuses to write
            # once `_closed` is set.
            try:
                self._send({"type": "shutdown"})
            except HarnessError:
                # Best effort: the harness may exit without replying.
                pass
            self._closed = True
        if self._proc.stdin is not None:
            try:
                self._proc.stdin.close()
            except OSError:
                # e.g. BrokenPipeError flushing after the child exited.
                pass
        try:
            self._proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            self._proc.kill()
            self._proc.wait(timeout=2)
        if self._proc.stdout is not None:
            try:
                self._proc.stdout.close()
            except OSError:
                pass

    def __enter__(self) -> HarnessClient:
        return self

    def __exit__(self, *exc: object) -> None:
        self.shutdown()