magicciv/tools/sprite-generation/engine/registry.py
Claude Code 1b7999918c feat(gui): Add dashboard page to visualize sprite registry data from SQLite database
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-03-29 05:29:52 -07:00

1008 lines
39 KiB
Python

"""SQLite-backed sprite registry tracking ~4,500 sprites through their lifecycle."""
from __future__ import annotations
import sqlite3
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string (with offset)."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
# DDL applied by SpriteRegistry.__init__ via executescript(). Every statement
# uses IF NOT EXISTS, so running it against an existing database is a no-op.
#
# Tables:
#   sprites            one row per sprite, keyed by a TEXT id, with its
#                      lifecycle status (default 'needed'), prompts and sizes
#   sprite_dimensions  per-sprite (dimension_type, dimension_value) rows,
#                      unique per sprite; removed with their sprite (CASCADE)
#   variants           individual generation jobs for a sprite or for one of
#                      its dimensions (dimension_id is NULL for the former)
#   variant_scores     append-only score records per (variant, scorer)
#   generation_runs    bookkeeping counters for batch runs
#   prompt_templates   versioned prompt templates per category
#
# View:
#   latest_scores      the variant_scores row with the highest id for each
#                      (variant_id, scorer_name) pair
#
# NOTE: the variants columns scored_by, scored_at and review_tier are not
# declared here; SpriteRegistry._migrate() adds them with ALTER TABLE.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS sprites (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'needed',
sprite_path TEXT,
install_path TEXT,
prompt TEXT,
negative_prompt TEXT,
gen_width INTEGER DEFAULT 1024,
gen_height INTEGER DEFAULT 512,
target_width INTEGER DEFAULT 384,
target_height INTEGER DEFAULT 332,
source_file TEXT,
dimensions TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sprite_dimensions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sprite_id TEXT NOT NULL REFERENCES sprites(id) ON DELETE CASCADE,
dimension_type TEXT NOT NULL,
dimension_value TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'needed',
prompt_modifier TEXT,
install_path TEXT,
approved_variant_id INTEGER REFERENCES variants(id),
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(sprite_id, dimension_type, dimension_value)
);
CREATE TABLE IF NOT EXISTS variants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sprite_id TEXT NOT NULL REFERENCES sprites(id) ON DELETE CASCADE,
dimension_id INTEGER REFERENCES sprite_dimensions(id) ON DELETE CASCADE,
seed INTEGER NOT NULL,
job_id TEXT,
job_status TEXT NOT NULL DEFAULT 'submitted',
raw_path TEXT,
processed_path TEXT,
prompt_modifier TEXT,
model TEXT,
prompt_used TEXT,
negative_used TEXT,
guidance_scale REAL,
steps INTEGER,
prompt_author TEXT,
is_approved INTEGER NOT NULL DEFAULT 0,
rating INTEGER,
notes TEXT,
generation_ms INTEGER,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS variant_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
scorer_name TEXT NOT NULL,
scorer_model TEXT NOT NULL,
tier INTEGER NOT NULL,
gates TEXT,
quality TEXT,
gate_passed INTEGER NOT NULL DEFAULT 0,
confidence REAL NOT NULL DEFAULT 0.0,
failed_gate_reason TEXT,
quality_floor_failed INTEGER NOT NULL DEFAULT 0,
raw_response TEXT,
scored_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS generation_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
finished_at TEXT,
category TEXT,
total_jobs INTEGER NOT NULL DEFAULT 0,
completed INTEGER NOT NULL DEFAULT 0,
failed INTEGER NOT NULL DEFAULT 0,
variants_per INTEGER NOT NULL DEFAULT 8
);
CREATE TABLE IF NOT EXISTS prompt_templates (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
template TEXT NOT NULL,
negative TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sprites_category ON sprites(category);
CREATE INDEX IF NOT EXISTS idx_sprites_status ON sprites(status);
CREATE INDEX IF NOT EXISTS idx_sprite_dimensions_sprite ON sprite_dimensions(sprite_id);
CREATE INDEX IF NOT EXISTS idx_variants_sprite ON variants(sprite_id);
CREATE INDEX IF NOT EXISTS idx_variants_dimension ON variants(dimension_id);
CREATE INDEX IF NOT EXISTS idx_variants_job_status ON variants(job_status);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant ON variant_scores(variant_id);
CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer ON variant_scores(scorer_name);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer ON variant_scores(variant_id, scorer_name);
-- Latest score per (variant, scorer): used by all pass/fail decisions
CREATE VIEW IF NOT EXISTS latest_scores AS
SELECT vs.*
FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
"""
class SpriteRegistry:
def __init__(self, db_path: Path) -> None:
    """Open the registry database at *db_path*, creating it if needed.

    The parent directory is created first. The connection is opened with
    ``check_same_thread=False`` and returns rows as ``sqlite3.Row``; WAL
    journaling and foreign-key enforcement are switched on, then the
    schema and the in-place migrations are applied.
    """
    db_path.parent.mkdir(exist_ok=True, parents=True)
    connection = sqlite3.connect(str(db_path), check_same_thread=False)
    self.conn = connection
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.executescript(_SCHEMA)
    self._migrate()
def _migrate(self) -> None:
    """Upgrade an already-existing database to the current layout.

    Runs three steps in order:

    1. Adds any of the later-introduced ``variants`` columns that are
       missing from the table.
    2. If ``variant_scores`` still carries an index that looks like it
       backs a UNIQUE constraint, rebuilds the table without it (copying
       all rows) and recreates its indexes and the ``latest_scores`` view.
    3. Makes sure the ``latest_scores`` view exists, then commits.
    """
    # variants column additions
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing_v = {row[1] for row in self.conn.execute("PRAGMA table_info(variants)").fetchall()}
    for col, typ in [
        ("model", "TEXT"),
        ("prompt_used", "TEXT"),
        ("negative_used", "TEXT"),
        ("guidance_scale", "REAL"),
        ("steps", "INTEGER"),
        ("prompt_author", "TEXT"),
        ("scored_by", "TEXT"),
        ("scored_at", "TEXT"),
        ("review_tier", "INTEGER DEFAULT 0"),
    ]:
        if col not in existing_v:
            # col/typ come only from the literal list above, never from input.
            self.conn.execute(f"ALTER TABLE variants ADD COLUMN {col} {typ}")
    # Drop UNIQUE(variant_id, scorer_name) from variant_scores if present.
    # SQLite doesn't support DROP CONSTRAINT — we recreate the table without it.
    # Index 1 of a sqlite_master row is the index name.
    idxs = {row[1] for row in self.conn.execute(
        "SELECT * FROM sqlite_master WHERE type='index' AND tbl_name='variant_scores'"
    ).fetchall()}
    # Trigger on any index whose name contains "unique" or "sqlite_autoindex"
    # (the latter is the name prefix SQLite gives constraint-backing indexes).
    if any("unique" in idx.lower() or "sqlite_autoindex" in idx.lower() for idx in idxs):
        # The view is dropped first because it selects from the table being
        # replaced; it is recreated at the end of the same script.
        self.conn.executescript("""
BEGIN;
DROP VIEW IF EXISTS latest_scores;
CREATE TABLE IF NOT EXISTS variant_scores_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
scorer_name TEXT NOT NULL,
scorer_model TEXT NOT NULL,
tier INTEGER NOT NULL,
gates TEXT,
quality TEXT,
gate_passed INTEGER NOT NULL DEFAULT 0,
confidence REAL NOT NULL DEFAULT 0.0,
failed_gate_reason TEXT,
quality_floor_failed INTEGER NOT NULL DEFAULT 0,
raw_response TEXT,
scored_at TEXT NOT NULL
);
INSERT OR IGNORE INTO variant_scores_new
SELECT id, variant_id, scorer_name, scorer_model, tier,
gates, quality, gate_passed, confidence,
failed_gate_reason, quality_floor_failed, raw_response, scored_at
FROM variant_scores;
DROP TABLE variant_scores;
ALTER TABLE variant_scores_new RENAME TO variant_scores;
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant
ON variant_scores(variant_id);
CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer
ON variant_scores(scorer_name);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer
ON variant_scores(variant_id, scorer_name);
CREATE VIEW latest_scores AS
SELECT vs.* FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
COMMIT;
""")
    # Ensure latest_scores view exists (CREATE VIEW is idempotent via IF NOT EXISTS)
    self.conn.executescript("""
