magicciv/tools/sprite-generation/engine/registry.py
Claude Code 8f011cf82e chore(pages): 🔧 Update build script for failed page deployment
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-03-30 08:50:21 -07:00

1580 lines
64 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SQLite-backed sprite registry tracking ~4,500 sprites through their lifecycle."""
from __future__ import annotations
import sqlite3
import threading
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
def _now() -> str:
return datetime.now(timezone.utc).isoformat()
# Complete DDL for the registry database. SpriteRegistry.__init__ feeds this
# to executescript() on every construction; every statement is guarded by
# IF NOT EXISTS, so objects that already exist are left untouched.
#
# Objects created:
#   sprites            - one row per sprite, with its lifecycle status
#   sprite_dimensions  - per-sprite (dimension_type, dimension_value) entries
#   variants           - individual generation attempts for a sprite/dimension
#   variant_scores     - append-only scorer results for variants
#   latest_scores      - view: newest variant_scores row per (variant, scorer)
#   generation_runs    - bookkeeping for batch generation runs
#   prompt_templates   - per-category prompt templates
#   seed_pool / generation_hints / scorer_calibration - feedback tables
_SCHEMA = """
CREATE TABLE IF NOT EXISTS sprites (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'needed',
sprite_path TEXT,
install_path TEXT,
prompt TEXT,
negative_prompt TEXT,
gen_width INTEGER DEFAULT 1024,
gen_height INTEGER DEFAULT 512,
target_width INTEGER DEFAULT 384,
target_height INTEGER DEFAULT 332,
source_file TEXT,
dimensions TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS sprite_dimensions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sprite_id TEXT NOT NULL REFERENCES sprites(id) ON DELETE CASCADE,
dimension_type TEXT NOT NULL,
dimension_value TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'needed',
prompt_modifier TEXT,
install_path TEXT,
approved_variant_id INTEGER REFERENCES variants(id),
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(sprite_id, dimension_type, dimension_value)
);
CREATE TABLE IF NOT EXISTS variants (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sprite_id TEXT NOT NULL REFERENCES sprites(id) ON DELETE CASCADE,
dimension_id INTEGER REFERENCES sprite_dimensions(id) ON DELETE CASCADE,
seed INTEGER NOT NULL,
job_id TEXT,
job_status TEXT NOT NULL DEFAULT 'submitted',
raw_path TEXT,
processed_path TEXT,
prompt_modifier TEXT,
model TEXT,
prompt_used TEXT,
negative_used TEXT,
guidance_scale REAL,
steps INTEGER,
prompt_author TEXT,
is_approved INTEGER NOT NULL DEFAULT 0,
rating INTEGER,
notes TEXT,
generation_ms INTEGER,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS variant_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
scorer_name TEXT NOT NULL,
scorer_model TEXT NOT NULL,
tier INTEGER NOT NULL,
gates TEXT,
quality TEXT,
gate_passed INTEGER NOT NULL DEFAULT 0,
confidence REAL NOT NULL DEFAULT 0.0,
failed_gate_reason TEXT,
quality_floor_failed INTEGER NOT NULL DEFAULT 0,
raw_response TEXT,
scored_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS generation_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
finished_at TEXT,
category TEXT,
total_jobs INTEGER NOT NULL DEFAULT 0,
completed INTEGER NOT NULL DEFAULT 0,
failed INTEGER NOT NULL DEFAULT 0,
variants_per INTEGER NOT NULL DEFAULT 8
);
CREATE TABLE IF NOT EXISTS prompt_templates (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
template TEXT NOT NULL,
negative TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sprites_category ON sprites(category);
CREATE INDEX IF NOT EXISTS idx_sprites_status ON sprites(status);
CREATE INDEX IF NOT EXISTS idx_sprite_dimensions_sprite ON sprite_dimensions(sprite_id);
CREATE INDEX IF NOT EXISTS idx_variants_sprite ON variants(sprite_id);
CREATE INDEX IF NOT EXISTS idx_variants_dimension ON variants(dimension_id);
CREATE INDEX IF NOT EXISTS idx_variants_job_status ON variants(job_status);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant ON variant_scores(variant_id);
CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer ON variant_scores(scorer_name);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer ON variant_scores(variant_id, scorer_name);
CREATE INDEX IF NOT EXISTS idx_variants_raw_path ON variants(raw_path);
-- Latest score per (variant, scorer): used by all pass/fail decisions
CREATE VIEW IF NOT EXISTS latest_scores AS
SELECT vs.*
FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
-- Seeds that produced high-quality variants: fed back into generator.
-- UNIQUE(seed, category) — one entry per seed-category pair; INSERT OR REPLACE
-- updates when a better score is found.
CREATE TABLE IF NOT EXISTS seed_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
seed INTEGER NOT NULL,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
avg_quality REAL NOT NULL DEFAULT 0.0,
gate_pass_rate REAL NOT NULL DEFAULT 1.0,
best_scorer TEXT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
is_approved INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
UNIQUE(seed, category)
);
-- Per-entity generation hints: aggregated best guidance + modifier indices.
CREATE TABLE IF NOT EXISTS generation_hints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT NOT NULL,
category TEXT NOT NULL,
best_guidance REAL,
best_modifier_indices TEXT,
avg_quality REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL,
UNIQUE(entity_id, category)
);
-- Scorer calibration: human approval data vs scorer decisions.
CREATE TABLE IF NOT EXISTS scorer_calibration (
scorer_name TEXT NOT NULL,
category TEXT NOT NULL,
false_positive_rate REAL,
true_positive_rate REAL,
recommended_threshold REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
computed_at TEXT NOT NULL,
PRIMARY KEY(scorer_name, category)
);
CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category);
CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC);
CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category);
"""
class SpriteRegistry:
def __init__(self, db_path: Path) -> None:
    """Open the registry database at *db_path*, creating it if needed.

    The parent directory is created when missing. The connection is opened
    with ``check_same_thread=False`` and returns ``sqlite3.Row`` rows. WAL
    journaling and foreign-key enforcement are switched on, then the schema
    script and the in-place migrations are applied.
    """
    db_path.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(db_path), check_same_thread=False)
    self.conn = connection
    connection.row_factory = sqlite3.Row
    self._lock = threading.Lock()
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        connection.execute(pragma)
    connection.executescript(_SCHEMA)
    self._migrate()
def _migrate(self) -> None:
    """Bring an already-existing database up to the current schema.

    Steps, in order:
      1. Add any column from the list below that ``variants`` lacks.
      2. If ``variant_scores`` still has an index whose name contains
         "unique" or "sqlite_autoindex", rebuild the table without that
         constraint (copying all rows) and recreate its indexes and the
         ``latest_scores`` view.
      3. Ensure the ``latest_scores`` view exists.
      4. Ensure the seed_pool / generation_hints / scorer_calibration tables
         and their indexes exist.
    Finishes with an explicit commit.
    """
    # variants column additions
    # PRAGMA table_info rows carry the column name at index 1.
    existing_v = {row[1] for row in self.conn.execute("PRAGMA table_info(variants)").fetchall()}
    for col, typ in [
        ("model", "TEXT"),
        ("prompt_used", "TEXT"),
        ("negative_used", "TEXT"),
        ("guidance_scale", "REAL"),
        ("steps", "INTEGER"),
        ("prompt_author", "TEXT"),
        ("scored_by", "TEXT"),
        ("scored_at", "TEXT"),
        ("review_tier", "INTEGER DEFAULT 0"),
        ("reject_reason", "TEXT"),
    ]:
        if col not in existing_v:
            # col/typ come only from the hard-coded list above, never from input.
            self.conn.execute(f"ALTER TABLE variants ADD COLUMN {col} {typ}")
    # Drop UNIQUE(variant_id, scorer_name) from variant_scores if present.
    # SQLite doesn't support DROP CONSTRAINT — we recreate the table without it.
    # sqlite_master rows carry the object name at index 1.
    idxs = {row[1] for row in self.conn.execute(
        "SELECT * FROM sqlite_master WHERE type='index' AND tbl_name='variant_scores'"
    ).fetchall()}
    if any("unique" in idx.lower() or "sqlite_autoindex" in idx.lower() for idx in idxs):
        # The view is dropped first because it selects from the table being replaced.
        self.conn.executescript("""
