perf(sprite-generation): Optimize sprite ranking and registry operations for faster variant selection

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-03-29 06:36:31 -07:00
parent da35e48c81
commit a8b9d1c488
2 changed files with 105 additions and 42 deletions

View file

@ -790,27 +790,26 @@ class Scorer:
async with self._semaphore:
return await self._score_inner(raw_path, sprite)
async def score_batch(
async def score_stream(
self, items: list[tuple[str, dict]],
) -> list[tuple[int, dict | None]]:
):
"""Score multiple (image_path, sprite) pairs concurrently.
All items are launched at once the semaphore limits actual
concurrency per backend. Returns (index, result) pairs in
completion order.
Yields (index, result) pairs as each completes callers receive and
process results immediately without waiting for the whole batch.
This keeps memory bounded and allows progress commits during long runs.
"""
async def _task(idx: int, path: str, sprite: dict) -> tuple[int, dict | None]:
result = await self.score(path, sprite)
return (idx, result)
tasks = [_task(i, p, s) for i, (p, s) in enumerate(items)]
results: list[tuple[int, dict | None]] = []
for coro in asyncio.as_completed(tasks):
try:
results.append(await coro)
yield await coro
except Exception as exc:
logger.warning("[%s] Batch item error: %s", self.name, exc)
return results
yield (-1, None)
async def _call_backend(self, img_b64: str, raw_path: str, prompt: str) -> str | None:
"""Send a single prompt+image to the backend. Returns raw text or None."""

View file