CREATE VIEW IF NOT EXISTS latest_scores AS
SELECT vs.*
FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
""")
    self.conn.commit()
# -- sprites ---------------------------------------------------------------
def upsert_sprite(
    self,
    id: str,
    category: str,
    entity_id: str,
    *,
    status: str = "needed",
    sprite_path: str | None = None,
    install_path: str | None = None,
    prompt: str | None = None,
    negative_prompt: str | None = None,
    gen_width: int = 1024,
    gen_height: int = 512,
    target_width: int = 384,
    target_height: int = 332,
    source_file: str | None = None,
    dimensions: str | None = None,
) -> bool:
    """Insert the sprite *id*, or refresh its metadata if it already exists.

    An existing row has every descriptive column overwritten with the
    values given here, but its ``status`` and ``created_at`` are left
    alone; *status* is only used when a new row is inserted.

    Returns:
        True when a new row was inserted, False when an existing row was
        updated.
    """
    timestamp = _now()
    # Columns shared by both statements, in the order both SQL texts list them.
    described = (
        sprite_path, install_path, prompt, negative_prompt, gen_width,
        gen_height, target_width, target_height, source_file, dimensions,
    )
    with self.conn:
        found = self.conn.execute(
            "SELECT id FROM sprites WHERE id = ?", (id,)
        ).fetchone()
        if not found:
            self.conn.execute(
                """INSERT INTO sprites (id, category, entity_id, status, sprite_path,
install_path, prompt, negative_prompt, gen_width, gen_height,
target_width, target_height, source_file, dimensions,
created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (id, category, entity_id, status, *described, timestamp, timestamp),
            )
            return True
        self.conn.execute(
            """UPDATE sprites SET category=?, entity_id=?, sprite_path=?,
install_path=?, prompt=?, negative_prompt=?, gen_width=?,
gen_height=?, target_width=?, target_height=?, source_file=?,
dimensions=?, updated_at=?
WHERE id=?""",
            (category, entity_id, *described, timestamp, id),
        )
        return False
def update_sprite_status(self, sprite_id: str, status: str) -> None:
    """Set the status of sprite *sprite_id* and bump its ``updated_at``."""
    statement = "UPDATE sprites SET status=?, updated_at=? WHERE id=?"
    with self.conn:
        values = (status, _now(), sprite_id)
        self.conn.execute(statement, values)