BEGIN;
DROP VIEW IF EXISTS latest_scores;
CREATE TABLE IF NOT EXISTS variant_scores_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
scorer_name TEXT NOT NULL,
scorer_model TEXT NOT NULL,
tier INTEGER NOT NULL,
gates TEXT,
quality TEXT,
gate_passed INTEGER NOT NULL DEFAULT 0,
confidence REAL NOT NULL DEFAULT 0.0,
failed_gate_reason TEXT,
quality_floor_failed INTEGER NOT NULL DEFAULT 0,
raw_response TEXT,
scored_at TEXT NOT NULL
);
INSERT OR IGNORE INTO variant_scores_new
SELECT id, variant_id, scorer_name, scorer_model, tier,
gates, quality, gate_passed, confidence,
failed_gate_reason, quality_floor_failed, raw_response, scored_at
FROM variant_scores;
DROP TABLE variant_scores;
ALTER TABLE variant_scores_new RENAME TO variant_scores;
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant
ON variant_scores(variant_id);
CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer
ON variant_scores(scorer_name);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer
ON variant_scores(variant_id, scorer_name);
CREATE VIEW latest_scores AS
SELECT vs.* FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
COMMIT;
""")
    # Ensure latest_scores view exists (CREATE VIEW is idempotent via IF NOT EXISTS)
    self.conn.executescript("""
CREATE VIEW IF NOT EXISTS latest_scores AS
SELECT vs.*
FROM variant_scores vs
WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
""")
    # Create new tables if they don't exist (for existing DBs)
    # NOTE(review): these definitions repeat the ones in _SCHEMA; keep both in sync.
    self.conn.executescript("""
CREATE TABLE IF NOT EXISTS seed_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
seed INTEGER NOT NULL,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
avg_quality REAL NOT NULL DEFAULT 0.0,
gate_pass_rate REAL NOT NULL DEFAULT 1.0,
best_scorer TEXT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
is_approved INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
UNIQUE(seed, category)
);
CREATE TABLE IF NOT EXISTS generation_hints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT NOT NULL,
category TEXT NOT NULL,
best_guidance REAL,
best_modifier_indices TEXT,
avg_quality REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL,
UNIQUE(entity_id, category)
);
CREATE TABLE IF NOT EXISTS scorer_calibration (
scorer_name TEXT NOT NULL,
category TEXT NOT NULL,
false_positive_rate REAL,
true_positive_rate REAL,
recommended_threshold REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
computed_at TEXT NOT NULL,
PRIMARY KEY(scorer_name, category)
);
CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category);
CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC);
CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category);
""")
    self.conn.commit()
# -- sprites ---------------------------------------------------------------
def upsert_sprite(
    self,
    id: str,
    category: str,
    entity_id: str,
    *,
    status: str = "needed",
    sprite_path: str | None = None,
    install_path: str | None = None,
    prompt: str | None = None,
    negative_prompt: str | None = None,
    gen_width: int = 1024,
    gen_height: int = 512,
    target_width: int = 384,
    target_height: int = 332,
    source_file: str | None = None,
    dimensions: str | None = None,
) -> bool:
    """Insert the sprite, or refresh its metadata if the id already exists.

    An existing row keeps its current ``status`` and ``created_at``; every
    other column is overwritten with the supplied values. ``status`` is only
    applied when a new row is inserted.

    Returns:
        True when a new row was inserted, False when an existing row was
        updated.
    """
    stamp = _now()
    # Columns that appear, in this order, in both the UPDATE and the INSERT.
    shared = (
        sprite_path, install_path, prompt, negative_prompt,
        gen_width, gen_height, target_width, target_height,
        source_file, dimensions,
    )
    with self.conn:
        found = self.conn.execute(
            "SELECT id FROM sprites WHERE id = ?", (id,)
        ).fetchone()
        inserted = found is None
        if inserted:
            self.conn.execute(
                """INSERT INTO sprites (id, category, entity_id, status, sprite_path,
install_path, prompt, negative_prompt, gen_width, gen_height,
target_width, target_height, source_file, dimensions,
created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (id, category, entity_id, status, *shared, stamp, stamp),
            )
        else:
            self.conn.execute(
                """UPDATE sprites SET category=?, entity_id=?, sprite_path=?,
install_path=?, prompt=?, negative_prompt=?, gen_width=?,
gen_height=?, target_width=?, target_height=?, source_file=?,
dimensions=?, updated_at=?
WHERE id=?""",
                (category, entity_id, *shared, stamp, id),
            )
    return inserted
def update_sprite_status(self, sprite_id: str, status: str) -> None:
    """Set a sprite's status and refresh its ``updated_at`` timestamp."""
    statement = "UPDATE sprites SET status=?, updated_at=? WHERE id=?"
    with self.conn:
        self.conn.execute(statement, (status, _now(), sprite_id))
def get_sprite(self, sprite_id: str) -> dict | None:
    """Return one sprite with its dimensions and variants, or None if unknown.

    The returned dict holds the sprite's columns plus:
      * ``dimensions_list`` - its sprite_dimensions rows (ordered by type,
        then value), each with a ``variants`` list for that dimension;
      * ``variants`` - variants attached to the sprite itself
        (``dimension_id IS NULL``).
    Variant lists are ordered by ``created_at``.
    """
    run = self.conn.execute
    sprite_row = run("SELECT * FROM sprites WHERE id = ?", (sprite_id,)).fetchone()
    if sprite_row is None:
        return None

    record = dict(sprite_row)
    record["dimensions_list"] = [
        dict(dim_row)
        for dim_row in run(
            "SELECT * FROM sprite_dimensions WHERE sprite_id = ? ORDER BY dimension_type, dimension_value",
            (sprite_id,),
        ).fetchall()
    ]
    record["variants"] = [
        dict(variant_row)
        for variant_row in run(
            "SELECT * FROM variants WHERE sprite_id = ? AND dimension_id IS NULL ORDER BY created_at",
            (sprite_id,),
        ).fetchall()
    ]
    for dimension in record["dimensions_list"]:
        dimension["variants"] = [
            dict(variant_row)
            for variant_row in run(
                "SELECT * FROM variants WHERE dimension_id = ? ORDER BY created_at",
                (dimension["id"],),
            ).fetchall()
        ]
    return record
