magicciv/tools/regen-objectives-index.py

148 lines
5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""Regenerate .project/objectives/objectives.json from live objective frontmatter.
Standalone replacement for the (currently offline) "claire" objective
orchestrator's index step. Walks every objective spec, parses its YAML
frontmatter, and emits the same top-level schema claire produced:
{ generated_at, totals, objectives[], blocked[], remaining_by_lead[] }
Field definitions (made explicit since claire's exact filter was undocumented):
* objectives[] one row per spec: id, title, priority, status, scope,
owner, updated_at, blocked_by, summary (first body para).
* totals count per status + grand total.
* blocked[] every spec with a non-empty `blocked_by`, verbatim
({id, blockedBy}). NOT filtered by blocker-done-ness.
* remaining_by_lead actionable backlog per owner: specs whose status is one
of REMAINING_STATUSES, grouped by owner ('' -> unassigned),
sorted by count desc then owner asc.
Usage: python3 tools/regen-objectives-index.py [--dir DIR] [--check]
--check : compute + print totals but do NOT write the file (dry run).
Reusable: re-run any time statuses change while claire is offline.
"""
from __future__ import annotations
import argparse
import json
import re
from datetime import datetime, timezone
from pathlib import Path
# Markdown files in the objectives directory that are not objective specs.
SKIP_FILES = {"README.md", "DASHBOARD_CATEGORIES.md", "DASHBOARD_COMPLETED.md"}
# Frontmatter keys copied (as strings) into every objectives[] row.
SCALAR_KEYS = ("id", "title", "priority", "status", "scope", "owner", "updated_at")
# Non-terminal statuses that represent outstanding work (oos/superseded/done excluded).
REMAINING_STATUSES = {"in_progress", "partial", "stub", "missing"}
# Leading `---` ... `---` frontmatter fence; group 1 is the text between the fences.
# The closing fence may be followed by a newline OR by end-of-file, so a spec that
# is only frontmatter with no trailing newline is still recognised. Wherever the
# stricter newline-only form matched, this pattern matches the same span.
FM_RE = re.compile(r"^---\s*\n(.*?)\n---\s*(?:\n|\Z)", re.S)
def _scalar(fm: str, key: str) -> str:
# [ \t]* (not \s*) so an empty `key:` does NOT swallow the next line's value.
m = re.search(rf"^{key}:[ \t]*(.*)$", fm, re.M)
if not m:
return ""
return m.group(1).strip().strip("\"'")
def _blocked_by(fm: str) -> list[str]:
m = re.search(r"^blocked_by:\s*\[(.*)\]\s*$", fm, re.M)
if not m or not m.group(1).strip():
return []
return [item.strip().strip("\"'") for item in m.group(1).split(",") if item.strip()]
def _summary(body: str) -> str:
"""First real paragraph of the body (skip headings, blockquotes, blanks)."""
para: list[str] = []
for raw in body.splitlines():
line = raw.rstrip()
if not line:
if para:
break
continue
if line.lstrip().startswith(("#", "---")):
if para:
break
continue
para.append(line.strip())
text = " ".join(para)
return text[:600]
def parse(path: Path) -> dict | None:
    """Parse one objective spec file into an index row.

    Args:
        path: Markdown file expected to begin with ``---`` fenced frontmatter.

    Returns:
        A dict holding every key in SCALAR_KEYS plus ``blocked_by`` (list of
        ids) and ``summary`` (first body paragraph), or ``None`` when the file
        has no leading frontmatter or the frontmatter has no ``id:`` key
        followed by non-whitespace text.
    """
    # utf-8-sig decodes plain UTF-8 identically but drops a leading BOM, which
    # would otherwise sit in front of the opening `---` and defeat FM_RE.match.
    txt = path.read_text(encoding="utf-8-sig")
    m = FM_RE.match(txt)
    if not m:
        return None
    fm = m.group(1)
    # NOTE(review): `\s*` can cross a newline, so an empty `id:` followed by
    # another line passes this check; such rows get the filename fallback below.
    if not re.search(r"^id:\s*\S", fm, re.M):
        return None
    body = txt[m.end():]
    row = {k: _scalar(fm, k) for k in SCALAR_KEYS}
    if not row["id"]:
        # Present-but-empty id: fall back to the filename without extension.
        row["id"] = path.stem
    row["blocked_by"] = _blocked_by(fm)
    row["summary"] = _summary(body)
    return row
def main(argv: list[str] | None = None) -> int:
    """Build the objectives index and write it (or, with --check, only report).

    Every ``*.md`` file in the objectives directory except those in
    SKIP_FILES is parsed; files for which ``parse`` returns ``None`` are
    left out. Totals, the blocked list and the per-owner backlog are derived
    from the resulting rows and printed; unless ``--check`` is given the full
    index is written to ``objectives.json`` inside that directory.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read them from ``sys.argv``.

    Returns:
        0 on success. A ``--dir`` that is not an existing directory exits
        through argparse's error path (usage message, status 2).
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--dir", default=str(Path(__file__).resolve().parents[1] / ".project" / "objectives"))
    ap.add_argument("--check", action="store_true", help="dry run; print totals, do not write")
    args = ap.parse_args(argv)
    objdir = Path(args.dir)
    # Fail early on a wrong --dir: otherwise it is only noticed when the write
    # fails, or not at all under --check, which would report all-zero totals.
    if not objdir.is_dir():
        ap.error(f"objectives directory not found: {objdir}")

    rows: list[dict] = []
    for path in sorted(objdir.glob("*.md")):
        if path.name in SKIP_FILES:
            continue
        row = parse(path)
        if row is not None:
            rows.append(row)

    # Per-status counts in sorted key order, with the grand total appended last.
    counts: dict[str, int] = {}
    for r in rows:
        counts[r["status"]] = counts.get(r["status"], 0) + 1
    totals = {status: counts[status] for status in sorted(counts)}
    totals["total"] = len(rows)

    blocked = [{"id": r["id"], "blockedBy": r["blocked_by"]} for r in rows if r["blocked_by"]]

    # Outstanding work per owner; an empty owner is reported as "unassigned".
    by_lead: dict[str, int] = {}
    for r in rows:
        if r["status"] in REMAINING_STATUSES:
            owner = r["owner"] or "unassigned"
            by_lead[owner] = by_lead.get(owner, 0) + 1
    # Largest backlog first; ties broken by owner name ascending.
    remaining_by_lead = [
        {"owner": o, "remaining": n}
        for o, n in sorted(by_lead.items(), key=lambda kv: (-kv[1], kv[0]))
    ]

    index = {
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "totals": totals,
        "objectives": rows,
        "blocked": blocked,
        "remaining_by_lead": remaining_by_lead,
    }
    print("totals:", json.dumps(totals))
    print("blocked:", len(blocked), "| remaining_by_lead:",
          json.dumps(remaining_by_lead))
    if args.check:
        print("(--check: not written)")
        return 0
    out = objdir / "objectives.json"
    out.write_text(json.dumps(index, indent=2, ensure_ascii=False) + "\n", encoding="utf-8")
    print(f"wrote {out} ({len(rows)} objectives)")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())