def get_sprite(self, sprite_id: str) -> dict | None:
    """Return one sprite with its dimensions and variants, or None.

    The returned dict holds the sprite's columns plus:

    * ``dimensions_list`` - its ``sprite_dimensions`` rows ordered by type
      and value, each carrying its own ``variants`` list;
    * ``variants`` - the variants attached directly to the sprite (those
      with no ``dimension_id``), oldest first.
    """
    def fetch_dicts(sql: str, args: tuple) -> list[dict]:
        # Run a query and materialise every row as a plain dict.
        return [dict(record) for record in self.conn.execute(sql, args).fetchall()]

    head = self.conn.execute(
        "SELECT * FROM sprites WHERE id = ?", (sprite_id,)
    ).fetchone()
    if head is None:
        return None

    sprite = dict(head)
    sprite["dimensions_list"] = fetch_dicts(
        "SELECT * FROM sprite_dimensions WHERE sprite_id = ? ORDER BY dimension_type, dimension_value",
        (sprite_id,),
    )
    sprite["variants"] = fetch_dicts(
        "SELECT * FROM variants WHERE sprite_id = ? AND dimension_id IS NULL ORDER BY created_at",
        (sprite_id,),
    )
    for dimension in sprite["dimensions_list"]:
        dimension["variants"] = fetch_dicts(
            "SELECT * FROM variants WHERE dimension_id = ? ORDER BY created_at",
            (dimension["id"],),
        )
    return sprite