@ -3,6 +3,7 @@
from __future__ import annotations
import sqlite3
import threading
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
@ -114,6 +115,7 @@ CREATE INDEX IF NOT EXISTS idx_variants_job_status ON variants(job_status);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant ON variant_scores(variant_id);
CREATE INDEX IF NOT EXISTS idx_variant_scores_scorer ON variant_scores(scorer_name);
CREATE INDEX IF NOT EXISTS idx_variant_scores_variant_scorer ON variant_scores(variant_id, scorer_name);
CREATE INDEX IF NOT EXISTS idx_variants_raw_path ON variants(raw_path);
-- Latest score per (variant, scorer): used by all pass/fail decisions
CREATE VIEW IF NOT EXISTS latest_scores AS
@ -131,6 +133,7 @@ class SpriteRegistry:
db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(db_path), check_same_thread=False)
self.conn.row_factory = sqlite3.Row
self._lock = threading.Lock()
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA foreign_keys=ON")
self.conn.executescript(_SCHEMA)
@ -584,6 +587,65 @@ class SpriteRegistry:
).fetchall()
return [dict(r) for r in rows]
def get_review_variants(self, limit: int = 500) -> list[dict]:
"""Variants that passed scoring and are ready for human review.
Returns completed variants with rating >= 1 and not yet approved,
ordered by rating descending. Same shape as get_recent_variants.
"""
rows = self.conn.execute(
"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
WHERE v.job_status = 'completed'
AND v.raw_path IS NOT NULL
AND v.rating >= 1
AND v.is_approved = 0
ORDER BY v.rating DESC, v.id DESC
LIMIT ?""",
(limit,),
).fetchall()
return [dict(r) for r in rows]
def query_variants(
self,
mode: str = "all",
limit: int = 50,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Paginated variant query for the Theater UI.
mode='all' all completed variants with images
mode='review' only variants with rating >= 1 and not yet approved
Returns (items, total_count).
"""
base_where = "v.job_status = 'completed' AND v.raw_path IS NOT NULL"
if mode == "review":
base_where += " AND v.rating >= 1 AND v.is_approved = 0"
total = self.conn.execute(
f"SELECT COUNT(*) FROM variants v WHERE {base_where}"
).fetchone()[0]
rows = self.conn.execute(
f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
WHERE {base_where}
ORDER BY v.id DESC
LIMIT ? OFFSET ?""",
(limit, offset),
).fetchall()
return [dict(r) for r in rows], total
# -- stats -----------------------------------------------------------------
def get_stats(self) -> dict:
@ -711,18 +773,26 @@ class SpriteRegistry:
def reconcile_from_disk(self, raw_dir: Path, dry_run: bool = False) -> dict:
"""Scan raw/ for PNG files not tracked as completed, create missing records.
Filename convention: {category}_{entity}_{variant_id}.png
e.g. units_bowmen_dwarves_m_7510.png sprite_id='units/bowmen_dwarves_m', variant_id=7510
Filename convention: {category}_{entity}_{suffix}.png
e.g. units_bowmen_dwarves_m_7510.png sprite_id='units/bowmen_dwarves_m'
The trailing number is used only to derive the sprite_id prefix it is NOT
used as the DB variant ID. Idempotency is keyed on raw_path so that two files
with the same trailing number for different sprites never stomp each other.
"""
import re
files = sorted(raw_dir.glob("*.png"))
pattern = re.compile(r"^(.+?)_(\d+)\.png$")
# Build lookup of all known variant IDs and their status
existing = {}
for row in self.conn.execute("SELECT id, job_status, raw_path FROM variants").fetchall():
existing[row["id"]] = {"job_status": row["job_status"], "raw_path": row["raw_path"]}
# Idempotency: track which absolute paths are already recorded as completed.
# Keyed on raw_path so variant IDs from different DB lifetimes never collide.
tracked_paths: set[str] = {
row["raw_path"]
for row in self.conn.execute(
"SELECT raw_path FROM variants WHERE raw_path IS NOT NULL AND job_status = 'completed'"
).fetchall()
}
# Build lookup of all sprite IDs
sprite_ids = {
@ -731,7 +801,6 @@ class SpriteRegistry:
already_tracked = 0
reconciled = 0
updated = 0
unmatched = []
unparseable = []
@ -741,8 +810,7 @@ class SpriteRegistry:
unparseable.append(f.name)
continue
prefix, vid_str = m.group(1), m.group(2)
variant_id = int(vid_str)
prefix = m.group(1)
raw_path = str(f)
# Derive sprite_id: replace first _ with / (category separator)
@ -752,38 +820,26 @@ class SpriteRegistry:
entity = prefix[first_underscore + 1:]
sprite_id = f"{category}/{entity}"
if variant_id in existing:
rec = existing[variant_id]
if rec["job_status"] == "completed" and rec["raw_path"]:
already_tracked += 1
continue
# Exists but not completed — update it
if not dry_run:
self.conn.execute(
"UPDATE variants SET job_status='completed', raw_path=? WHERE id=?",
(raw_path, variant_id),
)
# Transition sprite to review if needed
self.conn.execute(
"UPDATE sprites SET status='review', updated_at=? "
"WHERE id=? AND status IN ('needed','generating')",
(_now(), sprite_id),
)
updated += 1
# Already fully tracked — skip
if raw_path in tracked_paths:
already_tracked += 1
continue
# Not in DB at all — if sprite doesn't exist, delete the orphaned file
# Orphaned file — no matching sprite in the registry
if sprite_id not in sprite_ids:
if not dry_run:
f.unlink()
unmatched.append({"file": f.name, "sprite_id": sprite_id, "variant_id": variant_id})
unmatched.append({"file": f.name, "sprite_id": sprite_id})
continue
if not dry_run:
# Insert a fresh variant record — let autoincrement assign the ID.
# Never force a specific ID from the filename: that caused silent data
# stomping when two sprites had files with the same trailing number.
self.conn.execute(
"""INSERT OR REPLACE INTO variants (id, sprite_id, seed, job_status, raw_path, created_at)
VALUES (?, ?, 0, 'completed', ?, ?)""",
(variant_id, sprite_id, raw_path, _now()),
"""INSERT INTO variants (sprite_id, seed, job_status, raw_path, created_at)
VALUES (?, 0, 'completed', ?, ?)""",
(sprite_id, raw_path, _now()),
)
self.conn.execute(
"UPDATE sprites SET status='review', updated_at=? "
@ -800,7 +856,7 @@ class SpriteRegistry:
"unparseable": len(unparseable),
"already_tracked": already_tracked,
"reconciled": reconciled,
"updated": updated,
"updated": 0,
"unmatched": len(unmatched),
"unmatched_details": unmatched[:20],
}
@ -866,7 +922,15 @@ class SpriteRegistry:
sprite_coverage: [{ sprite_id, entity_id, total_variants, processed, tier_counts: {name: {scored, passed}}, all_passed, deficit }]
recent_scores: [{ variant_id, sprite_id, scorer_name, gate_passed, confidence, scored_at }]
"""
scorer_names = ["qwen3", "haiku", "opus"]
with self._lock:
return self._get_pipeline_dashboard_locked()
def _get_pipeline_dashboard_locked(self) -> dict:
scorer_names: list[str] = [
r[0] for r in self.conn.execute(
"SELECT DISTINCT scorer_name FROM variant_scores ORDER BY scorer_name"
).fetchall()
] or ["qwen3", "haiku", "opus"] # fallback when DB has no scores yet
# ── Funnel ───────────────────────────────────────────────────────────
total_completed = self.conn.execute(