def get_sprites(
    self,
    category: str | None = None,
    status: str | None = None,
    search: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> list[dict]:
    """Return one page of sprites, ordered by category then id.

    Falsy filters are ignored. ``category`` and ``status`` are exact
    matches; ``search`` is a substring (LIKE) match against id, entity_id,
    or prompt. All supplied filters are ANDed together.
    """
    filters: list[tuple[str, list[str | int]]] = []
    if category:
        filters.append(("category = ?", [category]))
    if status:
        filters.append(("status = ?", [status]))
    if search:
        term = f"%{search}%"
        filters.append(
            ("(id LIKE ? OR entity_id LIKE ? OR prompt LIKE ?)", [term, term, term])
        )

    where = ""
    if filters:
        where = f" WHERE {' AND '.join(clause for clause, _ in filters)}"
    bound = [value for _, values in filters for value in values]
    bound += [limit, offset]

    cursor = self.conn.execute(
        f"SELECT * FROM sprites{where} ORDER BY category, id LIMIT ? OFFSET ?",
        bound,
    )
    return [dict(row) for row in cursor.fetchall()]
def reject_sprite(self, sprite_id: str, dimension_id: int | None = None) -> None:
    """Reject all variants of a sprite, or of a single dimension, and reset it.

    With ``dimension_id``: every variant of that dimension gets
    ``is_approved=0`` and ``rating=-1``, and the dimension returns to
    'needed' with its approved variant cleared. Without it: the same is done
    for the sprite-level variants (``dimension_id IS NULL``) and the sprite
    itself returns to 'needed'.
    """
    stamp = _now()
    if dimension_id is None:
        steps = [
            (
                "UPDATE variants SET is_approved=0, rating=-1 WHERE sprite_id=? AND dimension_id IS NULL",
                (sprite_id,),
            ),
            (
                "UPDATE sprites SET status='needed', updated_at=? WHERE id=?",
                (stamp, sprite_id),
            ),
        ]
    else:
        steps = [
            (
                "UPDATE variants SET is_approved=0, rating=-1 WHERE dimension_id=?",
                (dimension_id,),
            ),
            (
                "UPDATE sprite_dimensions SET status='needed', approved_variant_id=NULL, updated_at=? WHERE id=?",
                (stamp, dimension_id),
            ),
        ]
    # Both statements of the chosen pair run in one transaction.
    with self.conn:
        for statement, args in steps:
            self.conn.execute(statement, args)
def get_approved_uninstalled(self, category: str | None = None) -> list[dict]:
    """Return sprites whose status is 'approved', with their approved dimensions.

    Args:
        category: When truthy, restrict both the sprites and the dimensions
            to this category.

    Returns:
        Sprite dicts ordered by category then id. Each has an
        ``approved_dimensions`` list holding the sprite's dimension rows
        whose status is 'approved' (each extended with the parent sprite's
        ``category`` and ``entity_id``); the list is empty when there are
        none.

    NOTE(review): approved dimensions whose parent sprite is not itself
    'approved' are queried but never returned, because they are only
    attached to sprites in the result set. Confirm that is intended.
    """
    sprite_clauses = ["status = 'approved'"]
    dim_clauses = ["sd.status = 'approved'"]
    params: list[str] = []
    if category:
        sprite_clauses.append("category = ?")
        dim_clauses.append("s.category = ?")
        params.append(category)
    where = " WHERE " + " AND ".join(sprite_clauses)
    dim_where = " WHERE " + " AND ".join(dim_clauses)

    rows = self.conn.execute(
        f"SELECT * FROM sprites{where} ORDER BY category, id", params
    ).fetchall()
    results = [dict(r) for r in rows]

    # Also include dimensions that are approved but not installed
    dim_rows = self.conn.execute(
        f"""SELECT sd.*, s.category, s.entity_id FROM sprite_dimensions sd
JOIN sprites s ON sd.sprite_id = s.id{dim_where}
ORDER BY s.category, sd.sprite_id""",
        params,
    ).fetchall()

    # Group once by sprite id instead of rescanning every dimension row for
    # every sprite (which was O(sprites x dimensions)).
    dims_by_sprite: dict[str, list[dict]] = {}
    for d in dim_rows:
        dims_by_sprite.setdefault(d["sprite_id"], []).append(dict(d))
    for r in results:
        r["approved_dimensions"] = dims_by_sprite.get(r["id"], [])
    return results
def mark_installed(self, sprite_id: str, dimension_id: int | None = None) -> None:
    """Flag a dimension (when ``dimension_id`` is given) or a sprite as 'installed'.

    Only one of the two is touched: passing ``dimension_id`` leaves the
    sprite row unchanged.
    """
    stamp = _now()
    if dimension_id is None:
        statement = "UPDATE sprites SET status='installed', updated_at=? WHERE id=?"
        target = sprite_id
    else:
        statement = "UPDATE sprite_dimensions SET status='installed', updated_at=? WHERE id=?"
        target = dimension_id
    with self.conn:
        self.conn.execute(statement, (stamp, target))
# -- dimensions ------------------------------------------------------------
def upsert_dimension(
    self,
    sprite_id: str,
    dimension_type: str,
    dimension_value: str,
    *,
    status: str = "needed",
    prompt_modifier: str | None = None,
    install_path: str | None = None,
) -> int:
    """Create or refresh the (sprite, type, value) dimension and return its id.

    An existing dimension only has ``prompt_modifier``, ``install_path`` and
    ``updated_at`` overwritten; its status is left alone. ``status`` is only
    applied when a new row is inserted.
    """
    stamp = _now()
    key = (sprite_id, dimension_type, dimension_value)
    with self.conn:
        match = self.conn.execute(
            """SELECT id FROM sprite_dimensions
WHERE sprite_id=? AND dimension_type=? AND dimension_value=?""",
            key,
        ).fetchone()
        if match is None:
            inserted = self.conn.execute(
                """INSERT INTO sprite_dimensions (sprite_id, dimension_type,
dimension_value, status, prompt_modifier, install_path,
created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?)""",
                (*key, status, prompt_modifier, install_path, stamp, stamp),
            )
            dimension_id = inserted.lastrowid
        else:
            dimension_id = match["id"]
            self.conn.execute(
                """UPDATE sprite_dimensions SET prompt_modifier=?,
install_path=?, updated_at=? WHERE id=?""",
                (prompt_modifier, install_path, stamp, dimension_id),
            )
    return dimension_id
def update_dimension_status(self, dimension_id: int, status: str) -> None:
    """Set a dimension's status and refresh its ``updated_at`` timestamp."""
    statement = "UPDATE sprite_dimensions SET status=?, updated_at=? WHERE id=?"
    with self.conn:
        self.conn.execute(statement, (status, _now(), dimension_id))
# -- variants --------------------------------------------------------------
def add_variant(
    self,
    sprite_id: str,
    seed: int,
    dimension_id: int | None = None,
    prompt_modifier: str | None = None,
    job_id: str | None = None,
    model: str | None = None,
    prompt_used: str | None = None,
    negative_used: str | None = None,
    guidance_scale: float | None = None,
    steps: int | None = None,
    prompt_author: str | None = None,
) -> int:
    """Record a new generation attempt and return its variant id.

    Columns not listed in the INSERT (``job_status``, ``is_approved``, ...)
    take their schema defaults.
    """
    with self.conn:
        values = (
            sprite_id, dimension_id, seed, job_id, prompt_modifier,
            model, prompt_used, negative_used, guidance_scale, steps,
            prompt_author, _now(),
        )
        inserted = self.conn.execute(
            """INSERT INTO variants (sprite_id, dimension_id, seed, job_id,
prompt_modifier, model, prompt_used, negative_used,
guidance_scale, steps, prompt_author, created_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
            values,
        )
        return inserted.lastrowid
def update_variant_status(
    self,
    variant_id: int,
    job_status: str,
    *,
    raw_path: str | None = None,
    processed_path: str | None = None,
    generation_ms: int | None = None,
) -> None:
    """Update a variant's job status and any of its optional result fields.

    Optional fields left as None are not written, so earlier values survive.
    When the new status is "completed", the parent sprite moves to 'review'
    if it is currently 'needed' or 'generating'.
    """
    optional_columns = (
        ("raw_path=?", raw_path),
        ("processed_path=?", processed_path),
        ("generation_ms=?", generation_ms),
    )
    with self.conn:
        sets = ["job_status=?"]
        bound: list = [job_status]
        for assignment, value in optional_columns:
            if value is not None:
                sets.append(assignment)
                bound.append(value)
        bound.append(variant_id)
        self.conn.execute(
            f"UPDATE variants SET {', '.join(sets)} WHERE id=?", bound
        )
        if job_status != "completed":
            return
        # A finished variant means the parent sprite has something to review.
        owner = self.conn.execute(
            "SELECT sprite_id FROM variants WHERE id=?", (variant_id,)
        ).fetchone()
        if owner:
            self.conn.execute(
                "UPDATE sprites SET status='review', updated_at=? "
                "WHERE id=? AND status IN ('needed', 'generating')",
                (_now(), owner["sprite_id"]),
            )
def approve_variant(self, variant_id: int) -> None:
    """Approve a variant and mark its owning dimension or sprite 'approved'.

    Does nothing for an unknown variant id. A variant that belongs to a
    dimension approves that dimension (recording itself as
    ``approved_variant_id``); otherwise the sprite itself is approved.
    """
    with self.conn:
        owner = self.conn.execute(
            "SELECT sprite_id, dimension_id FROM variants WHERE id=?",
            (variant_id,),
        ).fetchone()
        if owner is not None:
            stamp = _now()
            self.conn.execute(
                "UPDATE variants SET is_approved=1 WHERE id=?", (variant_id,)
            )
            owning_dimension = owner["dimension_id"]
            if owning_dimension is None:
                self.conn.execute(
                    "UPDATE sprites SET status='approved', updated_at=? WHERE id=?",
                    (stamp, owner["sprite_id"]),
                )
            else:
                self.conn.execute(
                    """UPDATE sprite_dimensions SET status='approved',
approved_variant_id=?, updated_at=? WHERE id=?""",
                    (variant_id, stamp, owning_dimension),
                )
def reject_variant(self, variant_id: int, reason: str | None = None) -> None:
    """Persist a rejection: set the variant's rating to -1 and store *reason*.

    ``reject_reason`` is overwritten even when *reason* is None.
    """
    statement = "UPDATE variants SET rating=-1, reject_reason=? WHERE id=?"
    with self.conn:
        self.conn.execute(statement, (reason, variant_id))
def get_variant(self, variant_id: int) -> dict | None:
    """Return the variant row as a dict, or None when the id is unknown."""
    found = self.conn.execute(
        "SELECT * FROM variants WHERE id=?", (variant_id,)
    ).fetchone()
    if found is None:
        return None
    return dict(found)
def get_variants(
    self, sprite_id: str, dimension_id: int | None = None
) -> list[dict]:
    """List a sprite's variants ordered by ``created_at``.

    With ``dimension_id`` only that dimension's variants are returned;
    without it, every variant of the sprite is returned, including those
    attached to dimensions.
    """
    if dimension_id is None:
        statement = "SELECT * FROM variants WHERE sprite_id=? ORDER BY created_at"
        args: tuple = (sprite_id,)
    else:
        statement = "SELECT * FROM variants WHERE sprite_id=? AND dimension_id=? ORDER BY created_at"
        args = (sprite_id, dimension_id)
    return [dict(row) for row in self.conn.execute(statement, args).fetchall()]
def get_pending_variants(self) -> list[dict]:
    """Return variants whose job is still 'submitted' or 'running', oldest first."""
    cursor = self.conn.execute(
        "SELECT * FROM variants WHERE job_status IN ('submitted', 'running') ORDER BY created_at"
    )
    pending = []
    for row in cursor.fetchall():
        pending.append(dict(row))
    return pending
def get_recent_variants(self, limit: int = 30, since_id: int | None = None) -> list[dict]:
    """Return the newest completed variants (with an image) for the stream ticker.

    Rows are joined with their sprite's category and entity_id and ordered
    by variant id, newest first. ``since_id`` restricts the result to
    variants with a strictly greater id; ``limit`` caps the row count.
    """
    conditions = ["v.job_status = 'completed'", "v.raw_path IS NOT NULL"]
    bound: list[str | int] = []
    if since_id is not None:
        conditions.append("v.id > ?")
        bound.append(since_id)
    bound.append(limit)
    where = " WHERE " + " AND ".join(conditions)
    cursor = self.conn.execute(
        f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
{where}
ORDER BY v.id DESC
LIMIT ?""",
        bound,
    )
    return [dict(row) for row in cursor.fetchall()]
def get_review_variants(self, limit: int = 500) -> list[dict]:
    """Return variants awaiting human review, best rating first.

    A variant qualifies when it is completed, has a raw image, has
    ``rating >= 1`` and is not yet approved. Ties on rating are broken by
    newest variant id. Row shape matches ``get_recent_variants``.
    """
    statement = """SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
WHERE v.job_status = 'completed'
AND v.raw_path IS NOT NULL
AND v.rating >= 1
AND v.is_approved = 0
ORDER BY v.rating DESC, v.id DESC
LIMIT ?"""
    cursor = self.conn.execute(statement, (limit,))
    return [dict(row) for row in cursor.fetchall()]
def query_variants(
    self,
    mode: str = "all",
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """Paginated variant query for browse/theater UIs.

    Modes:
        'all' / 'completed' - completed variants that have a raw image
        'review'            - as above, with rating >= 1 and not yet approved
        'processed'         - variants with a processed_path
        'approved'          - variants with is_approved = 1
        'installed'         - variants whose job_status is 'installed'
        'scored_<name>'     - variants whose latest score from scorer
                              <name> passed its gate (e.g. 'scored_qwen3')
        anything else       - treated like 'all'

    Each item also carries ``quality_json``, ``gates_json`` and
    ``quality_scorer`` taken from the variant's best passing latest score:
    the highest tier wins and ties are broken by the newest score row. The
    three fields are None when no latest score passed.

    Exactly one best score is joined per variant, so a variant appears at
    most once per page even when several scorers on the same tier passed it,
    and the page stays consistent with ``total``.

    Returns:
        ``(items, total_count)`` where ``total_count`` ignores limit/offset.
    """
    completed_with_image = "v.job_status = 'completed' AND v.raw_path IS NOT NULL"
    fixed_filters = {
        "all": completed_with_image,
        "completed": completed_with_image,
        "review": completed_with_image + " AND v.rating >= 1 AND v.is_approved = 0",
        "processed": "v.processed_path IS NOT NULL",
        "approved": "v.is_approved = 1",
        "installed": "v.job_status = 'installed'",
    }
    extra_params: list = []
    if mode in fixed_filters:
        base_where = fixed_filters[mode]
    elif mode.startswith("scored_"):
        # The scorer name is bound as a parameter, never spliced into SQL.
        base_where = (
            "EXISTS ("
            " SELECT 1 FROM latest_scores ls"
            " WHERE ls.variant_id = v.id AND ls.scorer_name = ? AND ls.gate_passed = 1"
            ")"
        )
        extra_params.append(mode[len("scored_"):])
    else:
        # Unknown modes fall back to the default listing.
        base_where = completed_with_image

    # Count over the same variants/sprites join the page query uses.
    total = self.conn.execute(
        "SELECT COUNT(*) FROM variants v"
        " JOIN sprites s ON v.sprite_id = s.id"
        f" WHERE {base_where}",
        extra_params,
    ).fetchone()[0]

    rows = self.conn.execute(
        f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
                   v.raw_path, v.processed_path, v.seed, v.created_at,
                   v.rating, v.notes, v.is_approved,
                   v.scored_by, v.review_tier,
                   best.quality AS quality_json,
                   best.gates AS gates_json,
                   best.scorer_name AS quality_scorer
            FROM variants v
            JOIN sprites s ON v.sprite_id = s.id
            LEFT JOIN latest_scores best
                   ON best.variant_id = v.id
                  AND best.id = (
                      SELECT ls_best.id
                      FROM latest_scores ls_best
                      WHERE ls_best.variant_id = v.id AND ls_best.gate_passed = 1
                      ORDER BY ls_best.tier DESC, ls_best.id DESC
                      LIMIT 1
                  )
            WHERE {base_where}
            ORDER BY v.id DESC
            LIMIT ? OFFSET ?""",
        extra_params + [limit, offset],
    ).fetchall()
    return [dict(r) for r in rows], total
# -- stats -----------------------------------------------------------------
def get_stats(self) -> dict:
    """Count sprites per (category, status) and per status overall.

    Returns:
        ``{"by_category": {category: {status: count}},
           "total": {status: count}}``
    """
    per_category: dict[str, dict[str, int]] = {}
    per_status: dict[str, int] = {}
    grouped = self.conn.execute(
        "SELECT category, status, COUNT(*) as cnt FROM sprites GROUP BY category, status"
    ).fetchall()
    for category, status, count in grouped:
        per_category.setdefault(category, {})[status] = count
        per_status[status] = per_status.get(status, 0) + count
    return {"by_category": per_category, "total": per_status}
# -- generation runs -------------------------------------------------------
def start_run(self, category: str | None = None, variants_per: int = 8) -> int:
    """Insert a new generation_runs row stamped with the current time; return its id."""
    with self.conn:
        inserted = self.conn.execute(
            """INSERT INTO generation_runs (started_at, category, variants_per)
VALUES (?,?,?)""",
            (_now(), category, variants_per),
        )
        run_id = inserted.lastrowid
        return run_id
def update_run(
    self,
    run_id: int,
    completed_delta: int = 0,
    failed_delta: int = 0,
    finished: bool = False,
) -> None:
    """Increment a run's completed/failed counters and optionally close it.

    Zero deltas are skipped. ``finished=True`` stamps ``finished_at`` with
    the current time. When nothing is requested no UPDATE is issued.
    """
    with self.conn:
        changes: list[tuple[str, object]] = []
        if completed_delta:
            changes.append(("completed = completed + ?", completed_delta))
        if failed_delta:
            changes.append(("failed = failed + ?", failed_delta))
        if finished:
            changes.append(("finished_at = ?", _now()))
        if changes:
            sets = [assignment for assignment, _ in changes]
            bound = [value for _, value in changes]
            bound.append(run_id)
            self.conn.execute(
                f"UPDATE generation_runs SET {', '.join(sets)} WHERE id=?", bound
            )
def get_runs(self) -> list[dict]:
    """Return every generation run as a dict, most recently started first."""
    cursor = self.conn.execute(
        "SELECT * FROM generation_runs ORDER BY started_at DESC"
    )
    runs = []
    for row in cursor.fetchall():
        runs.append(dict(row))
    return runs
# -- review queue ----------------------------------------------------------
def get_review_queue(self, limit: int = 50) -> list[dict]:
    """Return sprites in 'review' that have rated variants, best rating first.

    A sprite qualifies when it has at least one completed variant with a
    non-NULL rating. Each returned sprite dict carries ``best_rating`` (its
    highest such rating) and ``top_variants`` (up to three of those
    variants, highest rating first, ties by lowest id). Sprites with equal
    ``best_rating`` are ordered by id.
    """
    candidates = self.conn.execute(
        """SELECT s.*, MAX(v.rating) as best_rating
FROM sprites s
JOIN variants v ON v.sprite_id = s.id
WHERE s.status = 'review' AND v.job_status = 'completed' AND v.rating IS NOT NULL
GROUP BY s.id
ORDER BY best_rating DESC, s.id
LIMIT ?""",
        (limit,),
    ).fetchall()

    queue: list[dict] = []
    for candidate in candidates:
        entry = dict(candidate)
        top_rows = self.conn.execute(
            """SELECT * FROM variants
WHERE sprite_id = ? AND job_status = 'completed' AND rating IS NOT NULL
ORDER BY rating DESC, id
LIMIT 3""",
            (entry["id"],),
        ).fetchall()
        entry["top_variants"] = list(map(dict, top_rows))
        queue.append(entry)
    return queue
def get_progress(self) -> dict:
    """Summarise overall pipeline progress for the dashboard.

    Returns a dict with:
      * ``total`` - number of sprites;
      * ``by_status`` - sprite count per status;
      * ``by_category`` - sprite count per category and status;
      * ``review_ready`` - sprites in 'review' having at least one variant
        with a non-NULL rating;
      * ``installed_pct`` - share of sprites with status 'installed', as a
        percentage rounded to one decimal (0 when there are no sprites).
    """
    by_status: dict[str, int] = {}
    for status, count in self.conn.execute(
        "SELECT status, COUNT(*) as cnt FROM sprites GROUP BY status"
    ).fetchall():
        by_status[status] = count
    total = sum(by_status.values())

    by_category: dict[str, dict[str, int]] = {}
    for category, status, count in self.conn.execute(
        "SELECT category, status, COUNT(*) as cnt FROM sprites GROUP BY category, status"
    ).fetchall():
        by_category.setdefault(category, {})[status] = count

    ready_row = self.conn.execute(
        """SELECT COUNT(DISTINCT s.id) as cnt
FROM sprites s
JOIN variants v ON v.sprite_id = s.id
WHERE s.status = 'review' AND v.rating IS NOT NULL"""
    ).fetchone()

    if total:
        installed_pct = round(by_status.get("installed", 0) / total * 100, 1)
    else:
        installed_pct = 0
    return {
        "total": total,
        "by_status": by_status,
        "by_category": by_category,
        "review_ready": ready_row["cnt"],
        "installed_pct": installed_pct,
    }
# -- reconciliation --------------------------------------------------------
def reconcile_from_disk(self, raw_dir: Path, dry_run: bool = False) -> dict:
    """Scan *raw_dir* for PNG files not tracked as completed; create missing records.

    Filename convention: ``{category}_{entity}_{suffix}.png``, e.g.
    ``units_bowmen_dwarves_m_7510.png`` -> sprite_id ``units/bowmen_dwarves_m``.
    The trailing number only delimits the sprite prefix; it is NOT used as
    the DB variant id. Idempotency is keyed on ``raw_path`` so two files
    with the same trailing number for different sprites never collide.

    Per file:
      * a name that does not fit the convention (including a prefix without
        any ``_`` separator, e.g. ``foo_123.png``) is counted as unparseable
        and left untouched;
      * a path already stored on a completed variant is counted as tracked;
      * a file whose sprite id is not in the registry is counted as
        unmatched and, unless ``dry_run``, DELETED from disk;
      * otherwise a completed variant (seed 0) is inserted and the sprite
        moves to 'review' if it was 'needed' or 'generating'.

    Args:
        raw_dir: Directory to scan (non-recursive, ``*.png`` only).
        dry_run: When True, nothing is written to the database and no file
            is deleted; the counters report what would have happened.

    Returns:
        Counters ``disk_files``, ``unparseable``, ``already_tracked``,
        ``reconciled``, ``updated`` (always 0) and ``unmatched``, plus
        ``unmatched_details`` holding the first 20 unmatched files.
    """
    import re

    files = sorted(raw_dir.glob("*.png"))
    pattern = re.compile(r"^(.+?)_(\d+)\.png$")
    # Idempotency: paths already recorded on a completed variant.
    tracked_paths: set[str] = {
        row["raw_path"]
        for row in self.conn.execute(
            "SELECT raw_path FROM variants WHERE raw_path IS NOT NULL AND job_status = 'completed'"
        ).fetchall()
    }
    # Build lookup of all sprite IDs
    sprite_ids = {
        row["id"] for row in self.conn.execute("SELECT id FROM sprites").fetchall()
    }
    already_tracked = 0
    reconciled = 0
    unmatched: list[dict] = []
    unparseable: list[str] = []
    for f in files:
        m = pattern.match(f.name)
        if not m:
            unparseable.append(f.name)
            continue
        # Derive sprite_id: the first "_" separates category from entity,
        # e.g. "units_bowmen_dwarves_m" -> "units/bowmen_dwarves_m".
        category, separator, entity = m.group(1).partition("_")
        if not separator:
            # No category separator at all: the name cannot be mapped to a
            # sprite. Previously this raised ValueError and aborted the scan.
            unparseable.append(f.name)
            continue
        sprite_id = f"{category}/{entity}"
        raw_path = str(f)
        # Already fully tracked — skip
        if raw_path in tracked_paths:
            already_tracked += 1
            continue
        # Orphaned file — no matching sprite in the registry
        if sprite_id not in sprite_ids:
            if not dry_run:
                f.unlink()
            unmatched.append({"file": f.name, "sprite_id": sprite_id})
            continue
        if not dry_run:
            # Insert a fresh variant record and let autoincrement assign the
            # id. Never force an id from the filename: that caused silent
            # data stomping when two sprites shared a trailing number.
            stamp = _now()
            self.conn.execute(
                """INSERT INTO variants (sprite_id, seed, job_status, raw_path, created_at)
VALUES (?, 0, 'completed', ?, ?)""",
                (sprite_id, raw_path, stamp),
            )
            self.conn.execute(
                "UPDATE sprites SET status='review', updated_at=? "
                "WHERE id=? AND status IN ('needed','generating')",
                (stamp, sprite_id),
            )
        reconciled += 1
    if not dry_run:
        self.conn.commit()
    return {
        "disk_files": len(files),
        "unparseable": len(unparseable),
        "already_tracked": already_tracked,
        "reconciled": reconciled,
        "updated": 0,
        "unmatched": len(unmatched),
        "unmatched_details": unmatched[:20],
    }
# -- cleanup ---------------------------------------------------------------
# -- per-scorer scorecards ------------------------------------------------
def store_score(
    self,
    variant_id: int,
    scorer_name: str,
    scorer_model: str,
    tier: int,
    result: dict,
    raw_response: str | None = None,
) -> None:
    """Append one scorer result for a variant to ``variant_scores``.

    Read from *result*: ``gates`` and ``quality`` (stored as JSON text,
    defaulting to ``{}``), ``gate_passed`` and ``quality_floor_failed``
    (stored as 0/1 by truthiness), ``confidence`` (default 0.0) and
    ``failed_gate_reason``. A new row is always inserted; earlier scores
    are kept.
    """
    import json as _json

    with self.conn:
        gate_flag = 1 if result.get("gate_passed") else 0
        floor_flag = 1 if result.get("quality_floor_failed") else 0
        record = (
            variant_id,
            scorer_name,
            scorer_model,
            tier,
            _json.dumps(result.get("gates", {})),
            _json.dumps(result.get("quality", {})),
            gate_flag,
            result.get("confidence", 0.0),
            result.get("failed_gate_reason"),
            floor_flag,
            raw_response,
            _now(),
        )
        self.conn.execute(
            """INSERT INTO variant_scores
(variant_id, scorer_name, scorer_model, tier, gates, quality,
gate_passed, confidence, failed_gate_reason, quality_floor_failed,
raw_response, scored_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            record,
        )
def get_scores(self, variant_id: int) -> list[dict]:
    """Return the variant's full score history, ordered by ``scored_at`` newest first."""
    cursor = self.conn.execute(
        "SELECT * FROM variant_scores WHERE variant_id = ? ORDER BY scored_at DESC",
        (variant_id,),
    )
    history = []
    for row in cursor.fetchall():
        history.append(dict(row))
    return history
def get_latest_scores(self, variant_id: int) -> list[dict]:
    """Return the variant's rows from the ``latest_scores`` view, ordered by tier.

    The view keeps only the newest score per (variant, scorer).
    """
    cursor = self.conn.execute(
        "SELECT * FROM latest_scores WHERE variant_id = ? ORDER BY tier",
        (variant_id,),
    )
    latest = []
    for row in cursor.fetchall():
        latest.append(dict(row))
    return latest
# -- pipeline dashboard ---------------------------------------------------
def get_pipeline_dashboard(self) -> dict:
    """Aggregate pipeline state for the dashboard UI.

    Holds ``self._lock`` while delegating to
    ``_get_pipeline_dashboard_locked`` and returns its result unchanged.

    The result is meant to match the frontend PipelineState interface:
        funnel: { total_completed, total_processed, scoring: {name: {scored, passed, pass_rate, avg_confidence}}, approved, installed }
        failed_gates: [{ gate, count }]
        sprite_coverage: [{ sprite_id, entity_id, total_variants, processed, tier_counts: {name: {scored, passed}}, all_passed, deficit }]
        recent_scores: [{ variant_id, sprite_id, scorer_name, gate_passed, confidence, scored_at }]
    """
    self._lock.acquire()
    try:
        snapshot = self._get_pipeline_dashboard_locked()
    finally:
        # Released on both the normal and the exceptional path.
        self._lock.release()
    return snapshot
def _get_pipeline_dashboard_locked(self) -> dict:
    """Compute the dashboard snapshot; the caller is expected to hold ``self._lock``.

    Returns a dict with four keys:

      funnel          -- global counters: completed / processed / unscored
                         variants, per-scorer stats from ``latest_scores``,
                         approved variants and installed sprites.
      failed_gates    -- the ten most frequent ``failed_gate_reason`` values
                         among failing latest scores.
      sprite_coverage -- one entry per sprite in category ``'units'`` with
                         variant counts, per-scorer scored/passed counts and
                         the shortfall against a target of 3 clean variants.
      recent_scores   -- the ten most recently inserted score rows.

    Per-sprite coverage numbers come from three grouped aggregate queries
    that are then looked up in memory, so the number of SQL round trips does
    not grow with the number of unit sprites or scorers.
    """
    conn = self.conn

    def count(sql: str) -> int:
        """Run a single-value COUNT query and return the number."""
        return conn.execute(sql).fetchone()[0]

    scorer_names: list[str] = [
        r[0] for r in conn.execute(
            "SELECT DISTINCT scorer_name FROM variant_scores ORDER BY scorer_name"
        ).fetchall()
    ] or ["qwen3", "haiku", "sonnet", "opus"]  # fallback when DB has no scores yet

    # -- Funnel ------------------------------------------------------------
    scoring: dict[str, dict] = {}
    for name in scorer_names:
        row = conn.execute(
            """SELECT COUNT(*) as scored, SUM(gate_passed) as passed,
                      AVG(CASE WHEN gate_passed = 1 THEN confidence END) as avg_conf
               FROM latest_scores WHERE scorer_name = ?""",
            (name,),
        ).fetchone()
        scored = row["scored"] or 0
        passed = row["passed"] or 0  # SUM() is NULL when there are no rows
        scoring[name] = {
            "scored": scored,
            "passed": passed,
            "pass_rate": round(passed / scored, 4) if scored else 0.0,
            # Average confidence over *passing* scores only.
            "avg_confidence": round(row["avg_conf"] or 0.0, 3),
        }

    funnel = {
        "total_completed": count(
            "SELECT COUNT(*) FROM variants WHERE job_status = 'completed'"
        ),
        "total_processed": count(
            "SELECT COUNT(*) FROM variants WHERE job_status = 'completed' AND processed_path IS NOT NULL"
        ),
        "unscored": count(
            """SELECT COUNT(*) FROM variants
               WHERE job_status = 'completed'
                 AND NOT EXISTS (
                     SELECT 1 FROM latest_scores WHERE variant_id = variants.id
                 )"""
        ),
        "scoring": scoring,
        "approved": count("SELECT COUNT(*) FROM variants WHERE is_approved = 1"),
        "installed": count("SELECT COUNT(*) FROM sprites WHERE status = 'installed'"),
    }

    # -- Failed gates (top 10 most common) ---------------------------------
    gate_rows = conn.execute(
        """SELECT failed_gate_reason as gate, COUNT(*) as count
           FROM latest_scores
           WHERE gate_passed = 0 AND failed_gate_reason IS NOT NULL
             AND failed_gate_reason != ''
           GROUP BY failed_gate_reason ORDER BY count DESC LIMIT 10"""
    ).fetchall()
    failed_gates = [{"gate": r["gate"], "count": r["count"]} for r in gate_rows]

    # -- Sprite coverage (units) -------------------------------------------
    target = 3  # clean (all-gates-passed) variants wanted per unit sprite

    unit_rows = conn.execute(
        "SELECT id, entity_id FROM sprites WHERE category = 'units' ORDER BY entity_id"
    ).fetchall()

    # Completed / processed variant counts per sprite.
    # COUNT(col) skips NULLs, i.e. it counts rows with a processed_path.
    variant_counts = {
        r["sprite_id"]: (r["total"], r["processed"])
        for r in conn.execute(
            """SELECT sprite_id,
                      COUNT(*) AS total,
                      COUNT(processed_path) AS processed
               FROM variants
               WHERE job_status = 'completed'
               GROUP BY sprite_id"""
        ).fetchall()
    }

    # Distinct variants scored / passed, per (sprite, scorer).
    tier_stats = {
        (r["sprite_id"], r["scorer_name"]): (r["scored"], r["passed"])
        for r in conn.execute(
            """SELECT v.sprite_id AS sprite_id,
                      ls.scorer_name AS scorer_name,
                      COUNT(DISTINCT ls.variant_id) AS scored,
                      COUNT(DISTINCT CASE WHEN ls.gate_passed = 1
                                          THEN ls.variant_id END) AS passed
               FROM latest_scores ls
               JOIN variants v ON ls.variant_id = v.id
               GROUP BY v.sprite_id, ls.scorer_name"""
        ).fetchall()
    }

    # Completed variants with at least one latest score and no failing one.
    clean_counts = {
        r["sprite_id"]: r["clean"]
        for r in conn.execute(
            """SELECT v.sprite_id AS sprite_id, COUNT(DISTINCT v.id) AS clean
               FROM variants v
               WHERE v.job_status = 'completed'
                 AND EXISTS (SELECT 1 FROM latest_scores ls WHERE ls.variant_id = v.id)
                 AND NOT EXISTS (SELECT 1 FROM latest_scores ls
                                 WHERE ls.variant_id = v.id AND ls.gate_passed = 0)
               GROUP BY v.sprite_id"""
        ).fetchall()
    }

    sprite_coverage: list[dict] = []
    for sr in unit_rows:
        sid = sr["id"]
        total_variants, processed = variant_counts.get(sid, (0, 0))
        tier_counts: dict[str, dict] = {}
        for name in scorer_names:
            scored, passed = tier_stats.get((sid, name), (0, 0))
            tier_counts[name] = {"scored": scored, "passed": passed}
        all_passed = clean_counts.get(sid, 0)
        sprite_coverage.append({
            "sprite_id": sid,
            "entity_id": sr["entity_id"],
            "total_variants": total_variants,
            "processed": processed,
            "tier_counts": tier_counts,
            "all_passed": all_passed,
            "deficit": max(0, target - all_passed),
        })

    # -- Recent scores (last 10 by insertion order) ------------------------
    recent_rows = conn.execute(
        """SELECT vs.variant_id, v.sprite_id, vs.scorer_name,
                  vs.gate_passed, vs.confidence, vs.scored_at
           FROM variant_scores vs
           JOIN variants v ON vs.variant_id = v.id
           ORDER BY vs.id DESC LIMIT 10"""
    ).fetchall()
    recent_scores = [
        {
            "variant_id": r["variant_id"],
            "sprite_id": r["sprite_id"],
            "scorer_name": r["scorer_name"],
            "gate_passed": bool(r["gate_passed"]),
            "confidence": r["confidence"],
            "scored_at": r["scored_at"],
        }
        for r in recent_rows
    ]

    return {
        "funnel": funnel,
        "failed_gates": failed_gates,
        "sprite_coverage": sprite_coverage,
        "recent_scores": recent_scores,
    }
# -- seed pool -------------------------------------------------------------
def get_proven_seeds(
    self,
    category: str,
    entity_id: str | None = None,
    limit: int = 20,
    min_quality: float = 65.0,
) -> list[dict]:
    """List seeds from ``seed_pool`` that have worked well for a category.

    Only rows of ``category`` with ``avg_quality >= min_quality`` qualify.
    Rows whose ``entity_id`` equals the requested one sort ahead of the rest
    of the category; within each group the highest ``avg_quality`` comes
    first. At most ``limit`` rows are returned. A missing ``entity_id`` is
    compared as the empty string.
    """
    preferred_entity = entity_id or ""
    query = """SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality,
                      sp.gate_pass_rate, sp.best_scorer, sp.is_approved,
                      sp.variant_id, sp.created_at
               FROM seed_pool sp
               WHERE sp.category = ?
                 AND sp.avg_quality >= ?
               ORDER BY
                 CASE WHEN sp.entity_id = ? THEN 0 ELSE 1 END,
                 sp.avg_quality DESC
               LIMIT ?"""
    bindings = (category, min_quality, preferred_entity, limit)
    found = self.conn.execute(query, bindings).fetchall()
    return list(map(dict, found))
def add_to_seed_pool(
    self,
    variant_id: int,
    seed: int,
    category: str,
    entity_id: str,
    avg_quality: float,
    gate_pass_rate: float,
    best_scorer: str,
) -> None:
    """Record a seed in ``seed_pool``, keeping only the best result per (seed, category).

    A new (seed, category) pair is inserted with ``is_approved = 0``. An
    existing pair is overwritten only when ``avg_quality`` is strictly
    higher than the stored value; its ``is_approved`` flag is left as it is.
    """
    current = self.conn.execute(
        "SELECT id, avg_quality FROM seed_pool WHERE seed=? AND category=?",
        (seed, category),
    ).fetchone()
    # Cheap early exit: nothing to write when the stored entry is at least as good.
    if current and current["avg_quality"] >= avg_quality:
        return

    upsert_sql = """INSERT INTO seed_pool
        (seed, category, entity_id, avg_quality, gate_pass_rate,
         best_scorer, variant_id, is_approved, created_at)
        VALUES (?,?,?,?,?,?,?,0,?)
        ON CONFLICT(seed, category) DO UPDATE SET
            entity_id=excluded.entity_id,
            avg_quality=excluded.avg_quality,
            gate_pass_rate=excluded.gate_pass_rate,
            best_scorer=excluded.best_scorer,
            variant_id=excluded.variant_id,
            created_at=excluded.created_at
        WHERE excluded.avg_quality > seed_pool.avg_quality"""
    values = (
        seed,
        category,
        entity_id,
        avg_quality,
        gate_pass_rate,
        best_scorer,
        variant_id,
        _now(),
    )
    # The WHERE on the upsert repeats the strictly-better rule inside the
    # write itself, so a row changed since the read above is still respected.
    with self.conn:
        self.conn.execute(upsert_sql, values)
def pin_seed(
    self,
    seed: int,
    category: str,
    entity_id: str,
    variant_id: int | None = None,
) -> None:
    """Force a seed into ``seed_pool`` as a manually approved entry.

    The row is written with ``avg_quality = 100.0``, ``gate_pass_rate = 1.0``,
    ``best_scorer = 'manual'`` and ``is_approved = 1``. When the
    (seed, category) pair already exists those fields and ``entity_id`` are
    overwritten, while the stored ``variant_id`` and ``created_at`` are kept.
    A missing ``variant_id`` is stored as 0 on insert.
    """
    stored_variant = variant_id or 0
    upsert_sql = """INSERT INTO seed_pool
        (seed, category, entity_id, avg_quality, gate_pass_rate,
         best_scorer, variant_id, is_approved, created_at)
        VALUES (?,?,?,100.0,1.0,'manual',?,1,?)
        ON CONFLICT(seed, category) DO UPDATE SET
            entity_id=excluded.entity_id,
            avg_quality=100.0,
            gate_pass_rate=1.0,
            best_scorer='manual',
            is_approved=1"""
    values = (seed, category, entity_id, stored_variant, _now())
    with self.conn:
        self.conn.execute(upsert_sql, values)
def get_seed_pool_report(
    self,
    category: str | None = None,
    limit: int = 50,
) -> list[dict]:
    """Return seed-pool entries, best ``avg_quality`` first, for reporting.

    Each row carries the pool columns plus ``prompt_modifier`` and
    ``guidance_scale`` of the linked variant (both NULL when the variant
    row does not exist). ``category=None`` reports across all categories;
    at most ``limit`` rows are returned.
    """
    query = """SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality,
                      sp.is_approved, sp.best_scorer, sp.created_at,
                      v.prompt_modifier, v.guidance_scale
               FROM seed_pool sp
               LEFT JOIN variants v ON sp.variant_id = v.id
               WHERE (:category IS NULL OR sp.category = :category)
               ORDER BY sp.avg_quality DESC
               LIMIT :limit"""
    bindings = {"category": category, "limit": limit}
    report = self.conn.execute(query, bindings).fetchall()
    return list(map(dict, report))
# -- generation hints ------------------------------------------------------
def get_generation_hints(self, entity_id: str, category: str) -> dict | None:
    """Fetch the ``generation_hints`` row for an entity/category pair.

    Returns the row as a plain dict, or ``None`` when no hints are stored.
    """
    match = self.conn.execute(
        "SELECT * FROM generation_hints WHERE entity_id=? AND category=?",
        (entity_id, category),
    ).fetchone()
    if not match:
        return None
    return dict(match)
def update_generation_hints(
    self,
    entity_id: str,
    category: str,
    guidance_scale: float | None,
    modifier_indices: list[int],
    quality_scores: list[float],
) -> None:
    """Fold a new batch of results into the stored generation hints.

    When no hints exist for (entity_id, category) a row is inserted from the
    batch alone. Otherwise the existing row is updated:

    * ``avg_quality`` becomes the sample-count-weighted mean of the stored
      average and the new scores; ``sample_count`` grows by the batch size.
    * ``best_modifier_indices`` is the de-duplicated concatenation of stored
      and new indices, truncated to the first ten.
    * ``best_guidance`` moves 30% of the way toward ``guidance_scale``. If
      either side is missing (``None``/NULL) the other is used unchanged.
      Missing is tested with ``is None`` so a legitimate value of ``0.0`` is
      treated as a real number rather than as "no value".
    """
    import json as _json

    batch_size = len(quality_scores)
    batch_total = sum(quality_scores)
    new_mods = list(modifier_indices)  # accept any sequence, not just list
    existing = self.get_generation_hints(entity_id, category)

    if existing is None:
        avg = batch_total / batch_size if batch_size else 0.0
        with self.conn:
            self.conn.execute(
                """INSERT INTO generation_hints
                   (entity_id, category, best_guidance, best_modifier_indices,
                    avg_quality, sample_count, updated_at)
                   VALUES (?,?,?,?,?,?,?)""",
                (entity_id, category, guidance_scale,
                 _json.dumps(new_mods), avg, batch_size, _now()),
            )
        return

    prior_n = existing["sample_count"] or 0  # tolerate a NULL count
    new_n = prior_n + batch_size
    new_avg = (
        ((existing["avg_quality"] or 0.0) * prior_n + batch_total) / new_n
        if new_n > 0 else 0.0
    )

    # dict.fromkeys de-duplicates while preserving first-seen order, so
    # previously stored indices take precedence over the new batch.
    # NOTE(review): once ten indices are stored, newer ones are never kept --
    # confirm that "first ten" (rather than "most recent ten") is intended.
    old_mods: list[int] = _json.loads(existing["best_modifier_indices"] or "[]")
    merged_mods = list(dict.fromkeys(old_mods + new_mods))[:10]

    # Guidance: exponential moving average (alpha = 0.3 toward the new value).
    old_guidance = existing["best_guidance"]
    if guidance_scale is None:
        new_guidance = old_guidance
    elif old_guidance is None:
        new_guidance = guidance_scale
    else:
        new_guidance = 0.7 * old_guidance + 0.3 * guidance_scale

    with self.conn:
        self.conn.execute(
            """UPDATE generation_hints
               SET best_guidance=?, best_modifier_indices=?,
                   avg_quality=?, sample_count=?, updated_at=?
               WHERE entity_id=? AND category=?""",
            (new_guidance, _json.dumps(merged_mods), new_avg, new_n, _now(),
             entity_id, category),
        )
# -- terrain grid ----------------------------------------------------------
def get_terrain_grid(self, elevation: str) -> list[dict]:
    """Describe the best available variant for every terrain cell of an elevation.

    Walks all 25 temperature x moisture combinations (indices 0-4 on each
    axis, temperature outermost) and emits one dict per cell, keyed by the
    sprite id ``biome_grid/t{temp}_m{moist}_{elevation}``.

    * No sprite row: ``status`` is ``'missing'`` and all variant fields are
      empty.
    * Sprite row present: ``status`` is copied from the sprite and
      ``variant_count`` counts completed variants that have a ``raw_path``.
      The best such variant (if any) fills the remaining fields.

    Best-variant ranking: approved first, then the highest ``tier`` among
    its passing rows in ``variant_scores``, then the largest variant id.
    """
    grid: list[dict] = []
    coordinates = [(t, m) for t in range(5) for m in range(5)]

    for temp, moist in coordinates:
        entity_id = f"t{temp}_m{moist}_{elevation}"
        sprite_id = f"biome_grid/{entity_id}"

        # Start from the "missing" shape and fill in whatever the DB knows.
        cell = {
            "temp": temp, "moist": moist, "elevation": elevation,
            "sprite_id": sprite_id, "entity_id": entity_id,
            "status": "missing", "variant_id": None,
            "raw_path": None, "processed_path": None,
            "is_approved": False, "variant_count": 0,
        }
        grid.append(cell)

        sprite_row = self.conn.execute(
            "SELECT id, entity_id, status FROM sprites WHERE id = ?",
            (sprite_id,),
        ).fetchone()
        if not sprite_row:
            continue
        cell["status"] = sprite_row["status"]

        cell["variant_count"] = self.conn.execute(
            """SELECT COUNT(*) FROM variants
               WHERE sprite_id = ? AND job_status = 'completed'
                 AND raw_path IS NOT NULL""",
            (sprite_id,),
        ).fetchone()[0]

        # Variants without any passing score rank with best_tier = -1.
        top = self.conn.execute(
            """SELECT v.id, v.raw_path, v.processed_path, v.is_approved,
                      COALESCE(MAX(vs.tier), -1) AS best_tier
               FROM variants v
               LEFT JOIN variant_scores vs
                 ON vs.variant_id = v.id AND vs.gate_passed = 1
               WHERE v.sprite_id = ? AND v.job_status = 'completed'
                 AND v.raw_path IS NOT NULL
               GROUP BY v.id
               ORDER BY v.is_approved DESC, best_tier DESC, v.id DESC
               LIMIT 1""",
            (sprite_id,),
        ).fetchone()
        if top:
            cell["variant_id"] = top["id"]
            cell["raw_path"] = top["raw_path"]
            cell["processed_path"] = top["processed_path"]
            cell["is_approved"] = bool(top["is_approved"])

    return grid
# -- quality analytics -----------------------------------------------------
def get_quality_analytics(
    self,
    category: str | None = None,
    scorer: str | None = None,
    *,
    quality_floor: float = 45.0,
) -> dict:
    """Aggregate per-dimension quality scores and per-gate failure rates.

    Args:
        category: restrict to sprites of this category; ``None`` means all.
        scorer: restrict to this scorer's latest scores; ``None`` means all.
        quality_floor: a dimension score strictly below this value counts
            toward ``pct_below_floor``. Defaults to 45, the value that was
            previously hard-coded in the query.

    Returns:
        A dict with three keys:

        * ``dimensions`` -- per (category, quality dimension): sample count,
          avg/min/max score and ``pct_below_floor``. Only latest scores with
          ``gate_passed = 1`` contribute. Sorted by category, then worst
          ``pct_below_floor`` first.
        * ``gates`` -- per (category, gate): total evaluations, number of
          failures (gate value 0) and ``failure_pct``, worst first.
        * ``seed_pool`` -- ``{category: row count}`` over the whole seed
          pool; the ``category``/``scorer`` filters do not apply here.

    The ``quality`` and ``gates`` JSON columns are unpacked with SQLite's
    ``json_each()``, so the linked SQLite library must include JSON support
    (built in by default since SQLite 3.38).
    """
    # One mapping serves both filtered queries; sqlite3 ignores keys a
    # statement does not reference.
    params: dict = {
        "category": category,
        "scorer": scorer,
        "quality_floor": quality_floor,
    }

    dim_rows = self.conn.execute(
        """SELECT s.category,
                  je.key AS dimension,
                  COUNT(*) AS sample_count,
                  ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score,
                  ROUND(MIN(CAST(je.value AS REAL)), 1) AS min_score,
                  ROUND(MAX(CAST(je.value AS REAL)), 1) AS max_score,
                  ROUND(100.0 * SUM(
                      CASE WHEN CAST(je.value AS REAL) < :quality_floor THEN 1 ELSE 0 END
                  ) / COUNT(*), 1) AS pct_below_floor
           FROM latest_scores ls
           JOIN variants v ON ls.variant_id = v.id
           JOIN sprites s ON v.sprite_id = s.id,
                json_each(ls.quality) AS je
           WHERE ls.gate_passed = 1
             AND ls.quality IS NOT NULL
             AND ls.quality != '{}'
             AND (:category IS NULL OR s.category = :category)
             AND (:scorer IS NULL OR ls.scorer_name = :scorer)
           GROUP BY s.category, je.key
           ORDER BY s.category, pct_below_floor DESC""",
        params,
    ).fetchall()

    gate_rows = self.conn.execute(
        """SELECT s.category,
                  je.key AS gate,
                  COUNT(*) AS total,
                  SUM(CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END) AS failures,
                  ROUND(100.0 * SUM(
                      CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END
                  ) / COUNT(*), 1) AS failure_pct
           FROM latest_scores ls
           JOIN variants v ON ls.variant_id = v.id
           JOIN sprites s ON v.sprite_id = s.id,
                json_each(ls.gates) AS je
           WHERE ls.gates IS NOT NULL
             AND ls.gates != '{}'
             AND (:category IS NULL OR s.category = :category)
             AND (:scorer IS NULL OR ls.scorer_name = :scorer)
           GROUP BY s.category, je.key
           ORDER BY failure_pct DESC""",
        params,
    ).fetchall()

    pool_rows = self.conn.execute(
        "SELECT category, COUNT(*) AS cnt FROM seed_pool GROUP BY category"
    ).fetchall()

    return {
        "dimensions": [dict(r) for r in dim_rows],
        "gates": [dict(r) for r in gate_rows],
        "seed_pool": {r["category"]: r["cnt"] for r in pool_rows},
    }
def get_quality_trends(
    self,
    category: str | None = None,
    weeks: int = 8,
) -> list[dict]:
    """Average each quality dimension per calendar week over a recent window.

    Args:
        category: restrict to sprites of this category; ``None`` means all.
        weeks: size of the look-back window, counted back from now (UTC).

    Returns:
        One dict per (category, dimension, week) with ``avg_score`` and
        ``sample_count``, newest week first within each dimension. ``week``
        is formatted ``YYYY-Www`` using SQLite's ``%W`` week-of-year. Only
        latest scores with ``gate_passed = 1`` and a non-empty ``quality``
        JSON object contribute.
    """
    # SQLite's date modifiers have no "weeks" unit; an unknown modifier makes
    # datetime() return NULL, which would filter out every row. Express the
    # window in days instead.
    cutoff_modifier = f"-{7 * weeks} days"

    rows = self.conn.execute(
        """SELECT s.category,
                  je.key AS dimension,
                  strftime('%Y-W%W', ls.scored_at) AS week,
                  ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score,
                  COUNT(*) AS sample_count
           FROM latest_scores ls
           JOIN variants v ON ls.variant_id = v.id
           JOIN sprites s ON v.sprite_id = s.id,
                json_each(ls.quality) AS je
           WHERE ls.gate_passed = 1
             AND ls.quality IS NOT NULL
             AND ls.quality != '{}'
             AND (:category IS NULL OR s.category = :category)
             -- normalise both sides: scored_at is ISO-8601 with a 'T'
             -- separator, datetime('now', ...) uses a space
             AND datetime(ls.scored_at) >= datetime('now', :cutoff)
           GROUP BY s.category, je.key, week
           ORDER BY s.category, je.key, week DESC""",
        {"category": category, "cutoff": cutoff_modifier},
    ).fetchall()
    return [dict(r) for r in rows]
def get_calibration_data(self) -> list[dict]:
    """Summarise how scorer passes line up with human verdicts.

    Considers only latest scores with ``gate_passed = 1`` on variants that
    carry a human verdict (``is_approved = 1`` or ``rating = -1``). For each
    (scorer, category) it reports how many such passes there were, how many
    of them a human approved, how many were rejected (``rating = -1`` and
    not approved), and the average / minimum scorer confidence on the
    approved ones.
    """
    query = """SELECT ls.scorer_name,
                      s.category,
                      COUNT(*) AS passed_by_scorer,
                      SUM(CASE WHEN v.is_approved=1 THEN 1 ELSE 0 END) AS human_approved,
                      SUM(CASE WHEN v.rating=-1 AND v.is_approved=0
                               THEN 1 ELSE 0 END) AS human_rejected,
                      ROUND(AVG(CASE WHEN v.is_approved=1
                                     THEN ls.confidence END), 3) AS avg_conf_approved,
                      MIN(CASE WHEN v.is_approved=1
                               THEN ls.confidence END) AS min_conf_approved
               FROM latest_scores ls
               JOIN variants v ON ls.variant_id = v.id
               JOIN sprites s ON v.sprite_id = s.id
               WHERE ls.gate_passed=1
                 AND (v.is_approved=1 OR v.rating=-1)
               GROUP BY ls.scorer_name, s.category
               ORDER BY ls.scorer_name, s.category"""
    summary = self.conn.execute(query).fetchall()
    return list(map(dict, summary))
def upsert_calibration(self, row: dict) -> None:
    """Insert or refresh one ``scorer_calibration`` record.

    ``row`` must provide the named parameters ``scorer_name``, ``category``,
    ``false_positive_rate``, ``true_positive_rate``,
    ``recommended_threshold``, ``sample_count`` and ``computed_at``. When a
    record for the same (scorer_name, category) already exists, all of its
    metric columns are replaced by the new values.
    """
    upsert_sql = """INSERT INTO scorer_calibration
        (scorer_name, category, false_positive_rate, true_positive_rate,
         recommended_threshold, sample_count, computed_at)
        VALUES (:scorer_name, :category, :false_positive_rate,
                :true_positive_rate, :recommended_threshold,
                :sample_count, :computed_at)
        ON CONFLICT(scorer_name, category) DO UPDATE SET
            false_positive_rate=excluded.false_positive_rate,
            true_positive_rate=excluded.true_positive_rate,
            recommended_threshold=excluded.recommended_threshold,
            sample_count=excluded.sample_count,
            computed_at=excluded.computed_at"""
    # Commit on success / roll back on error via the connection context.
    with self.conn:
        self.conn.execute(upsert_sql, row)
# -- cleanup ---------------------------------------------------------------
def close(self) -> None:
    """Close the underlying SQLite connection held in ``self.conn``."""
    connection = self.conn
    connection.close()