Incremental rebuilds accumulate snapshots (~$0.40/mo each). dist:prune keeps the newest N (default 2: current + one rollback); dist:image reminds you to run it. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
334 lines
15 KiB
Bash
Executable file
334 lines
15 KiB
Bash
Executable file
#!/usr/bin/env bash
|
||
# Distributed test/train dispatch — fan the iteration loop across the DigitalOcean
|
||
# test fleet. Sourced by ./run (defines cmd_dist_*). Auto-registered via the
|
||
# cmd_<verb>_<target> name-dispatch, so no edit to the top-level `run` is needed.
|
||
#
|
||
# ./run dist:up <workers> [size] [region] spin the fleet up
|
||
# ./run dist:sim <games> [turn_limit] [--destroy-after] fan a sim batch across it
|
||
# ./run dist:train <total_steps> [--destroy-after] fan an RL sweep across it
|
||
# ./run dist:down tear it down (zero cost)
|
||
#
|
||
# Requires: TF_VAR_do_token in env, terraform on PATH, and a coordinator with
|
||
# GNU coreutils (autoplay-batch.sh uses `realpath -m`).
|
||
|
||
_DIST_TF_DIR_REL="infra/terraform/test-fleet"
|
||
|
||
_dist_repo_root() { (cd "$(dirname "${BASH_SOURCE[0]}")/../.." && pwd); }
|
||
|
||
_dist_tf() {
|
||
local root
|
||
root="$(_dist_repo_root)"
|
||
terraform -chdir="$root/$_DIST_TF_DIR_REL" "$@"
|
||
}
|
||
|
||
_dist_read_hosts() {
|
||
# Echo one "<user>@<ip>" per line from the inventory, skipping comments/blanks.
|
||
local inv="$1"
|
||
grep -vE '^\s*(#|$)' "$inv" 2>/dev/null || true
|
||
}
|
||
|
||
_dist_wait_ready() {
|
||
# Block until each worker's cloud-init finishes — it copies the fleet key to the
|
||
# build user and git-pulls. DO's boot agent install delays runcmd 1-3 min, so the
|
||
# build user isn't ssh-able until then. We ssh as root (authorized immediately) to wait.
|
||
local root inv host ip
|
||
root="$(_dist_repo_root)"; inv="$root/.local/fleet/inventory"
|
||
[ -f "$inv" ] || return 0
|
||
while IFS= read -r host; do
|
||
ip="${host#*@}"
|
||
printf ' waiting for %s cloud-init... ' "$ip"
|
||
local _i
|
||
for _i in $(seq 1 36); do
|
||
ssh -n -o StrictHostKeyChecking=accept-new -o ConnectTimeout=8 -o BatchMode=yes -i ~/.ssh/id_mc_fleet "root@$ip" true 2>/dev/null && break
|
||
sleep 5
|
||
done
|
||
ssh -n -o BatchMode=yes -i ~/.ssh/id_mc_fleet "root@$ip" 'cloud-init status --wait >/dev/null 2>&1 || true' 2>/dev/null
|
||
echo "ready"
|
||
done < <(_dist_read_hosts "$inv")
|
||
}
|
||
|
||
cmd_dist() {
|
||
cat <<'EOF'
|
||
Distributed test/train fleet (DigitalOcean). Set TF_VAR_do_token first.
|
||
./run dist:check offline: fmt + validate + mocked test (no token/spend)
|
||
./run dist:image [--cold] (re)build golden image — incremental by default (~3-8min vs ~20 cold)
|
||
./run dist:prune [keep=2] delete superseded golden snapshots (~$0.40/mo each)
|
||
./run dist:up <workers> [size] [region] e.g. ./run dist:up 10
|
||
./run dist:sim <games> [turn_limit] [--destroy-after]
|
||
./run dist:train <total_steps> [--destroy-after]
|
||
./run dist:test cargo test --workspace on a worker
|
||
./run dist:build cargo build + wasm on a worker (wasm rsync'd back)
|
||
./run dist:sync [ref] git pull + rebuild gdext on live workers
|
||
./run dist:render <res://scene.tscn> <out.png> render a proof scene (software weston, no GPU) → png
|
||
./run dist:down
|
||
EOF
|
||
}
|
||
|
||
cmd_dist_check() {
|
||
# Offline IaC verification — no DigitalOcean token, no API, no servers, no cost.
|
||
# fmt (style) + validate (schema typecheck) + test (mocked-provider behaviour).
|
||
local root
|
||
root="$(_dist_repo_root)"
|
||
local dir="$root/$_DIST_TF_DIR_REL"
|
||
echo "== terraform fmt =="
|
||
terraform -chdir="$dir" fmt -check -recursive || { echo "fmt: run 'terraform -chdir=$dir fmt'" >&2; return 1; }
|
||
echo "== terraform init (providers only) =="
|
||
terraform -chdir="$dir" init -backend=false -input=false >/dev/null || return 1
|
||
echo "== terraform validate (schema typecheck) =="
|
||
terraform -chdir="$dir" validate || return 1
|
||
echo "== terraform test (mocked digitalocean) =="
|
||
terraform -chdir="$dir" test || return 1
|
||
echo "dist:check OK — config is valid, no resources touched."
|
||
}
|
||
|
||
cmd_dist_image() {
|
||
# (Re)build the golden image. INCREMENTAL by default: builds FROM the newest
|
||
# mc-golden snapshot, so provision.sh (idempotent) only redoes changed work
|
||
# (~3-8 min). --cold builds from stock Ubuntu (~20 min) — resets accumulated
|
||
# layer cruft; run occasionally. Needs ~/.vault/{do_pat_mc,mc_forge_creds}.
|
||
local cold=false a
|
||
for a in "$@"; do [ "$a" = "--cold" ] && cold=true; done
|
||
local root pat
|
||
root="$(_dist_repo_root)"
|
||
pat="$(cat ~/.vault/do_pat_mc 2>/dev/null)"
|
||
[ -n "$pat" ] || { echo "no ~/.vault/do_pat_mc" >&2; return 1; }
|
||
export DIGITALOCEAN_TOKEN="$pat"
|
||
# shellcheck disable=SC1090
|
||
. ~/.vault/mc_forge_creds
|
||
export PKR_VAR_git_remote="http://${ADMIN_USER}:${ADMIN_PASS}@${FORGE_IP}:3000/mcadmin/magicciv.git"
|
||
PKR_VAR_fleet_pubkey="$(cat ~/.ssh/id_mc_fleet.pub)"; export PKR_VAR_fleet_pubkey
|
||
local base="ubuntu-24-04-x64" prev
|
||
if ! $cold; then
|
||
prev="$(curl -s -H "Authorization: Bearer $pat" "https://api.digitalocean.com/v2/snapshots?resource_type=droplet&per_page=200" \
|
||
| python3 -c "import sys,json;s=[x for x in json.load(sys.stdin)['snapshots'] if x['name'].startswith('mc-golden')];s.sort(key=lambda x:x['created_at']);print(s[-1]['id'] if s else '')" 2>/dev/null)"
|
||
if [ -n "$prev" ]; then base="$prev"; echo "INCREMENTAL rebuild from snapshot $base (pass --cold for a full rebuild)"; else echo "no prior golden — cold build"; fi
|
||
else
|
||
echo "COLD rebuild from $base"
|
||
fi
|
||
export PKR_VAR_base_image="$base"
|
||
( cd "$root/infra/packer" && packer init golden-image.pkr.hcl >/dev/null && packer build golden-image.pkr.hcl )
|
||
echo "tip: each rebuild leaves a snapshot (~\$0.40/mo) — './run dist:prune' deletes superseded ones."
|
||
}
|
||
|
||
cmd_dist_prune() {
|
||
# Delete superseded golden snapshots, keeping the newest N (default 2).
|
||
local keep="${1:-2}" pat old id
|
||
pat="$(cat ~/.vault/do_pat_mc 2>/dev/null)"
|
||
[ -n "$pat" ] || { echo "no ~/.vault/do_pat_mc" >&2; return 1; }
|
||
old="$(curl -s -H "Authorization: Bearer $pat" "https://api.digitalocean.com/v2/snapshots?resource_type=droplet&per_page=200" \
|
||
| python3 -c "import sys,json;s=[x for x in json.load(sys.stdin)['snapshots'] if x['name'].startswith('mc-golden')];s.sort(key=lambda x:x['created_at']);[print(x['id']) for x in s[:-${keep}]]" 2>/dev/null)"
|
||
[ -n "$old" ] || { echo "nothing to prune (<= $keep golden snapshots)"; return 0; }
|
||
for id in $old; do
|
||
curl -s -o /dev/null -w " pruned golden snapshot $id: http %{http_code}\n" -X DELETE -H "Authorization: Bearer $pat" "https://api.digitalocean.com/v2/snapshots/$id"
|
||
done
|
||
}
|
||
|
||
cmd_dist_up() {
|
||
local n="${1:-}"
|
||
[[ "$n" =~ ^[0-9]+$ ]] || { echo "usage: ./run dist:up <workers> [size] [region]" >&2; return 1; }
|
||
: "${TF_VAR_do_token:?export TF_VAR_do_token=<DigitalOcean API token> first}"
|
||
local args=(-auto-approve -var "workers=$n")
|
||
[ -n "${2:-}" ] && args+=(-var "size=$2")
|
||
[ -n "${3:-}" ] && args+=(-var "region=$3")
|
||
_dist_tf init -input=false >/dev/null
|
||
_dist_tf apply "${args[@]}" || { echo "dist:up FAILED — terraform apply errored (see above)" >&2; return 1; }
|
||
echo "fleet up: $n worker(s) — waiting for cloud-init before they're usable..."
|
||
_dist_wait_ready
|
||
echo "fleet ready. inventory: $(_dist_repo_root)/.local/fleet/inventory"
|
||
}
|
||
|
||
cmd_dist_down() {
|
||
: "${TF_VAR_do_token:?export TF_VAR_do_token=<DigitalOcean API token> first}"
|
||
_dist_tf apply -auto-approve -var "workers=0"
|
||
echo "fleet down (workers=0): zero compute cost, snapshot only (~$0.40/mo)."
|
||
}
|
||
|
||
cmd_dist_sim() {
|
||
local total="${1:-}" turn="${2:-300}" destroy=false
|
||
local a
|
||
for a in "$@"; do [ "$a" = "--destroy-after" ] && destroy=true; done
|
||
[[ "$total" =~ ^[0-9]+$ ]] || { echo "usage: ./run dist:sim <total_games> [turn_limit] [--destroy-after]" >&2; return 1; }
|
||
|
||
local root inv
|
||
root="$(_dist_repo_root)"
|
||
inv="$root/.local/fleet/inventory"
|
||
[ -f "$inv" ] || { echo "no inventory at $inv — run ./run dist:up <N> first" >&2; return 1; }
|
||
|
||
local hosts=()
|
||
while IFS= read -r line; do hosts+=("$line"); done < <(_dist_read_hosts "$inv")
|
||
local n=${#hosts[@]}
|
||
[ "$n" -gt 0 ] || { echo "inventory empty — fleet is down" >&2; return 1; }
|
||
|
||
local stamp results shard
|
||
stamp="$(date +%Y%m%d_%H%M%S)"
|
||
results="$root/.local/iter/$stamp"
|
||
mkdir -p "$results"
|
||
shard=$(( (total + n - 1) / n )) # ceil(total / n)
|
||
echo "distributing $total game(s) over $n worker(s): ~$shard each, turn_limit=$turn"
|
||
echo "results → $results"
|
||
|
||
local pids=() i=0 host offset cnt cores
|
||
for host in "${hosts[@]}"; do
|
||
offset=$(( i * shard ))
|
||
cnt=$shard
|
||
(( offset + cnt > total )) && cnt=$(( total - offset ))
|
||
(( cnt <= 0 )) && break
|
||
cores="$(ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new "$host" nproc 2>/dev/null || echo 8)"
|
||
echo " [$host] seeds $(( offset + 1 ))..$(( offset + cnt )) PARALLEL=$cores"
|
||
AUTOPLAY_HOST="$host" SEED_OFFSET="$offset" PARALLEL="$cores" \
|
||
bash "$root/tools/autoplay-batch.sh" "$cnt" "$turn" "$results" \
|
||
>"$results/dispatch_worker_${i}.log" 2>&1 &
|
||
pids+=($!)
|
||
i=$(( i + 1 ))
|
||
done
|
||
|
||
local fail=0 p
|
||
for p in "${pids[@]}"; do wait "$p" || fail=$(( fail + 1 )); done
|
||
|
||
local produced
|
||
produced="$(find "$results" -name turn_stats.jsonl -type f 2>/dev/null | wc -l | tr -d ' ')"
|
||
echo "----------------------------------------------------------------"
|
||
echo "distributed sim done: $produced game(s) produced turn_stats under $results"
|
||
[ "$fail" -eq 0 ] || echo "WARNING: $fail worker batch(es) errored — see $results/dispatch_worker_*.log" >&2
|
||
|
||
$destroy && { echo "--destroy-after → tearing down"; cmd_dist_down; }
|
||
[ "$fail" -eq 0 ]
|
||
}
|
||
|
||
cmd_dist_train() {
|
||
# v1 blocking sweep: one training run per worker (distinct seed + run-name),
|
||
# then pull the models back. Detached orchestration is the documented follow-up.
|
||
local steps="${1:-1000000}" destroy=false
|
||
local a
|
||
for a in "$@"; do [ "$a" = "--destroy-after" ] && destroy=true; done
|
||
[[ "$steps" =~ ^[0-9]+$ ]] || { echo "usage: ./run dist:train <total_steps> [--destroy-after]" >&2; return 1; }
|
||
|
||
local root inv
|
||
root="$(_dist_repo_root)"
|
||
inv="$root/.local/fleet/inventory"
|
||
[ -f "$inv" ] || { echo "no inventory at $inv — run ./run dist:up <N> first" >&2; return 1; }
|
||
|
||
local hosts=()
|
||
while IFS= read -r line; do hosts+=("$line"); done < <(_dist_read_hosts "$inv")
|
||
local n=${#hosts[@]}
|
||
[ "$n" -gt 0 ] || { echo "inventory empty — fleet is down" >&2; return 1; }
|
||
|
||
local stamp results
|
||
stamp="$(date +%Y%m%d_%H%M%S)"
|
||
results="$root/.local/train/$stamp"
|
||
mkdir -p "$results"
|
||
echo "fanning $n training run(s) × $steps steps (CPU). results → $results"
|
||
|
||
local repo_remote="Code/@projects/@magic-civilization"
|
||
local pids=() i=0 host seed run
|
||
for host in "${hosts[@]}"; do
|
||
seed=$(( 42 + i ))
|
||
run="dist-${stamp}-w${i}"
|
||
echo " [$host] run=$run seed=$seed"
|
||
ssh -o BatchMode=yes -o StrictHostKeyChecking=accept-new "$host" \
|
||
"cd ~/$repo_remote && python3 -m tooling.rl_self_play.train --run-name '$run' --seed $seed --total-steps $steps --device cpu" \
|
||
>"$results/train_worker_${i}.log" 2>&1 &
|
||
pids+=($!)
|
||
i=$(( i + 1 ))
|
||
done
|
||
|
||
local fail=0 p
|
||
for p in "${pids[@]}"; do wait "$p" || fail=$(( fail + 1 )); done
|
||
|
||
# Pull each worker's model dir back.
|
||
i=0
|
||
for host in "${hosts[@]}"; do
|
||
run="dist-${stamp}-w${i}"
|
||
rsync -az "$host:~/$repo_remote/tooling/rl_self_play/models/$run" "$results/" 2>/dev/null || \
|
||
echo " note: no model dir for $run on $host (check $results/train_worker_${i}.log)"
|
||
i=$(( i + 1 ))
|
||
done
|
||
|
||
echo "----------------------------------------------------------------"
|
||
echo "distributed train done under $results"
|
||
[ "$fail" -eq 0 ] || echo "WARNING: $fail run(s) errored — see $results/train_worker_*.log" >&2
|
||
|
||
$destroy && { echo "--destroy-after → tearing down"; cmd_dist_down; }
|
||
[ "$fail" -eq 0 ]
|
||
}
|
||
|
||
# ── compute offload (single worker) ──────────────────────────────────────────
|
||
# Run heavy build/test compute on a DO worker instead of plum (M2 Air). Workers
|
||
# already carry the toolchain (golden image) + repo (cloud-init git pull).
|
||
|
||
_dist_first_host() {
|
||
local inv
|
||
inv="$(_dist_repo_root)/.local/fleet/inventory"
|
||
[ -f "$inv" ] || return 1
|
||
_dist_read_hosts "$inv" | head -1
|
||
}
|
||
|
||
cmd_dist_sync() {
|
||
# Pull the given ref on every live worker + rebuild the GDExtension, so a
|
||
# mid-session code change reaches the fleet without an image rebuild.
|
||
local ref="${1:-main}"
|
||
local root inv host
|
||
root="$(_dist_repo_root)"
|
||
inv="$root/.local/fleet/inventory"
|
||
[ -f "$inv" ] || { echo "no fleet — run ./run dist:up <N> first" >&2; return 1; }
|
||
local pids=() p fail=0
|
||
while IFS= read -r host; do
|
||
echo "[$host] sync → $ref"
|
||
ssh -n -o BatchMode=yes -o StrictHostKeyChecking=accept-new "$host" "
|
||
set -e
|
||
cd ~/Code/@projects/@magic-civilization
|
||
git fetch --depth=1 origin '$ref' && git reset --hard FETCH_HEAD
|
||
cd src/simulator && . ~/.cargo/env && bash build-gdext.sh
|
||
" &
|
||
pids+=($!)
|
||
done < <(_dist_read_hosts "$inv")
|
||
for p in "${pids[@]}"; do wait "$p" || fail=$(( fail + 1 )); done
|
||
[ "$fail" -eq 0 ] && echo "synced all workers to $ref" || { echo "$fail worker(s) failed sync" >&2; return 1; }
|
||
}
|
||
|
||
cmd_dist_test() {
|
||
# Offload the Rust test suite to one fast worker (slow on the M2 Air).
|
||
local host repo
|
||
host="$(_dist_first_host)" || { echo "no fleet — run ./run dist:up 1 c-8 first" >&2; return 1; }
|
||
repo="Code/@projects/@magic-civilization"
|
||
echo "running cargo tests on $host ..."
|
||
ssh -n -o BatchMode=yes -o StrictHostKeyChecking=accept-new "$host" "
|
||
set -e
|
||
cd ~/$repo/src/simulator && . ~/.cargo/env
|
||
if command -v cargo-nextest >/dev/null 2>&1; then cargo nextest run --workspace; else cargo test --workspace; fi
|
||
"
|
||
}
|
||
|
||
cmd_dist_build() {
|
||
# Offload the workspace build for fast compile feedback, and bring back the
|
||
# platform-independent WASM artifact. The native .so is linux-only and stays
|
||
# on the worker (plum builds its own macOS .dylib locally).
|
||
local host root repo
|
||
host="$(_dist_first_host)" || { echo "no fleet — run ./run dist:up 1 first" >&2; return 1; }
|
||
root="$(_dist_repo_root)"
|
||
repo="Code/@projects/@magic-civilization"
|
||
echo "building workspace + wasm on $host ..."
|
||
ssh -n -o BatchMode=yes -o StrictHostKeyChecking=accept-new "$host" "
|
||
set -e
|
||
cd ~/$repo/src/simulator && . ~/.cargo/env
|
||
cargo build --workspace
|
||
bash build-wasm.sh
|
||
"
|
||
echo "fetching wasm artifact → plum ..."
|
||
mkdir -p "$root/.local/build/wasm"
|
||
rsync -az "$host:~/$repo/.local/build/wasm/" "$root/.local/build/wasm/" 2>/dev/null \
|
||
&& echo "wasm → .local/build/wasm/" || echo "note: no wasm at .local/build/wasm/ on worker"
|
||
}
|
||
|
||
cmd_dist_render() {
|
||
# Render a proof scene on a worker (software weston + Mesa llvmpipe, no GPU) and
|
||
# pull the PNG back to plum. Replaces the apricot SCREENSHOT_HOST flow.
|
||
local scene="${1:-}" out="${2:-}"
|
||
[ -n "$scene" ] && [ -n "$out" ] || { echo "usage: ./run dist:render <res://scene.tscn> <out.png> [timeout_s]" >&2; return 1; }
|
||
local host
|
||
host="$(_dist_first_host)" || { echo "no fleet — run ./run dist:up 1 first" >&2; return 1; }
|
||
local user="${host%@*}"
|
||
AUTOPLAY_HOST="$host" \
|
||
PROJECT_ROOT_REMOTE="/home/${user}/Code/@projects/@magic-civilization" \
|
||
bash "$(_dist_repo_root)/tools/capture-proof.sh" "$scene" "$out" "${3:-180}"
|
||
}
|