def get_sprites(
    self,
    category: str | None = None,
    status: str | None = None,
    search: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
    """List sprites, optionally filtered, ordered by category then id.

    Args:
        category: keep only sprites of this category (ignored when falsy).
        status: keep only sprites with this status (ignored when falsy).
        search: substring matched with LIKE against id, entity_id and
            prompt (ignored when falsy).
        limit: maximum number of rows returned.
        offset: number of leading rows skipped.
    """
    conditions: list[str] = []
    args: list[str | int] = []
    for clause, wanted in (("category = ?", category), ("status = ?", status)):
        if wanted:
            conditions.append(clause)
            args.append(wanted)
    if search:
        pattern = f"%{search}%"
        conditions.append("(id LIKE ? OR entity_id LIKE ? OR prompt LIKE ?)")
        args += [pattern] * 3

    where = ""
    if conditions:
        where = f" WHERE {' AND '.join(conditions)}"
    args += [limit, offset]
    cursor = self.conn.execute(
        f"SELECT * FROM sprites{where} ORDER BY category, id LIMIT ? OFFSET ?",
        args,
    )
    return list(map(dict, cursor.fetchall()))
def reject_sprite(self, sprite_id: str, dimension_id: int | None = None) -> None:
    """Withdraw approval and send the sprite or dimension back to 'needed'.

    With *dimension_id* given, every variant of that dimension is
    un-approved and the dimension row is reset (status 'needed', no
    approved variant); *sprite_id* is not used in that case. Otherwise
    the sprite's own variants (those without a dimension) are
    un-approved and the sprite itself is reset to 'needed'.
    """
    stamp = _now()
    with self.conn:
        if dimension_id is None:
            self.conn.execute(
                "UPDATE variants SET is_approved=0 WHERE sprite_id=? AND dimension_id IS NULL",
                (sprite_id,),
            )
            self.conn.execute(
                "UPDATE sprites SET status='needed', updated_at=? WHERE id=?",
                (stamp, sprite_id),
            )
        else:
            self.conn.execute(
                "UPDATE variants SET is_approved=0 WHERE dimension_id=?",
                (dimension_id,),
            )
            self.conn.execute(
                "UPDATE sprite_dimensions SET status='needed', approved_variant_id=NULL, updated_at=? WHERE id=?",
                (stamp, dimension_id),
            )
def get_approved_uninstalled(self, category: str | None = None) -> list[dict]:
    """Return sprites whose status is 'approved', with their approved dimensions.

    Args:
        category: restrict both queries to this category (ignored when
            falsy).

    Returns:
        Sprite dicts ordered by category then id. Each carries an
        ``approved_dimensions`` list holding that sprite's
        ``sprite_dimensions`` rows whose status is 'approved' (with the
        sprite's ``category`` and ``entity_id`` joined in). Approved
        dimensions of sprites that are not themselves 'approved' are not
        part of the result.
    """
    sprite_clauses = ["status = 'approved'"]
    dim_clauses = ["sd.status = 'approved'"]
    params: list[str] = []
    if category:
        sprite_clauses.append("category = ?")
        dim_clauses.append("s.category = ?")
        params.append(category)

    where = " WHERE " + " AND ".join(sprite_clauses)
    rows = self.conn.execute(
        f"SELECT * FROM sprites{where} ORDER BY category, id", params
    ).fetchall()
    results = [dict(r) for r in rows]

    dim_where = " WHERE " + " AND ".join(dim_clauses)
    dim_rows = self.conn.execute(
        f"""SELECT sd.*, s.category, s.entity_id FROM sprite_dimensions sd
JOIN sprites s ON sd.sprite_id = s.id{dim_where}
ORDER BY s.category, sd.sprite_id""",
        params,
    ).fetchall()

    # Group the dimension rows once instead of rescanning all of them for
    # every sprite; row order within each group is the query order.
    dims_by_sprite: dict[str, list[dict]] = defaultdict(list)
    for d in dim_rows:
        dims_by_sprite[d["sprite_id"]].append(dict(d))

    for r in results:
        r["approved_dimensions"] = dims_by_sprite.get(r["id"], [])
    return results
def mark_installed(self, sprite_id: str, dimension_id: int | None = None) -> None:
    """Flag a dimension (when *dimension_id* is given) or a sprite as 'installed'.

    Only one of the two rows is touched: passing *dimension_id* updates
    that ``sprite_dimensions`` row and leaves the sprite as it is.
    """
    stamp = _now()
    if dimension_id is None:
        statement = "UPDATE sprites SET status='installed', updated_at=? WHERE id=?"
        key = sprite_id
    else:
        statement = "UPDATE sprite_dimensions SET status='installed', updated_at=? WHERE id=?"
        key = dimension_id
    with self.conn:
        self.conn.execute(statement, (stamp, key))
# -- dimensions ------------------------------------------------------------
def upsert_dimension(
    self,
    sprite_id: str,
    dimension_type: str,
    dimension_value: str,
    *,
    status: str = "needed",
    prompt_modifier: str | None = None,
    install_path: str | None = None,
) -> int:
    """Create or refresh one (sprite, type, value) dimension row.

    An existing row gets its ``prompt_modifier`` and ``install_path``
    overwritten and keeps its status; *status* only applies to a newly
    inserted row.

    Returns:
        The id of the existing or newly inserted ``sprite_dimensions`` row.
    """
    stamp = _now()
    identity = (sprite_id, dimension_type, dimension_value)
    with self.conn:
        match = self.conn.execute(
            """SELECT id FROM sprite_dimensions
WHERE sprite_id=? AND dimension_type=? AND dimension_value=?""",
            identity,
        ).fetchone()
        if not match:
            inserted = self.conn.execute(
                """INSERT INTO sprite_dimensions (sprite_id, dimension_type,
dimension_value, status, prompt_modifier, install_path,
created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?)""",
                (*identity, status, prompt_modifier, install_path, stamp, stamp),
            )
            return inserted.lastrowid
        self.conn.execute(
            """UPDATE sprite_dimensions SET prompt_modifier=?,
install_path=?, updated_at=? WHERE id=?""",
            (prompt_modifier, install_path, stamp, match["id"]),
        )
        return match["id"]
def update_dimension_status(self, dimension_id: int, status: str) -> None:
    """Set the status of one ``sprite_dimensions`` row and bump ``updated_at``."""
    statement = "UPDATE sprite_dimensions SET status=?, updated_at=? WHERE id=?"
    with self.conn:
        values = (status, _now(), dimension_id)
        self.conn.execute(statement, values)
# -- variants --------------------------------------------------------------
def add_variant(
    self,
    sprite_id: str,
    seed: int,
    dimension_id: int | None = None,
    prompt_modifier: str | None = None,
    job_id: str | None = None,
    model: str | None = None,
    prompt_used: str | None = None,
    negative_used: str | None = None,
    guidance_scale: float | None = None,
    steps: int | None = None,
    prompt_author: str | None = None,
) -> int:
    """Record a new variant for a sprite (or one of its dimensions).

    Columns not listed in the INSERT (``job_status``, ``is_approved``, ...)
    take their schema defaults.

    Returns:
        The id of the inserted ``variants`` row.
    """
    with self.conn:
        record = (
            sprite_id, dimension_id, seed, job_id, prompt_modifier,
            model, prompt_used, negative_used, guidance_scale, steps,
            prompt_author, _now(),
        )
        inserted = self.conn.execute(
            """INSERT INTO variants (sprite_id, dimension_id, seed, job_id,
prompt_modifier, model, prompt_used, negative_used,
guidance_scale, steps, prompt_author, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
            record,
        )
        return inserted.lastrowid
def update_variant_status(
    self,
    variant_id: int,
    job_status: str,
    *,
    raw_path: str | None = None,
    processed_path: str | None = None,
    generation_ms: int | None = None,
) -> None:
    """Update a variant's job status and, optionally, its output details.

    Only the optional values that are not None are written. When
    *job_status* is ``"completed"`` the owning sprite is moved to
    'review', but only if it is currently 'needed' or 'generating'.
    """
    optional_columns = (
        ("raw_path=?", raw_path),
        ("processed_path=?", processed_path),
        ("generation_ms=?", generation_ms),
    )
    with self.conn:
        assignments = ["job_status=?"]
        values: list = [job_status]
        for fragment, value in optional_columns:
            if value is not None:
                assignments.append(fragment)
                values.append(value)
        values.append(variant_id)
        self.conn.execute(
            f"UPDATE variants SET {', '.join(assignments)} WHERE id=?", values
        )
        # When a variant completes, transition parent sprite to 'review'
        if job_status != "completed":
            return
        owner = self.conn.execute(
            "SELECT sprite_id FROM variants WHERE id=?", (variant_id,)
        ).fetchone()
        if owner:
            self.conn.execute(
                "UPDATE sprites SET status='review', updated_at=? "
                "WHERE id=? AND status IN ('needed', 'generating')",
                (_now(), owner["sprite_id"]),
            )
def approve_variant(self, variant_id: int) -> None:
    """Approve a variant and mark what it belongs to as 'approved'.

    A variant tied to a dimension approves that dimension and is
    recorded as its ``approved_variant_id``; a variant without a
    dimension approves the sprite itself. Unknown ids are ignored.
    Other variants' ``is_approved`` flags are not touched.
    """
    with self.conn:
        variant = self.conn.execute(
            "SELECT sprite_id, dimension_id FROM variants WHERE id=?",
            (variant_id,),
        ).fetchone()
        if not variant:
            return
        stamp = _now()
        self.conn.execute(
            "UPDATE variants SET is_approved=1 WHERE id=?", (variant_id,)
        )
        dimension_id = variant["dimension_id"]
        if dimension_id is None:
            self.conn.execute(
                "UPDATE sprites SET status='approved', updated_at=? WHERE id=?",
                (stamp, variant["sprite_id"]),
            )
            return
        self.conn.execute(
            """UPDATE sprite_dimensions SET status='approved',
approved_variant_id=?, updated_at=? WHERE id=?""",
            (variant_id, stamp, dimension_id),
        )
def reject_variant(self, variant_id: int) -> None:
    """Persist a rejection for one variant by setting its rating to -1."""
    statement = "UPDATE variants SET rating=-1 WHERE id=?"
    with self.conn:
        self.conn.execute(statement, (variant_id,))
def get_variant(self, variant_id: int) -> dict | None:
    """Return the variant row with id *variant_id* as a dict, or None."""
    found = self.conn.execute(
        "SELECT * FROM variants WHERE id=?", (variant_id,)
    ).fetchone()
    if not found:
        return None
    return dict(found)
def get_variants(
    self, sprite_id: str, dimension_id: int | None = None
) -> list[dict]:
    """List a sprite's variants, oldest first.

    Without *dimension_id* every variant of the sprite is returned,
    including those attached to a dimension; with it, only the variants
    of that dimension.
    """
    if dimension_id is None:
        statement = "SELECT * FROM variants WHERE sprite_id=? ORDER BY created_at"
        args: tuple = (sprite_id,)
    else:
        statement = "SELECT * FROM variants WHERE sprite_id=? AND dimension_id=? ORDER BY created_at"
        args = (sprite_id, dimension_id)
    return list(map(dict, self.conn.execute(statement, args).fetchall()))
def get_pending_variants(self) -> list[dict]:
    """Return all variants whose job is 'submitted' or 'running', oldest first."""
    cursor = self.conn.execute(
        "SELECT * FROM variants WHERE job_status IN ('submitted', 'running') ORDER BY created_at"
    )
    return list(map(dict, cursor.fetchall()))
def get_recent_variants(self, limit: int = 30, since_id: int | None = None) -> list[dict]:
    """Return completed variants that have a raw image, newest id first.

    Each dict combines variant fields (exposed with ``variant_id`` as the
    id key) with the owning sprite's ``category`` and ``entity_id``.

    Args:
        limit: maximum number of rows returned.
        since_id: when given, only variants with a larger id are returned.
    """
    conditions = ["v.job_status = 'completed'", "v.raw_path IS NOT NULL"]
    args: list[str | int] = []
    if since_id is not None:
        conditions.append("v.id > ?")
        args.append(since_id)
    args.append(limit)
    where = " WHERE " + " AND ".join(conditions)
    cursor = self.conn.execute(
        f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
{where}
ORDER BY v.id DESC
LIMIT ?""",
        args,
    )
    return list(map(dict, cursor.fetchall()))
# -- stats -----------------------------------------------------------------
def get_stats(self) -> dict:
    """Count sprites per status, both per category and overall.

    Returns:
        ``{"by_category": {category: {status: count}}, "total": {status: count}}``
    """
    per_category: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    overall: dict[str, int] = defaultdict(int)
    cursor = self.conn.execute(
        "SELECT category, status, COUNT(*) as cnt FROM sprites GROUP BY category, status"
    )
    for record in cursor.fetchall():
        status, count = record["status"], record["cnt"]
        per_category[record["category"]][status] = count
        overall[status] += count
    by_category = {name: dict(counts) for name, counts in per_category.items()}
    return {"by_category": by_category, "total": dict(overall)}
# -- generation runs -------------------------------------------------------
def start_run(self, category: str | None = None, variants_per: int = 8) -> int:
    """Open a new ``generation_runs`` row stamped with the current time.

    Returns:
        The id of the new run.
    """
    with self.conn:
        record = (_now(), category, variants_per)
        inserted = self.conn.execute(
            """INSERT INTO generation_runs (started_at, category, variants_per)
VALUES (?,?,?)""",
            record,
        )
        return inserted.lastrowid
def update_run(
    self,
    run_id: int,
    completed_delta: int = 0,
    failed_delta: int = 0,
    finished: bool = False,
) -> None:
    """Increment a run's counters and/or stamp it as finished.

    Zero deltas are skipped; if nothing at all is requested no statement
    is issued.

    Args:
        run_id: id of the ``generation_runs`` row.
        completed_delta: amount added to ``completed``.
        failed_delta: amount added to ``failed``.
        finished: when true, ``finished_at`` is set to the current time.
    """
    counters = (
        ("completed = completed + ?", completed_delta),
        ("failed = failed + ?", failed_delta),
    )
    with self.conn:
        assignments: list[str] = []
        values: list = []
        for fragment, delta in counters:
            if delta:
                assignments.append(fragment)
                values.append(delta)
        if finished:
            assignments.append("finished_at = ?")
            values.append(_now())
        if assignments:
            values.append(run_id)
            self.conn.execute(
                f"UPDATE generation_runs SET {', '.join(assignments)} WHERE id=?", values
            )
def get_runs(self) -> list[dict]:
    """Return every generation run as a dict, most recently started first."""
    cursor = self.conn.execute(
        "SELECT * FROM generation_runs ORDER BY started_at DESC"
    )
    return list(map(dict, cursor.fetchall()))
# -- review queue ----------------------------------------------------------
def get_review_queue(self, limit: int = 50) -> list[dict]:
    """Return sprites awaiting review, best-rated first.

    A sprite qualifies when its status is 'review' and it has at least
    one completed variant with a non-NULL rating. Sprites are ordered by
    their highest variant rating (descending), then by id.

    Each returned dict holds the sprite's columns, ``best_rating`` and
    ``top_variants``: up to three of its completed, rated variants sorted
    by rating descending, then id.
    """
    sprite_rows = self.conn.execute(
        """SELECT s.*, MAX(v.rating) as best_rating
FROM sprites s
JOIN variants v ON v.sprite_id = s.id
WHERE s.status = 'review' AND v.job_status = 'completed' AND v.rating IS NOT NULL
GROUP BY s.id
ORDER BY best_rating DESC, s.id
LIMIT ?""",
        (limit,),
    ).fetchall()

    def with_top_variants(sprite_row) -> dict:
        # Attach the three best-rated completed variants to one sprite.
        entry = dict(sprite_row)
        best = self.conn.execute(
            """SELECT * FROM variants
WHERE sprite_id = ? AND job_status = 'completed' AND rating IS NOT NULL
ORDER BY rating DESC, id
LIMIT 3""",
            (entry["id"],),
        ).fetchall()
        entry["top_variants"] = list(map(dict, best))
        return entry

    return [with_top_variants(sprite_row) for sprite_row in sprite_rows]
def get_progress(self) -> dict:
    """Summarise pipeline progress across all sprites.

    Returns a dict with:

    * ``total`` - number of sprites;
    * ``by_status`` - sprite count per status;
    * ``by_category`` - sprite count per status within each category;
    * ``review_ready`` - sprites in 'review' having at least one rated
      variant;
    * ``installed_pct`` - share of sprites with status 'installed', as a
      percentage rounded to one decimal (0 when there are no sprites).
    """
    by_status: dict[str, int] = {}
    total = 0
    for record in self.conn.execute(
        "SELECT status, COUNT(*) as cnt FROM sprites GROUP BY status"
    ).fetchall():
        by_status[record["status"]] = record["cnt"]
        total += record["cnt"]

    per_category: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for record in self.conn.execute(
        "SELECT category, status, COUNT(*) as cnt FROM sprites GROUP BY category, status"
    ).fetchall():
        per_category[record["category"]][record["status"]] = record["cnt"]

    ready_row = self.conn.execute(
        """SELECT COUNT(DISTINCT s.id) as cnt
FROM sprites s
JOIN variants v ON v.sprite_id = s.id
WHERE s.status = 'review' AND v.rating IS NOT NULL"""
    ).fetchone()

    if total:
        installed_pct = round(by_status.get("installed", 0) / total * 100, 1)
    else:
        installed_pct = 0
    return {
        "total": total,
        "by_status": by_status,
        "by_category": {name: dict(counts) for name, counts in per_category.items()},
        "review_ready": ready_row["cnt"],
        "installed_pct": installed_pct,
    }
# -- reconciliation --------------------------------------------------------
def reconcile_from_disk(self, raw_dir: Path, dry_run: bool = False) -> dict:
    """Scan *raw_dir* for PNG files and bring the variants table in line.

    Filename convention: {category}_{entity}_{variant_id}.png
    e.g. units_bowmen_dwarves_m_7510.png → sprite_id='units/bowmen_dwarves_m', variant_id=7510

    For every ``*.png`` directly inside *raw_dir*:

    * a name that does not follow the convention (no trailing numeric id,
      or no ``_`` separating category from entity) is counted as
      unparseable and otherwise ignored;
    * a variant id already stored as completed with a raw path is counted
      as already tracked;
    * a variant id stored in any other state is marked completed with
      this file as its raw path;
    * an unknown variant id whose sprite exists gets a new completed
      variant row (seed 0);
    * an unknown variant id whose sprite does not exist is reported as
      unmatched and **the file is deleted from disk**.

    Whenever a variant becomes completed, its sprite is moved to 'review'
    if it is currently 'needed' or 'generating'.

    Args:
        raw_dir: directory holding the raw generator output.
        dry_run: when true, only count; nothing is written to the
            database and no file is deleted.

    Returns:
        Counters ``disk_files``, ``unparseable``, ``already_tracked``,
        ``reconciled``, ``updated``, ``unmatched`` plus
        ``unmatched_details`` (at most the first 20 unmatched files).
    """
    import re
    from contextlib import nullcontext

    files = sorted(raw_dir.glob("*.png"))
    pattern = re.compile(r"^(.+?)_(\d+)\.png$")
    promote_sql = (
        "UPDATE sprites SET status='review', updated_at=? "
        "WHERE id=? AND status IN ('needed','generating')"
    )

    # Snapshot of every known variant, keyed by id. The owning sprite is
    # kept so status transitions target the sprite recorded in the
    # database rather than one guessed from the filename.
    existing = {
        row["id"]: {
            "sprite_id": row["sprite_id"],
            "job_status": row["job_status"],
            "raw_path": row["raw_path"],
        }
        for row in self.conn.execute(
            "SELECT id, sprite_id, job_status, raw_path FROM variants"
        ).fetchall()
    }
    sprite_ids = {
        row["id"] for row in self.conn.execute("SELECT id FROM sprites").fetchall()
    }

    already_tracked = 0
    reconciled = 0
    updated = 0
    unmatched = []
    unparseable = []

    # A real run goes through the connection's context manager so that all
    # writes are committed together, or rolled back if anything raises;
    # a dry run must not touch the transaction state at all.
    with (nullcontext() if dry_run else self.conn):
        for f in files:
            m = pattern.match(f.name)
            if not m:
                unparseable.append(f.name)
                continue
            prefix, vid_str = m.group(1), m.group(2)
            # Derive sprite_id: replace first _ with / (category separator)
            # e.g. "units_bowmen_dwarves_m" → "units/bowmen_dwarves_m"
            category, separator, entity = prefix.partition("_")
            if not separator:
                # e.g. "loose_7.png": there is no category/entity split.
                unparseable.append(f.name)
                continue
            variant_id = int(vid_str)
            raw_path = str(f)
            sprite_id = f"{category}/{entity}"

            rec = existing.get(variant_id)
            if rec is not None:
                if rec["job_status"] == "completed" and rec["raw_path"]:
                    already_tracked += 1
                    continue
                # Exists but not completed — update it
                if not dry_run:
                    self.conn.execute(
                        "UPDATE variants SET job_status='completed', raw_path=? WHERE id=?",
                        (raw_path, variant_id),
                    )
                    # Transition the variant's own sprite to review if needed
                    self.conn.execute(promote_sql, (_now(), rec["sprite_id"]))
                updated += 1
                continue

            # Not in DB at all — if sprite doesn't exist, delete the orphaned file
            if sprite_id not in sprite_ids:
                if not dry_run:
                    f.unlink()
                unmatched.append(
                    {"file": f.name, "sprite_id": sprite_id, "variant_id": variant_id}
                )
                continue

            if not dry_run:
                self.conn.execute(
                    """INSERT OR REPLACE INTO variants (id, sprite_id, seed, job_status, raw_path, created_at)
VALUES (?, ?, 0, 'completed', ?, ?)""",
                    (variant_id, sprite_id, raw_path, _now()),
                )
                self.conn.execute(promote_sql, (_now(), sprite_id))
            reconciled += 1

    return {
        "disk_files": len(files),
        "unparseable": len(unparseable),
        "already_tracked": already_tracked,
        "reconciled": reconciled,
        "updated": updated,
        "unmatched": len(unmatched),
        "unmatched_details": unmatched[:20],
    }
# -- per-scorer scorecards ------------------------------------------------
def store_score(
    self,
    variant_id: int,
    scorer_name: str,
    scorer_model: str,
    tier: int,
    result: dict,
    raw_response: str | None = None,
) -> None:
    """Append one score record for a variant.

    A new ``variant_scores`` row is always inserted; earlier rows for the
    same variant and scorer are kept.

    Args:
        variant_id: the scored variant.
        scorer_name: name of the scorer.
        scorer_model: model identifier used by the scorer.
        tier: tier number stored with the record.
        result: scorer output. ``gates`` and ``quality`` are stored as
            JSON (``{}`` when absent), ``gate_passed`` and
            ``quality_floor_failed`` as 0/1 by truthiness, ``confidence``
            defaults to 0.0, ``failed_gate_reason`` to NULL.
        raw_response: unparsed scorer response, if any.
    """
    import json as _json

    gates_json = _json.dumps(result.get("gates", {}))
    quality_json = _json.dumps(result.get("quality", {}))
    passed_flag = int(bool(result.get("gate_passed")))
    floor_flag = int(bool(result.get("quality_floor_failed")))
    with self.conn:
        self.conn.execute(
            """INSERT INTO variant_scores
(variant_id, scorer_name, scorer_model, tier, gates, quality,
gate_passed, confidence, failed_gate_reason, quality_floor_failed,
raw_response, scored_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                variant_id, scorer_name, scorer_model, tier,
                gates_json,
                quality_json,
                passed_flag,
                result.get("confidence", 0.0),
                result.get("failed_gate_reason"),
                floor_flag,
                raw_response,
                _now(),
            ),
        )
def get_scores(self, variant_id: int) -> list[dict]:
    """Return the full score history of a variant, newest ``scored_at`` first.

    Records of all scorers are returned together in one list.
    """
    cursor = self.conn.execute(
        "SELECT * FROM variant_scores WHERE variant_id = ? ORDER BY scored_at DESC",
        (variant_id,),
    )
    return list(map(dict, cursor.fetchall()))
def get_latest_scores(self, variant_id: int) -> list[dict]:
    """Return the newest score of each scorer for a variant, ordered by tier.

    Reads from the ``latest_scores`` view rather than the raw table.
    """
    cursor = self.conn.execute(
        "SELECT * FROM latest_scores WHERE variant_id = ? ORDER BY tier",
        (variant_id,),
    )
    return list(map(dict, cursor.fetchall()))
# -- pipeline dashboard ---------------------------------------------------
def get_pipeline_dashboard(self) -> dict:
    """Aggregate pipeline state for the dashboard UI.

    Returns shape matching the frontend PipelineState interface:
    funnel: { total_completed, total_processed, scoring: {name: {scored, passed, pass_rate, avg_confidence}}, approved, installed }
    failed_gates: [{ gate, count }]
    sprite_coverage: [{ sprite_id, entity_id, total_variants, processed, tier_counts: {name: {scored, passed}}, all_passed, deficit }]
    recent_scores: [{ variant_id, sprite_id, scorer_name, gate_passed, confidence, scored_at }]

    Notes:
        * Funnel scoring, failed gates and coverage read the
          ``latest_scores`` view; ``recent_scores`` reads the raw
          ``variant_scores`` table.
        * Only the scorers named in ``scorer_names`` below are reported.
        * ``sprite_coverage`` is limited to sprites of category 'units'
          and issues several queries per sprite.
    """
    # Scorers reported on; scores from any other scorer_name do not appear
    # in the per-scorer figures.
    scorer_names = ["qwen3", "haiku", "opus"]
    # ── Funnel ───────────────────────────────────────────────────────────
    total_completed = self.conn.execute(
        "SELECT COUNT(*) FROM variants WHERE job_status = 'completed'"
    ).fetchone()[0]
    total_processed = self.conn.execute(
        "SELECT COUNT(*) FROM variants WHERE job_status = 'completed' AND processed_path IS NOT NULL"
    ).fetchone()[0]
    scoring: dict[str, dict] = {}
    for name in scorer_names:
        # avg_conf averages confidence over passing scores only (the CASE
        # yields NULL for failures, which AVG ignores).
        row = self.conn.execute(
            """SELECT COUNT(*) as scored, SUM(gate_passed) as passed,
AVG(CASE WHEN gate_passed = 1 THEN confidence END) as avg_conf
FROM latest_scores WHERE scorer_name = ?""",
            (name,),
        ).fetchone()
        # SUM/AVG are NULL when the scorer has no rows; treat that as zero.
        scored = row["scored"] or 0
        passed = row["passed"] or 0
        scoring[name] = {
            "scored": scored,
            "passed": passed,
            "pass_rate": round(passed / scored, 4) if scored else 0.0,
            "avg_confidence": round(row["avg_conf"] or 0.0, 3),
        }
    # approved counts variants, installed counts sprites.
    approved = self.conn.execute(
        "SELECT COUNT(*) FROM variants WHERE is_approved = 1"
    ).fetchone()[0]
    installed = self.conn.execute(
        "SELECT COUNT(*) FROM sprites WHERE status = 'installed'"
    ).fetchone()[0]
    funnel = {
        "total_completed": total_completed,
        "total_processed": total_processed,
        "scoring": scoring,
        "approved": approved,
        "installed": installed,
    }
    # ── Failed gates (top 10 most common) ────────────────────────────────
    gate_rows = self.conn.execute(
        """SELECT failed_gate_reason as gate, COUNT(*) as count
FROM latest_scores
WHERE gate_passed = 0 AND failed_gate_reason IS NOT NULL
AND failed_gate_reason != ''
GROUP BY failed_gate_reason ORDER BY count DESC LIMIT 10"""
    ).fetchall()
    failed_gates = [{"gate": r["gate"], "count": r["count"]} for r in gate_rows]
    # ── Sprite coverage (units) ──────────────────────────────────────────
    # Number of fully passing variants each sprite is measured against;
    # "deficit" below is how many are still missing.
    target = 3
    unit_rows = self.conn.execute(
        "SELECT id, entity_id FROM sprites WHERE category = 'units' ORDER BY entity_id"
    ).fetchall()
    sprite_coverage: list[dict] = []
    for sr in unit_rows:
        sid = sr["id"]
        eid = sr["entity_id"]
        total_variants = self.conn.execute(
            "SELECT COUNT(*) FROM variants WHERE sprite_id = ? AND job_status = 'completed'",
            (sid,),
        ).fetchone()[0]
        processed = self.conn.execute(
            "SELECT COUNT(*) FROM variants WHERE sprite_id = ? AND job_status = 'completed' AND processed_path IS NOT NULL",
            (sid,),
        ).fetchone()[0]
        # Per scorer: how many of this sprite's variants it scored / passed.
        tier_counts: dict[str, dict] = {}
        for name in scorer_names:
            scored = self.conn.execute(
                """SELECT COUNT(DISTINCT ls.variant_id) FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
WHERE v.sprite_id = ? AND ls.scorer_name = ?""",
                (sid, name),
            ).fetchone()[0]
            passed = self.conn.execute(
                """SELECT COUNT(DISTINCT ls.variant_id) FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
WHERE v.sprite_id = ? AND ls.scorer_name = ? AND ls.gate_passed = 1""",
                (sid, name),
            ).fetchone()[0]
            tier_counts[name] = {"scored": scored, "passed": passed}
        # Completed variants with at least one latest score and no failing
        # latest score from any scorer (not only those in scorer_names).
        all_passed = self.conn.execute(
            """SELECT COUNT(DISTINCT v.id) FROM variants v
WHERE v.sprite_id = ? AND v.job_status = 'completed'
AND EXISTS (SELECT 1 FROM latest_scores ls WHERE ls.variant_id = v.id)
AND NOT EXISTS (SELECT 1 FROM latest_scores ls WHERE ls.variant_id = v.id AND ls.gate_passed = 0)""",
            (sid,),
        ).fetchone()[0]
        sprite_coverage.append({
            "sprite_id": sid,
            "entity_id": eid,
            "total_variants": total_variants,
            "processed": processed,
            "tier_counts": tier_counts,
            "all_passed": all_passed,
            "deficit": max(0, target - all_passed),
        })
    # ── Recent scores (last 10) ──────────────────────────────────────────
    # Ordered by score id, i.e. insertion order, and includes superseded
    # scores because it reads variant_scores rather than latest_scores.
    recent_rows = self.conn.execute(
        """SELECT vs.variant_id, v.sprite_id, vs.scorer_name,
vs.gate_passed, vs.confidence, vs.scored_at
FROM variant_scores vs
JOIN variants v ON vs.variant_id = v.id
ORDER BY vs.id DESC LIMIT 10"""
    ).fetchall()
    recent_scores = [
        {
            "variant_id": r["variant_id"],
            "sprite_id": r["sprite_id"],
            "scorer_name": r["scorer_name"],
            "gate_passed": bool(r["gate_passed"]),
            "confidence": r["confidence"],
            "scored_at": r["scored_at"],
        }
        for r in recent_rows
    ]
    return {
        "funnel": funnel,
        "failed_gates": failed_gates,
        "sprite_coverage": sprite_coverage,
        "recent_scores": recent_scores,
    }
# -- cleanup ---------------------------------------------------------------
def close(self) -> None:
    """Close the underlying SQLite connection."""
    connection = self.conn
    connection.close()