magicciv/tools/validate-resource-graph.py

274 lines
9.4 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Cross-reference validator for the resource graph (Gap A from
plans/what-are-all-the-gleaming-flurry.md).
Walks the Game 1 Age-of-Dwarves data pack and asserts the deposit unit/building
tech graph is internally consistent. Designed to be run in CI.
Modes:
--report (default) Print every issue, exit 0. Use during cleanup.
--strict Exit non-zero on any error. Use in CI after cleanup.
Checks:
M1 Manifest IDs must each have a matching deposit file.
D1 Deposit gates_units / gates_buildings entries must reference real units/buildings.
D2 Bidirectional: if deposit D lists unit U in gates_units (or building B in
gates_buildings), then U.requires_resource (or B.requires_resource) must equal D.
U1 Every unit/building with requires_resource: R -> R must be in the manifest.
U2 Every unit/building with requires_resource: R -> file R.json must exist with the
reverse back-pointer (this unit/building must appear in R's gates_* list).
T1 Tech reachability: for unit U with tech_required: T and requires_resource: R,
depth(R.yield_gate) <= depth(T). I.e. by the time you unlock U, R is usable.
S1 Game-1 scope: flag manifest entries whose id contains a strong magic-school
name marker (see MAGIC_NAME_MARKERS). Warning, not error.
"""
from __future__ import annotations
import argparse
import json
import sys
from collections import deque
from pathlib import Path
# Repository root, taken as two directory levels above this script's resolved path.
REPO_ROOT = Path(__file__).resolve().parent.parent

# Directories scanned for deposit, unit, building and tech JSON definitions.
DEPOSITS_DIR = REPO_ROOT.joinpath("public", "resources", "deposits")
UNITS_DIR = REPO_ROOT.joinpath("public", "resources", "units")
BUILDINGS_DIR = REPO_ROOT.joinpath("public", "resources", "buildings")
TECHS_DIR = REPO_ROOT.joinpath("public", "resources", "techs")

# Age-of-Dwarves deposit manifest; its "includes" list names the deposit ids to validate.
MANIFEST = REPO_ROOT.joinpath(
    "public", "games", "age-of-dwarves", "data", "deposits", "manifest.json"
)
# Substrings that mark a deposit id as magic-flavored. The S1 check warns about
# any manifest id containing one of them (plain substring match, so "chaos_"
# matches every id that contains that prefix text).
MAGIC_NAME_MARKERS = (
    "magesteel", "deep_crystal", "dragon_bone", "mithril",
    "chaos_", "ghost_orchid", "sacred_grove", "sunstone",
    "merfolk", "leviathan", "kraken", "sea_serpent",
)
class Report:
    """Collects validation findings, kept apart as errors and warnings."""

    def __init__(self) -> None:
        # Each list stores preformatted "[CODE] message" strings in the
        # order they were recorded.
        self.errors: list[str] = []
        self.warnings: list[str] = []

    def err(self, code: str, msg: str) -> None:
        """Record an error finding, prefixed with its check code."""
        finding = f"[{code}] {msg}"
        self.errors.append(finding)

    def warn(self, code: str, msg: str) -> None:
        """Record a warning finding, prefixed with its check code."""
        finding = f"[{code}] {msg}"
        self.warnings.append(finding)

    def print(self) -> None:
        """Print all errors, then all warnings, then a totals line to stdout."""
        output = [f"ERROR {line}" for line in self.errors]
        output.extend(f"WARN {line}" for line in self.warnings)
        for text in output:
            print(text)
        n_errors = len(self.errors)
        n_warnings = len(self.warnings)
        print(f"\n{n_errors} errors, {n_warnings} warnings")
def _load_array(path: Path) -> list[dict]:
with path.open() as f:
data = json.load(f)
if isinstance(data, list):
return data
if isinstance(data, dict):
return [data]
raise ValueError(f"{path} is neither list nor object")
def _load_manifest(path: Path) -> list[str]:
with path.open() as f:
data = json.load(f)
includes = data.get("includes", [])
if not isinstance(includes, list):
raise ValueError(f"{path}: 'includes' is not a list")
return [str(x) for x in includes]
def _scan_dir(directory: Path) -> dict[str, dict]:
"""Return id -> entry for every entry in every .json file in the directory.
Skips schema files and category-definition files."""
out: dict[str, dict] = {}
if not directory.is_dir():
return out
for path in sorted(directory.iterdir()):
if path.suffix != ".json":
continue
if path.name.endswith(".schema.json"):
continue
if "categories" in path.name or path.name in {
"improvements.json",
"resources.json",
"episodes.json",
}:
continue
try:
for entry in _load_array(path):
if isinstance(entry, dict) and "id" in entry:
out[entry["id"]] = entry
except Exception as exc:
print(f"WARN failed to read {path}: {exc}", file=sys.stderr)
return out
def _build_tech_depth(techs: dict[str, dict]) -> dict[str, int]:
"""BFS from techs with no `requires` to give each tech an integer depth."""
depth: dict[str, int] = {}
queue: deque[str] = deque()
for tid, t in techs.items():
if not t.get("requires"):
depth[tid] = 0
queue.append(tid)
while queue:
tid = queue.popleft()
d = depth[tid]
for other_id, other in techs.items():
if other_id in depth:
continue
reqs = other.get("requires", []) or []
if all(r in depth for r in reqs):
depth[other_id] = max((depth[r] for r in reqs), default=0) + 1
queue.append(other_id)
# Anything left got pruned by a cycle or missing dep.
return depth
def _check_manifest(rep: Report, manifest: list[str], deposits: dict[str, dict]) -> None:
    """M1: every manifest id must have a matching loaded deposit entry."""
    for did in manifest:
        if did not in deposits:
            rep.err("M1", f"manifest lists '{did}' but no deposits/{did}.json exists")


def _check_deposit_gates(
    rep: Report,
    manifest_set: set[str],
    deposits: dict[str, dict],
    tables: tuple[tuple[str, dict[str, dict]], ...],
) -> None:
    """D1 + D2: gates_* ids must exist and point back via requires_resource."""
    for did, dep in deposits.items():
        if did not in manifest_set:
            continue  # only validate deposits that ship in Game 1.
        for kind, table in tables:
            field = f"gates_{kind}s"  # "gates_units" / "gates_buildings"
            for iid in dep.get(field, []) or []:
                entry = table.get(iid)
                if not entry:
                    rep.err("D1", f"deposit '{did}'.{field} lists '{iid}' but no {kind} found")
                    continue
                if entry.get("requires_resource") != did:
                    rep.err(
                        "D2",
                        f"deposit '{did}' says it gates {kind} '{iid}', but {iid}.requires_resource = "
                        f"{entry.get('requires_resource')!r}",
                    )


def _check_requirements(
    rep: Report,
    manifest_set: set[str],
    deposits: dict[str, dict],
    tables: tuple[tuple[str, dict[str, dict]], ...],
) -> None:
    """U1 + U2: requires_resource must be a manifest deposit that lists the entry."""
    for kind, table in tables:
        for iid, entry in table.items():
            rres = entry.get("requires_resource")
            if not rres:
                continue
            if rres not in manifest_set:
                rep.err(
                    "U1",
                    f"{kind} '{iid}' requires resource '{rres}' but '{rres}' is not in the Game 1 manifest",
                )
                continue
            dep = deposits.get(rres)
            if not dep:
                continue  # already flagged by M1.
            back = dep.get(
                "gates_units" if kind == "unit" else "gates_buildings"
            ) or []
            if iid not in back:
                rep.err(
                    "U2",
                    f"{kind} '{iid}'.requires_resource = '{rres}' but '{rres}'.gates_{kind}s does not contain '{iid}'",
                )


def _check_tech_reachability(
    rep: Report,
    deposits: dict[str, dict],
    tech_depth: dict[str, int],
    tables: tuple[tuple[str, dict[str, dict]], ...],
) -> None:
    """T1: a resource's yield_gate tech must not be deeper than the unlock tech."""
    for kind, table in tables:
        for iid, entry in table.items():
            rres = entry.get("requires_resource")
            if not rres or rres not in deposits:
                continue
            t_unlock = entry.get("tech_required")
            yield_gate = deposits[rres].get("yield_gate")
            if not t_unlock or not yield_gate:
                continue  # one or both unconstrained → fine.
            du = tech_depth.get(t_unlock)
            dr = tech_depth.get(yield_gate)
            if du is None or dr is None:
                # A tech with no computed depth cannot be compared; warn only.
                rep.warn(
                    "T1",
                    f"{kind} '{iid}': cannot compute tech depth for "
                    f"unlock={t_unlock!r} or yield_gate={yield_gate!r}",
                )
                continue
            if dr > du:
                rep.err(
                    "T1",
                    f"{kind} '{iid}' unlocks at tech '{t_unlock}' (depth {du}) but its "
                    f"resource '{rres}' needs yield_gate '{yield_gate}' (depth {dr}) — "
                    f"player unlocks the {kind} before they can use the resource",
                )


def _check_magic_scope(rep: Report, manifest: list[str]) -> None:
    """S1: warn about manifest ids containing a magic-school name marker."""
    for did in manifest:
        if any(marker in did for marker in MAGIC_NAME_MARKERS):
            rep.warn(
                "S1",
                f"manifest entry '{did}' has a magic-school name marker; should it ship in Game 1?",
            )


def validate(rep: Report) -> None:
    """Load the data pack and run every cross-reference check, recording into `rep`.

    Checks run in the fixed order M1, D1/D2, U1/U2, T1, S1, and units are
    always examined before buildings, so report order is stable for a given
    data set.

    Args:
        rep: collector that receives every error and warning found.

    Raises:
        Whatever `_load_manifest` raises when the manifest cannot be loaded.
    """
    manifest = _load_manifest(MANIFEST)
    manifest_set = set(manifest)
    deposits = _scan_dir(DEPOSITS_DIR)
    units = _scan_dir(UNITS_DIR)
    buildings = _scan_dir(BUILDINGS_DIR)
    techs = _scan_dir(TECHS_DIR)
    tech_depth = _build_tech_depth(techs)

    # (kind label, id -> entry) pairs shared by every per-kind check.
    tables = (("unit", units), ("building", buildings))

    _check_manifest(rep, manifest, deposits)
    _check_deposit_gates(rep, manifest_set, deposits, tables)
    _check_requirements(rep, manifest_set, deposits, tables)
    _check_tech_reachability(rep, deposits, tech_depth, tables)
    _check_magic_scope(rep, manifest)
def main() -> int:
    """Parse command-line flags, run the validation, and return the exit status.

    Returns:
        1 when --strict was given and at least one error was recorded,
        otherwise 0.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line layout in --help output instead of
        # letting argparse re-wrap it into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # The module docstring documents both modes, so accept --report explicitly
    # (it only names the default) and reject contradictory combinations.
    mode = ap.add_mutually_exclusive_group()
    mode.add_argument(
        "--report",
        action="store_true",
        help="Print every issue and exit 0. This is the default.",
    )
    mode.add_argument(
        "--strict",
        action="store_true",
        help="Exit non-zero on any error. Default: report and exit 0.",
    )
    args = ap.parse_args()
    rep = Report()
    validate(rep)
    rep.print()
    if args.strict and rep.errors:
        return 1
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())