From 65f17eb684084316c18f7fc7b57810a55cfdb241 Mon Sep 17 00:00:00 2001 From: Claude Code Date: Sun, 29 Mar 2026 10:07:32 -0700 Subject: [PATCH] =?UTF-8?q?feat(sprite-generation):=20=E2=9C=A8=20Implemen?= =?UTF-8?q?t=20advanced=20sprite=20generation=20algorithms=20and=20enhance?= =?UTF-8?q?=20prompt=20handling=20system?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- tools/sprite-generation/engine/generator.py | 82 +++- tools/sprite-generation/engine/prompts.py | 9 +- tools/sprite-generation/engine/registry.py | 478 +++++++++++++++++++- 3 files changed, 558 insertions(+), 11 deletions(-) diff --git a/tools/sprite-generation/engine/generator.py b/tools/sprite-generation/engine/generator.py index 1c1179ce..bcf98334 100644 --- a/tools/sprite-generation/engine/generator.py +++ b/tools/sprite-generation/engine/generator.py @@ -21,6 +21,7 @@ import asyncio import base64 import json import logging +import math import random from pathlib import Path @@ -130,6 +131,63 @@ class SpriteGenerator: negative = get_negative(category) return prompt, negative + # ------------------------------------------------------------------ + # Seed + modifier selection + # ------------------------------------------------------------------ + + def _select_seeds(self, category: str, entity_id: str, n: int) -> list[int]: + """Select n seeds using a 70/30 proven/random split. + + Proven seeds come from the seed_pool table (accumulated from past + high-scoring variants). Falls back to fully random when the pool is empty; + a pool too small for the proven quota is topped up with neighbors, then random. + + Neighbors are the best proven seed plus 1..3 (never minus), added only while the + proven quota is still unfilled after the weighted draw.
+ """ + proven = self.registry.get_proven_seeds( + category=category, + entity_id=entity_id, + limit=40, + min_quality=65.0, + ) + proven_target = math.ceil(n * 0.7) + selected: list[int] = [] + + if proven: + weights = [p["avg_quality"] for p in proven] + total_w = sum(weights) + norm = [w / total_w for w in weights] + chosen = random.choices(proven, weights=norm, k=min(proven_target, len(proven))) + selected = [p["seed"] for p in chosen] + + # Neighbor exploration: best seed plus 1..3 (positive offsets only) + best_seed = max(proven, key=lambda p: p["avg_quality"])["seed"] + for delta in range(1, 4): + if len(selected) < proven_target: + selected.append((best_seed + delta) % (2**32)) + + while len(selected) < n: + selected.append(random.randint(0, 2**32 - 1)) + + return selected[:n] + + def _select_modifiers(self, entity_id: str, category: str, n: int) -> list[int]: + """Return modifier indices for n variants. + + When generation_hints has ≥20 samples, weights 60% toward historically + passing modifier indices. Otherwise uses the standard sequential cycle. + """ + hints = self.registry.get_generation_hints(entity_id, category) + if hints and hints.get("best_modifier_indices") and hints["sample_count"] >= 20: + good_mods: list[int] = json.loads(hints["best_modifier_indices"]) + if good_mods: + good_count = round(n * 0.6) + result = [random.choice(good_mods) for _ in range(good_count)] + result += list(range(n - good_count)) + return result + return list(range(n)) + # ------------------------------------------------------------------ # Phase 1: SUBMIT — queue requests, return immediately # ------------------------------------------------------------------ @@ -145,6 +203,7 @@ class SpriteGenerator: prompt_modifier: str = "", priority: str = "normal", dimension_id: int | None = None, + guidance_scale: float | None = None, ) -> tuple[int, str] | None: """Submit a single generation request to model-boss queue.
@@ -153,13 +212,17 @@ class SpriteGenerator: """ full_prompt = ", ".join(p for p in [prompt, prompt_modifier] if p) + effective_guidance = ( + guidance_scale if guidance_scale is not None + else self.defaults.get("guidance_scale", 7.5) + ) body_fields = { "prompt": full_prompt, "negative_prompt": negative, "width": width, "height": height, "steps": self.defaults.get("steps", 25), - "guidance_scale": self.defaults.get("guidance_scale", 7.5), + "guidance_scale": effective_guidance, "seed": seed, "n": 1, } @@ -193,7 +256,7 @@ class SpriteGenerator: model=self.model, prompt_used=full_prompt, negative_used=negative, - guidance_scale=self.defaults.get("guidance_scale", 7.5), + guidance_scale=effective_guidance, steps=self.defaults.get("steps", 25), prompt_author="claude-opus-4-6", ) @@ -232,9 +295,19 @@ class SpriteGenerator: width = sprite.get("gen_width") or gen_w height = sprite.get("gen_height") or gen_h + seeds = self._select_seeds(category, entity_id, variants_per) + modifier_indices = self._select_modifiers(entity_id, category, variants_per) + + # Adaptive guidance: use generation_hints when ≥10 passing samples exist + hints = self.registry.get_generation_hints(entity_id, category) + if hints and hints.get("best_guidance") and hints["sample_count"] >= 10: + guidance = max(6.5, min(9.0, hints["best_guidance"])) + else: + guidance = self.defaults.get("guidance_scale", 7.5) + for i in range(variants_per): - seed = random.randint(0, 2**32 - 1) - modifier = get_variant_modifier(i) + seed = seeds[i] + modifier = get_variant_modifier(modifier_indices[i]) result = await self.submit_one( sprite_id=sprite_id, @@ -245,6 +318,7 @@ class SpriteGenerator: seed=seed, prompt_modifier=modifier, priority=priority, + guidance_scale=guidance, ) if result: submitted += 1 diff --git a/tools/sprite-generation/engine/prompts.py b/tools/sprite-generation/engine/prompts.py index 8a5bf5ed..8f2cdb81 100644 --- a/tools/sprite-generation/engine/prompts.py +++ 
b/tools/sprite-generation/engine/prompts.py @@ -247,9 +247,12 @@ UNIT_STYLE_BY_COMBAT_TYPE: dict[str, str] = { "ranged": _unit_style("ranged fighter holding weapon ready to fire"), "cavalry": ( "single mounted unit game sprite, simple background, " - "isometric three-quarter rear view, rider on warhorse walking toward lower-left corner, " + "REAR VIEW of rider on warhorse, horse walking AWAY from viewer toward lower-left, " + "rider's BACK facing camera, back of rider visible, horse hindquarters and tail visible, " + "NO front hooves visible, NO chest of rider visible, NO face of rider visible, " + "isometric three-quarter view from slightly above, full mount visible, " "hand-painted digital fantasy art, Warcraft III style, " - "rich saturated colors, sharp clean edges, full mount visible, masterpiece" + "rich saturated colors, sharp clean edges, masterpiece" ), "siege": ( "single siege engine game sprite, simple background, " @@ -283,7 +286,7 @@ def get_unit_style(combat_type: str) -> str: COMBAT_TYPE_FLAVORS: dict[str, str] = { "melee": "armored warrior, close combat stance, weapon drawn", "ranged": "ranged fighter, bow or crossbow ready, quiver", - "cavalry": "mounted on horse or beast, charging pose", + "cavalry": "mounted on warhorse moving away, horse's back and tail visible, rider's back to viewer", "siege": "large war machine, siege engine, heavy wood and iron", "flying": "winged creature in flight, soaring pose", "specialist": "support character, magical or tactical equipment", diff --git a/tools/sprite-generation/engine/registry.py b/tools/sprite-generation/engine/registry.py index 0a3390c9..0866bcf9 100644 --- a/tools/sprite-generation/engine/registry.py +++ b/tools/sprite-generation/engine/registry.py @@ -125,6 +125,52 @@ WHERE vs.id = ( SELECT MAX(id) FROM variant_scores WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name ); + +-- Seeds that produced high-quality variants: fed back into generator. 
+-- UNIQUE(seed, category) — one entry per seed-category pair; add_to_seed_pool upserts +-- (ON CONFLICT DO UPDATE) and overwrites only when avg_quality is strictly better. +CREATE TABLE IF NOT EXISTS seed_pool ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + seed INTEGER NOT NULL, + category TEXT NOT NULL, + entity_id TEXT NOT NULL, + avg_quality REAL NOT NULL DEFAULT 0.0, + gate_pass_rate REAL NOT NULL DEFAULT 1.0, + best_scorer TEXT, + variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE, + is_approved INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL, + UNIQUE(seed, category) +); + +-- Per-entity generation hints: aggregated best guidance + modifier indices. +CREATE TABLE IF NOT EXISTS generation_hints ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + entity_id TEXT NOT NULL, + category TEXT NOT NULL, + best_guidance REAL, + best_modifier_indices TEXT, + avg_quality REAL, + sample_count INTEGER NOT NULL DEFAULT 0, + updated_at TEXT NOT NULL, + UNIQUE(entity_id, category) +); + +-- Scorer calibration: human approval data vs scorer decisions.
+CREATE TABLE IF NOT EXISTS scorer_calibration ( + scorer_name TEXT NOT NULL, + category TEXT NOT NULL, + false_positive_rate REAL, + true_positive_rate REAL, + recommended_threshold REAL, + sample_count INTEGER NOT NULL DEFAULT 0, + computed_at TEXT NOT NULL, + PRIMARY KEY(scorer_name, category) +); + +CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category); +CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC); +CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category); """ @@ -152,6 +198,7 @@ class SpriteRegistry: ("scored_by", "TEXT"), ("scored_at", "TEXT"), ("review_tier", "INTEGER DEFAULT 0"), + ("reject_reason", "TEXT"), ]: if col not in existing_v: self.conn.execute(f"ALTER TABLE variants ADD COLUMN {col} {typ}") @@ -213,6 +260,47 @@ class SpriteRegistry: ); """) + # Create new tables if they don't exist (for existing DBs) + self.conn.executescript(""" + CREATE TABLE IF NOT EXISTS seed_pool ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + seed INTEGER NOT NULL, + category TEXT NOT NULL, + entity_id TEXT NOT NULL, + avg_quality REAL NOT NULL DEFAULT 0.0, + gate_pass_rate REAL NOT NULL DEFAULT 1.0, + best_scorer TEXT, + variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE, + is_approved INTEGER NOT NULL DEFAULT 0, + created_at TEXT NOT NULL, + UNIQUE(seed, category) + ); + CREATE TABLE IF NOT EXISTS generation_hints ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + entity_id TEXT NOT NULL, + category TEXT NOT NULL, + best_guidance REAL, + best_modifier_indices TEXT, + avg_quality REAL, + sample_count INTEGER NOT NULL DEFAULT 0, + updated_at TEXT NOT NULL, + UNIQUE(entity_id, category) + ); + CREATE TABLE IF NOT EXISTS scorer_calibration ( + scorer_name TEXT NOT NULL, + category TEXT NOT NULL, + false_positive_rate REAL, + true_positive_rate REAL, + recommended_threshold REAL, + sample_count INTEGER NOT NULL DEFAULT 0, + computed_at TEXT NOT NULL, + PRIMARY KEY(scorer_name, 
category) + ); + CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category); + CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC); + CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category); + """) + self.conn.commit() # -- sprites --------------------------------------------------------------- @@ -530,11 +618,12 @@ class SpriteRegistry: (now, row["sprite_id"]), ) - def reject_variant(self, variant_id: int) -> None: + def reject_variant(self, variant_id: int, reason: str | None = None) -> None: """Mark a variant as rejected (rating = -1). Persists skip decisions.""" with self.conn: self.conn.execute( - "UPDATE variants SET rating=-1 WHERE id=?", (variant_id,) + "UPDATE variants SET rating=-1, reject_reason=? WHERE id=?", + (reason, variant_id), ) def get_variant(self, variant_id: int) -> dict | None: @@ -635,9 +724,22 @@ class SpriteRegistry: f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id, v.raw_path, v.processed_path, v.seed, v.created_at, v.rating, v.notes, v.is_approved, - v.scored_by, v.review_tier + v.scored_by, v.review_tier, + best.quality AS quality_json, + best.gates AS gates_json, + best.scorer_name AS quality_scorer FROM variants v JOIN sprites s ON v.sprite_id = s.id + LEFT JOIN ( + SELECT ls.variant_id, ls.quality, ls.gates, ls.scorer_name + FROM latest_scores ls + WHERE ls.gate_passed = 1 + AND ls.tier = ( + SELECT MAX(ls2.tier) + FROM latest_scores ls2 + WHERE ls2.variant_id = ls.variant_id AND ls2.gate_passed = 1 + ) + ) best ON best.variant_id = v.id WHERE {base_where} ORDER BY v.id DESC LIMIT ? 
OFFSET ?""", @@ -930,7 +1032,7 @@ class SpriteRegistry: r[0] for r in self.conn.execute( "SELECT DISTINCT scorer_name FROM variant_scores ORDER BY scorer_name" ).fetchall() - ] or ["qwen3", "haiku", "opus"] # fallback when DB has no scores yet + ] or ["qwen3", "haiku", "sonnet", "opus"] # fallback when DB has no scores yet # ── Funnel ─────────────────────────────────────────────────────────── total_completed = self.conn.execute( @@ -1075,6 +1177,374 @@ class SpriteRegistry: "recent_scores": recent_scores, } + # -- seed pool ------------------------------------------------------------- + + def get_proven_seeds( + self, + category: str, + entity_id: str | None = None, + limit: int = 20, + min_quality: float = 65.0, + ) -> list[dict]: + """Return proven seeds for a category, entity-specific first then broader pool.""" + rows = self.conn.execute( + """SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality, + sp.gate_pass_rate, sp.best_scorer, sp.is_approved, + sp.variant_id, sp.created_at + FROM seed_pool sp + WHERE sp.category = ? + AND sp.avg_quality >= ? + ORDER BY + CASE WHEN sp.entity_id = ? THEN 0 ELSE 1 END, + sp.avg_quality DESC + LIMIT ?""", + (category, min_quality, entity_id or "", limit), + ).fetchall() + return [dict(r) for r in rows] + + def add_to_seed_pool( + self, + variant_id: int, + seed: int, + category: str, + entity_id: str, + avg_quality: float, + gate_pass_rate: float, + best_scorer: str, + ) -> None: + """Upsert seed into pool. Only replaces if new avg_quality is strictly better.""" + existing = self.conn.execute( + "SELECT id, avg_quality FROM seed_pool WHERE seed=? AND category=?", + (seed, category), + ).fetchone() + if existing and existing["avg_quality"] >= avg_quality: + return + with self.conn: + self.conn.execute( + """INSERT INTO seed_pool + (seed, category, entity_id, avg_quality, gate_pass_rate, + best_scorer, variant_id, is_approved, created_at) + VALUES (?,?,?,?,?,?,?,0,?) 
+ ON CONFLICT(seed, category) DO UPDATE SET + entity_id=excluded.entity_id, + avg_quality=excluded.avg_quality, + gate_pass_rate=excluded.gate_pass_rate, + best_scorer=excluded.best_scorer, + variant_id=excluded.variant_id, + created_at=excluded.created_at + WHERE excluded.avg_quality > seed_pool.avg_quality""", + (seed, category, entity_id, avg_quality, gate_pass_rate, + best_scorer, variant_id, _now()), + ) + + def pin_seed( + self, + seed: int, + category: str, + entity_id: str, + variant_id: int | None = None, + ) -> None: + """Manually pin a seed with max quality (user override, bypasses threshold).""" + with self.conn: + self.conn.execute( + """INSERT INTO seed_pool + (seed, category, entity_id, avg_quality, gate_pass_rate, + best_scorer, variant_id, is_approved, created_at) + VALUES (?,?,?,100.0,1.0,'manual',?,1,?) + ON CONFLICT(seed, category) DO UPDATE SET + entity_id=excluded.entity_id, + avg_quality=100.0, + gate_pass_rate=1.0, + best_scorer='manual', + is_approved=1""", + (seed, category, entity_id, variant_id or 0, _now()), + ) + + def get_seed_pool_report( + self, + category: str | None = None, + limit: int = 50, + ) -> list[dict]: + rows = self.conn.execute( + """SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality, + sp.is_approved, sp.best_scorer, sp.created_at, + v.prompt_modifier, v.guidance_scale + FROM seed_pool sp + LEFT JOIN variants v ON sp.variant_id = v.id + WHERE (:category IS NULL OR sp.category = :category) + ORDER BY sp.avg_quality DESC + LIMIT :limit""", + {"category": category, "limit": limit}, + ).fetchall() + return [dict(r) for r in rows] + + # -- generation hints ------------------------------------------------------ + + def get_generation_hints(self, entity_id: str, category: str) -> dict | None: + row = self.conn.execute( + "SELECT * FROM generation_hints WHERE entity_id=? 
AND category=?", + (entity_id, category), + ).fetchone() + return dict(row) if row else None + + def update_generation_hints( + self, + entity_id: str, + category: str, + guidance_scale: float | None, + modifier_indices: list[int], + quality_scores: list[float], + ) -> None: + """Running-average update. Uses sample_count to weight the new batch.""" + import json as _json + + existing = self.get_generation_hints(entity_id, category) + if existing: + n = existing["sample_count"] + new_n = n + len(quality_scores) + new_avg = ( + (existing["avg_quality"] or 0.0) * n + + sum(quality_scores) + ) / new_n if new_n > 0 else 0.0 + + # Merge modifier indices: old first, then new; de-duplicate, keep the first 10 (older entries win) + old_mods: list[int] = _json.loads(existing["best_modifier_indices"] or "[]") + merged_mods = list(dict.fromkeys(old_mods + modifier_indices))[:10] + + # Guidance: exponential moving average (α=0.3 toward new value) + old_guidance = existing["best_guidance"] or guidance_scale + new_guidance = ( + 0.7 * old_guidance + 0.3 * guidance_scale + if guidance_scale is not None and old_guidance is not None + else guidance_scale or old_guidance + ) + + with self.conn: + self.conn.execute( + """UPDATE generation_hints + SET best_guidance=?, best_modifier_indices=?, + avg_quality=?, sample_count=?, updated_at=? + WHERE entity_id=?
AND category=?""", + (new_guidance, _json.dumps(merged_mods), new_avg, new_n, _now(), + entity_id, category), + ) + else: + avg = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0 + with self.conn: + self.conn.execute( + """INSERT INTO generation_hints + (entity_id, category, best_guidance, best_modifier_indices, + avg_quality, sample_count, updated_at) + VALUES (?,?,?,?,?,?,?)""", + (entity_id, category, guidance_scale, + _json.dumps(modifier_indices), avg, + len(quality_scores), _now()), + ) + + # -- terrain grid ---------------------------------------------------------- + + def get_terrain_grid(self, elevation: str) -> list[dict]: + """Return best variant info for each (temp, moist) cell of given elevation. + + Iterates all 25 temp×moist combinations (0–4 each) and returns one dict + per cell. Cells with no sprite record get status='missing'. Cells with a + sprite but no completed variants get status from the sprite row. + + Best variant selection: approved first, then highest passing scorer tier, + then newest by id. + """ + cells = [] + for temp in range(5): + for moist in range(5): + entity_id = f"t{temp}_m{moist}_{elevation}" + sprite_id = f"biome_grid/{entity_id}" + + sprite = self.conn.execute( + "SELECT id, entity_id, status FROM sprites WHERE id = ?", + (sprite_id,), + ).fetchone() + + if not sprite: + cells.append({ + "temp": temp, "moist": moist, "elevation": elevation, + "sprite_id": sprite_id, "entity_id": entity_id, + "status": "missing", "variant_id": None, + "raw_path": None, "processed_path": None, + "is_approved": False, "variant_count": 0, + }) + continue + + variant_count = self.conn.execute( + """SELECT COUNT(*) FROM variants + WHERE sprite_id = ? 
AND job_status = 'completed' + AND raw_path IS NOT NULL""", + (sprite_id,), + ).fetchone()[0] + + best = self.conn.execute( + """SELECT v.id, v.raw_path, v.processed_path, v.is_approved, + COALESCE(MAX(vs.tier), -1) AS best_tier + FROM variants v + LEFT JOIN variant_scores vs + ON vs.variant_id = v.id AND vs.gate_passed = 1 + WHERE v.sprite_id = ? AND v.job_status = 'completed' + AND v.raw_path IS NOT NULL + GROUP BY v.id + ORDER BY v.is_approved DESC, best_tier DESC, v.id DESC + LIMIT 1""", + (sprite_id,), + ).fetchone() + + cells.append({ + "temp": temp, "moist": moist, "elevation": elevation, + "sprite_id": sprite_id, "entity_id": entity_id, + "status": sprite["status"], + "variant_id": best["id"] if best else None, + "raw_path": best["raw_path"] if best else None, + "processed_path": best["processed_path"] if best else None, + "is_approved": bool(best["is_approved"]) if best else False, + "variant_count": variant_count, + }) + + return cells + + # -- quality analytics ----------------------------------------------------- + + def get_quality_analytics( + self, + category: str | None = None, + scorer: str | None = None, + ) -> dict: + """Aggregate per-dimension quality scores and gate failure rates. + + Uses SQLite json_each() to unpack the quality/gates JSON columns. + json_each() is built in from SQLite 3.38 (older builds need JSON1 compiled in); the linked version varies by Python build.
+ """ + params: dict = {"category": category, "scorer": scorer} + + dim_rows = self.conn.execute( + """SELECT s.category, + je.key AS dimension, + COUNT(*) AS sample_count, + ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score, + ROUND(MIN(CAST(je.value AS REAL)), 1) AS min_score, + ROUND(MAX(CAST(je.value AS REAL)), 1) AS max_score, + ROUND(100.0 * SUM( + CASE WHEN CAST(je.value AS REAL) < 45 THEN 1 ELSE 0 END + ) / COUNT(*), 1) AS pct_below_floor + FROM latest_scores ls + JOIN variants v ON ls.variant_id = v.id + JOIN sprites s ON v.sprite_id = s.id, + json_each(ls.quality) AS je + WHERE ls.gate_passed = 1 + AND ls.quality IS NOT NULL + AND ls.quality != '{}' + AND (:category IS NULL OR s.category = :category) + AND (:scorer IS NULL OR ls.scorer_name = :scorer) + GROUP BY s.category, je.key + ORDER BY s.category, pct_below_floor DESC""", + params, + ).fetchall() + + gate_rows = self.conn.execute( + """SELECT s.category, + je.key AS gate, + COUNT(*) AS total, + SUM(CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END) AS failures, + ROUND(100.0 * SUM( + CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END + ) / COUNT(*), 1) AS failure_pct + FROM latest_scores ls + JOIN variants v ON ls.variant_id = v.id + JOIN sprites s ON v.sprite_id = s.id, + json_each(ls.gates) AS je + WHERE ls.gates IS NOT NULL + AND ls.gates != '{}' + AND (:category IS NULL OR s.category = :category) + AND (:scorer IS NULL OR ls.scorer_name = :scorer) + GROUP BY s.category, je.key + ORDER BY failure_pct DESC""", + params, + ).fetchall() + + pool_rows = self.conn.execute( + "SELECT category, COUNT(*) AS cnt FROM seed_pool GROUP BY category" + ).fetchall() + + return { + "dimensions": [dict(r) for r in dim_rows], + "gates": [dict(r) for r in gate_rows], + "seed_pool": {r["category"]: r["cnt"] for r in pool_rows}, + } + + def get_quality_trends( + self, + category: str | None = None, + weeks: int = 8, + ) -> list[dict]: + """Weekly rolling average per quality dimension.""" + rows = 
self.conn.execute( + """SELECT s.category, + je.key AS dimension, + strftime('%Y-W%W', ls.scored_at) AS week, + ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score, + COUNT(*) AS sample_count + FROM latest_scores ls + JOIN variants v ON ls.variant_id = v.id + JOIN sprites s ON v.sprite_id = s.id, + json_each(ls.quality) AS je + WHERE ls.gate_passed = 1 + AND ls.quality IS NOT NULL + AND ls.quality != '{}' + AND (:category IS NULL OR s.category = :category) + AND ls.scored_at >= datetime('now', :weeks_ago) + GROUP BY s.category, je.key, week + ORDER BY s.category, je.key, week DESC""", + {"category": category, "weeks_ago": f"-{weeks} weeks"}, + ).fetchall() + return [dict(r) for r in rows] + + def get_calibration_data(self) -> list[dict]: + """Compare scorer decisions against human approval/rejection data.""" + rows = self.conn.execute( + """SELECT ls.scorer_name, + s.category, + COUNT(*) AS passed_by_scorer, + SUM(CASE WHEN v.is_approved=1 THEN 1 ELSE 0 END) AS human_approved, + SUM(CASE WHEN v.rating=-1 AND v.is_approved=0 + THEN 1 ELSE 0 END) AS human_rejected, + ROUND(AVG(CASE WHEN v.is_approved=1 + THEN ls.confidence END), 3) AS avg_conf_approved, + MIN(CASE WHEN v.is_approved=1 + THEN ls.confidence END) AS min_conf_approved + FROM latest_scores ls + JOIN variants v ON ls.variant_id = v.id + JOIN sprites s ON v.sprite_id = s.id + WHERE ls.gate_passed=1 + AND (v.is_approved=1 OR v.rating=-1) + GROUP BY ls.scorer_name, s.category + ORDER BY ls.scorer_name, s.category""" + ).fetchall() + return [dict(r) for r in rows] + + def upsert_calibration(self, row: dict) -> None: + with self.conn: + self.conn.execute( + """INSERT INTO scorer_calibration + (scorer_name, category, false_positive_rate, true_positive_rate, + recommended_threshold, sample_count, computed_at) + VALUES (:scorer_name, :category, :false_positive_rate, + :true_positive_rate, :recommended_threshold, + :sample_count, :computed_at) + ON CONFLICT(scorer_name, category) DO UPDATE SET + 
false_positive_rate=excluded.false_positive_rate, + true_positive_rate=excluded.true_positive_rate, + recommended_threshold=excluded.recommended_threshold, + sample_count=excluded.sample_count, + computed_at=excluded.computed_at""", + row, + ) + # -- cleanup --------------------------------------------------------------- def close(self) -> None: