151 lines
5.1 KiB
Python
151 lines
5.1 KiB
Python
"""Reusable JSON-Lines client for `scripts/player-api-server.sh`.
|
|
|
|
One client = one subprocess = one game. The Gymnasium env in
|
|
`magic_civ_env.py` owns one of these per `reset()` cycle, and the
|
|
evaluator owns one per evaluation episode. Both share the same
|
|
protocol; pulling it out here lets them stay in sync with the wire
|
|
contract documented in `src/game/engine/docs/PLAYER_API.md`.
|
|
|
|
Strong types throughout — no string-typed errors leaking up from the
|
|
harness. Anything off-protocol raises `HarnessError`.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# Repository root: the third ancestor of this file, i.e. two directory
# levels above the directory that contains it.
REPO_ROOT = Path(__file__).resolve().parents[2]
# Shell script that is launched (via bash) as the harness subprocess.
HARNESS_SCRIPT = REPO_ROOT / "scripts" / "player-api-server.sh"

# Max lines we will read while looking for a matching response before
# giving up. Each harness response sits behind 0..N async notifications,
# so we need a buffer — 5000 is generous enough that even a busy turn
# with hundreds of `unit_moved`/`turn_started` notifications won't trip
# it, while still bounded so a harness that keeps emitting unrelated
# lines can't keep the trainer reading forever.
# NOTE: this caps the number of lines, not elapsed time — the reader uses
# a blocking readline, so a harness that goes completely silent without
# closing stdout is not covered by this limit.
MAX_LINES_PER_RESPONSE = 5000
|
|
|
|
|
|
class HarnessError(RuntimeError):
    """Signals that the harness went off-protocol or exited unexpectedly.

    This is the single error type raised by the client for harness-side
    failures, so callers only need to catch this one exception.
    """
|
|
|
|
|
|
@dataclass(frozen=True, slots=True)
|
|
class HarnessConfig:
|
|
"""Per-episode harness configuration. Mirrors the env-var contract in
|
|
`scripts/player-api-server.sh` so callers can override any axis without
|
|
knowing the env-var spelling."""
|
|
|
|
seed: int = 42
|
|
players: int = 2
|
|
player_slot: int = 0
|
|
map_size: str = "duel"
|
|
map_type: str = "continents"
|
|
omniscient: bool = False
|
|
timeout_sec: int = 60
|
|
|
|
def to_env(self) -> dict[str, str]:
|
|
return {
|
|
"CP_SEED": str(self.seed),
|
|
"CP_PLAYERS": str(self.players),
|
|
"CP_PLAYER_SLOT": str(self.player_slot),
|
|
"CP_MAP_SIZE": self.map_size,
|
|
"CP_MAP_TYPE": self.map_type,
|
|
"CP_OMNISCIENT": "1" if self.omniscient else "0",
|
|
"CP_TIMEOUT_SEC": str(self.timeout_sec),
|
|
}
|
|
|
|
|
|
class HarnessClient:
    """One running harness instance, driven over JSON-Lines on stdin/stdout.

    Cheap to construct (sub-second on macOS once Godot's class cache is
    warm); destroy + recreate on each Gym `reset()` so episodes have
    independent simulator state.

    Every request carries a monotonically increasing ``id`` and the client
    reads stdout until it sees the object with the same ``id``; anything
    else on stdout (notifications, non-JSON noise) is skipped. Reads are
    blocking, so a harness that goes silent without closing stdout will
    block the caller.

    Usable as a context manager: leaving the ``with`` block calls
    `shutdown()`.
    """

    def __init__(self, config: HarnessConfig | None = None) -> None:
        """Spawn the harness script with `config` exported as env vars.

        Args:
            config: Per-episode settings. A default `HarnessConfig()` is
                used when omitted.
        """
        self._config = config or HarnessConfig()
        # Config values win over anything already present in the parent env.
        env = {**os.environ, **self._config.to_env()}
        self._proc = subprocess.Popen(
            ["bash", str(HARNESS_SCRIPT)],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(REPO_ROOT),
            text=True,
            bufsize=1,  # line-buffered text pipes
            env=env,
        )
        self._next_id = 1
        self._closed = False

    @property
    def config(self) -> HarnessConfig:
        """The configuration this harness was started with."""
        return self._config

    def _terminate(self) -> None:
        """Mark the client closed, release both pipes and reap the child.

        Safe to call more than once. The child gets 5 seconds to exit on
        its own before it is killed.
        """
        self._closed = True
        proc = self._proc
        if proc.stdin is not None:
            try:
                proc.stdin.close()
            except OSError:
                # The child already went away; there is nothing to flush.
                pass
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait(timeout=2)
        finally:
            if proc.stdout is not None:
                proc.stdout.close()

    def _send(self, msg: dict[str, Any]) -> dict[str, Any]:
        """Send one request and return the response with the matching id.

        Args:
            msg: Request payload without an ``id``. It is not modified; the
                id is added to a copy.

        Returns:
            The first JSON object read from stdout whose ``id`` equals the
            id assigned to this request.

        Raises:
            HarnessError: if the client is already closed, the pipes are
                unavailable, the write fails, stdout reaches EOF, or no
                matching response shows up within
                `MAX_LINES_PER_RESPONSE` lines. On a failed write or EOF
                the subprocess is also cleaned up.
        """
        if self._closed:
            raise HarnessError("harness already closed")
        stdin, stdout = self._proc.stdin, self._proc.stdout
        if stdin is None or stdout is None:
            raise HarnessError("harness pipes are not available")

        request_id = self._next_id
        self._next_id += 1
        # Copy rather than mutate so the caller's dict stays untouched.
        wire = json.dumps({**msg, "id": request_id}) + "\n"
        try:
            stdin.write(wire)
            stdin.flush()
        except (OSError, ValueError) as exc:
            # Broken pipe / closed file: the harness is gone.
            self._terminate()
            raise HarnessError(
                f"harness stdin write failed for id={request_id}: {exc}"
            ) from exc

        for _ in range(MAX_LINES_PER_RESPONSE):
            line = stdout.readline()
            if not line:
                self._terminate()
                raise HarnessError(
                    f"harness stdout EOF while waiting for id={request_id}"
                )
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                # Non-JSON noise on stdout — skip and keep reading.
                continue
            # Notifications (no id / other id) and non-object JSON values
            # are not our response; keep reading.
            if isinstance(obj, dict) and obj.get("id") == request_id:
                return obj
        raise HarnessError(
            f"no correlated response for id={request_id} "
            f"within {MAX_LINES_PER_RESPONSE} lines"
        )

    def view(self) -> dict[str, Any]:
        """Request the current game view.

        Returns:
            The ``view`` field of the harness response.

        Raises:
            HarnessError: if the response is not ``ok`` or carries no
                ``view`` field, or on any transport failure.
        """
        r = self._send({"type": "view"})
        if not r.get("ok"):
            raise HarnessError(f"view failed: {r.get('error')}")
        if "view" not in r:
            raise HarnessError("view response is missing the 'view' field")
        return r["view"]

    def act(self, action: dict[str, Any]) -> dict[str, Any]:
        """Submit one action and return the full harness response.

        Args:
            action: Action payload; its ``type`` is quoted in error messages.

        Raises:
            HarnessError: if the response is not ``ok``, or on any
                transport failure.
        """
        r = self._send({"type": "act", "action": action})
        if not r.get("ok"):
            err = r.get("error")
            if not isinstance(err, dict):
                # Tolerate a bare string (or missing) error payload.
                err = {"code": None, "message": err}
            raise HarnessError(
                f"act({action.get('type')!r}) failed: "
                f"{err.get('code')}: {err.get('message')}"
            )
        return r

    def end_turn(self) -> dict[str, Any]:
        """Submit an ``end_turn`` action; see `act` for the contract."""
        return self.act({"type": "end_turn"})

    def shutdown(self) -> None:
        """Ask the harness to exit, then release the pipes and reap it.

        Idempotent. The shutdown request is best effort: a harness that is
        already dead or off-protocol is simply cleaned up.
        """
        if self._closed:
            return
        try:
            # Must happen while the client is still open, otherwise `_send`
            # refuses to write and the harness never sees the request.
            self._send({"type": "shutdown"})
        except HarnessError:
            # Best effort only; cleanup below handles a dead harness.
            pass
        finally:
            self._terminate()

    def __enter__(self) -> HarnessClient:
        return self

    def __exit__(self, *exc: object) -> None:
        self.shutdown()
|