From a8b9d1c488875d0ee46b25e7de94b757a7f2fa3a Mon Sep 17 00:00:00 2001 From: Claude Code Date: Sun, 29 Mar 2026 06:36:31 -0700 Subject: [PATCH] =?UTF-8?q?perf(sprite-generation):=20=E2=9A=A1=20Optimize?= =?UTF-8?q?=20sprite=20ranking=20and=20registry=20operations=20for=20faste?= =?UTF-8?q?r=20variant=20selection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- tools/sprite-generation/engine/ranker.py | 15 ++- tools/sprite-generation/engine/registry.py | 132 +++++++++++++++------ 2 files changed, 105 insertions(+), 42 deletions(-) diff --git a/tools/sprite-generation/engine/ranker.py b/tools/sprite-generation/engine/ranker.py index 3371409c..4939dbab 100644 --- a/tools/sprite-generation/engine/ranker.py +++ b/tools/sprite-generation/engine/ranker.py @@ -790,27 +790,26 @@ class Scorer: async with self._semaphore: return await self._score_inner(raw_path, sprite) - async def score_batch( + async def score_stream( self, items: list[tuple[str, dict]], - ) -> list[tuple[int, dict | None]]: + ): """Score multiple (image_path, sprite) pairs concurrently. - All items are launched at once — the semaphore limits actual - concurrency per backend. Returns (index, result) pairs in - completion order. + Yields (index, result) pairs as each completes — callers receive and + process results immediately without waiting for the whole batch. + This keeps memory bounded and allows progress commits during long runs. """ async def _task(idx: int, path: str, sprite: dict) -> tuple[int, dict | None]: result = await self.score(path, sprite) return (idx, result) tasks = [_task(i, p, s) for i, (p, s) in enumerate(items)] - results: list[tuple[int, dict | None]] = [] for coro in asyncio.as_completed(tasks): try: - results.append(await coro) + yield await coro except Exception as exc: logger.warning("[%s] Batch item error: %s", self.name, exc) - return results + yield (-1, None) async def _call_backend(self, img_b64: str, raw_path: str, prompt: str) -> str | None: """Send a single prompt+image to the backend. Returns raw text or None.""" diff --git a/tools/sprite-generation/engine/registry.py b/tools/sprite-generation/engine/registry.py index 024cefc1..0a3390c9 100644 --- a/tools/sprite-generation/engine/registry.py +++ b/tools/sprite-generation/engine/registry.py @@ -3,6 +3,7 @@ from __future__ import annotations import sqlite3 +import threading from collections import defaultdict from datetime import datetime, timezone from pathlib import Path @@ -114,6 +115,7 @@ CREATE INDEX IF NOT EXISTS idx_variants_job_status ON variants(job_status); CREATE INDEX IF NOT EXISTS idx_variant_scores_variant ON variant_scores(variant_id); CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer ON variant_scores(scorer_name); CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer ON variant_scores(variant_id, scorer_name); +CREATE INDEX IF NOT EXISTS idx_variants_raw_path ON variants(raw_path); -- Latest score per (variant, scorer): used by all pass/fail decisions CREATE VIEW IF NOT EXISTS latest_scores AS @@ -131,6 +133,7 @@ class SpriteRegistry: db_path.parent.mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(str(db_path), check_same_thread=False) self.conn.row_factory = sqlite3.Row + self._lock = threading.Lock() self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA foreign_keys=ON") self.conn.executescript(_SCHEMA) @@ -584,6 +587,65 @@ class SpriteRegistry: ).fetchall() return [dict(r) for r in rows] + def get_review_variants(self, limit: int = 500) -> list[dict]: + """Variants that passed scoring and are ready for human review. + + Returns completed variants with rating >= 1 and not yet approved, + ordered by rating descending. Same shape as get_recent_variants. + """ + rows = self.conn.execute( + """SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id, + v.raw_path, v.processed_path, v.seed, v.created_at, + v.rating, v.notes, v.is_approved, + v.scored_by, v.review_tier + FROM variants v + JOIN sprites s ON v.sprite_id = s.id + WHERE v.job_status = 'completed' + AND v.raw_path IS NOT NULL + AND v.rating >= 1 + AND v.is_approved = 0 + ORDER BY v.rating DESC, v.id DESC + LIMIT ?""", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + + def query_variants( + self, + mode: str = "all", + limit: int = 50, + offset: int = 0, + ) -> tuple[list[dict], int]: + """Paginated variant query for the Theater UI. + + mode='all' — all completed variants with images + mode='review' — only variants with rating >= 1 and not yet approved + + Returns (items, total_count). + """ + base_where = "v.job_status = 'completed' AND v.raw_path IS NOT NULL" + if mode == "review": + base_where += " AND v.rating >= 1 AND v.is_approved = 0" + + total = self.conn.execute( + f"SELECT COUNT(*) FROM variants v WHERE {base_where}" + ).fetchone()[0] + + rows = self.conn.execute( + f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id, + v.raw_path, v.processed_path, v.seed, v.created_at, + v.rating, v.notes, v.is_approved, + v.scored_by, v.review_tier + FROM variants v + JOIN sprites s ON v.sprite_id = s.id + WHERE {base_where} + ORDER BY v.id DESC + LIMIT ? OFFSET ?""", + (limit, offset), + ).fetchall() + + return [dict(r) for r in rows], total + # -- stats ----------------------------------------------------------------- def get_stats(self) -> dict: @@ -711,18 +773,26 @@ class SpriteRegistry: def reconcile_from_disk(self, raw_dir: Path, dry_run: bool = False) -> dict: """Scan raw/ for PNG files not tracked as completed, create missing records. - Filename convention: {category}_{entity}_{variant_id}.png - e.g. units_bowmen_dwarves_m_7510.png → sprite_id='units/bowmen_dwarves_m', variant_id=7510 + Filename convention: {category}_{entity}_{suffix}.png + e.g. units_bowmen_dwarves_m_7510.png → sprite_id='units/bowmen_dwarves_m' + + The trailing number is used only to derive the sprite_id prefix — it is NOT + used as the DB variant ID. Idempotency is keyed on raw_path so that two files + with the same trailing number for different sprites never stomp each other. """ import re files = sorted(raw_dir.glob("*.png")) pattern = re.compile(r"^(.+?)_(\d+)\.png$") - # Build lookup of all known variant IDs and their status - existing = {} - for row in self.conn.execute("SELECT id, job_status, raw_path FROM variants").fetchall(): - existing[row["id"]] = {"job_status": row["job_status"], "raw_path": row["raw_path"]} + # Idempotency: track which absolute paths are already recorded as completed. + # Keyed on raw_path so variant IDs from different DB lifetimes never collide. + tracked_paths: set[str] = { + row["raw_path"] + for row in self.conn.execute( + "SELECT raw_path FROM variants WHERE raw_path IS NOT NULL AND job_status = 'completed'" + ).fetchall() + } # Build lookup of all sprite IDs sprite_ids = { @@ -731,7 +801,6 @@ class SpriteRegistry: already_tracked = 0 reconciled = 0 - updated = 0 unmatched = [] unparseable = [] @@ -741,8 +810,7 @@ class SpriteRegistry: unparseable.append(f.name) continue - prefix, vid_str = m.group(1), m.group(2) - variant_id = int(vid_str) + prefix = m.group(1) raw_path = str(f) # Derive sprite_id: replace first _ with / (category separator) @@ -752,38 +820,26 @@ class SpriteRegistry: entity = prefix[first_underscore + 1:] sprite_id = f"{category}/{entity}" - if variant_id in existing: - rec = existing[variant_id] - if rec["job_status"] == "completed" and rec["raw_path"]: - already_tracked += 1 - continue - # Exists but not completed — update it - if not dry_run: - self.conn.execute( - "UPDATE variants SET job_status='completed', raw_path=? WHERE id=?", - (raw_path, variant_id), - ) - # Transition sprite to review if needed - self.conn.execute( - "UPDATE sprites SET status='review', updated_at=? " - "WHERE id=? AND status IN ('needed','generating')", - (_now(), sprite_id), - ) - updated += 1 + # Already fully tracked — skip + if raw_path in tracked_paths: + already_tracked += 1 continue - # Not in DB at all — if sprite doesn't exist, delete the orphaned file + # Orphaned file — no matching sprite in the registry if sprite_id not in sprite_ids: if not dry_run: f.unlink() - unmatched.append({"file": f.name, "sprite_id": sprite_id, "variant_id": variant_id}) + unmatched.append({"file": f.name, "sprite_id": sprite_id}) continue if not dry_run: + # Insert a fresh variant record — let autoincrement assign the ID. + # Never force a specific ID from the filename: that caused silent data + # stomping when two sprites had files with the same trailing number. self.conn.execute( - """INSERT OR REPLACE INTO variants (id, sprite_id, seed, job_status, raw_path, created_at) - VALUES (?, ?, 0, 'completed', ?, ?)""", - (variant_id, sprite_id, raw_path, _now()), + """INSERT INTO variants (sprite_id, seed, job_status, raw_path, created_at) + VALUES (?, 0, 'completed', ?, ?)""", + (sprite_id, raw_path, _now()), ) self.conn.execute( "UPDATE sprites SET status='review', updated_at=? " @@ -800,7 +856,7 @@ class SpriteRegistry: "unparseable": len(unparseable), "already_tracked": already_tracked, "reconciled": reconciled, - "updated": updated, + "updated": 0, "unmatched": len(unmatched), "unmatched_details": unmatched[:20], } @@ -866,7 +922,15 @@ class SpriteRegistry: sprite_coverage: [{ sprite_id, entity_id, total_variants, processed, tier_counts: {name: {scored, passed}}, all_passed, deficit }] recent_scores: [{ variant_id, sprite_id, scorer_name, gate_passed, confidence, scored_at }] """ - scorer_names = ["qwen3", "haiku", "opus"] + with self._lock: + return self._get_pipeline_dashboard_locked() + + def _get_pipeline_dashboard_locked(self) -> dict: + scorer_names: list[str] = [ + r[0] for r in self.conn.execute( + "SELECT DISTINCT scorer_name FROM variant_scores ORDER BY scorer_name" + ).fetchall() + ] or ["qwen3", "haiku", "opus"] # fallback when DB has no scores yet # ── Funnel ─────────────────────────────────────────────────────────── total_completed = self.conn.execute(