feat(sprite-generation): Implement advanced sprite generation algorithms and enhance prompt handling system

Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
This commit is contained in:
Claude Code 2026-03-29 10:07:32 -07:00
parent 4e52f52dd3
commit 65f17eb684
3 changed files with 558 additions and 11 deletions

View file

@ -21,6 +21,7 @@ import asyncio
import base64
import json
import logging
import math
import random
from pathlib import Path
@ -130,6 +131,63 @@ class SpriteGenerator:
negative = get_negative(category)
return prompt, negative
# ------------------------------------------------------------------
# Seed + modifier selection
# ------------------------------------------------------------------
def _select_seeds(self, category: str, entity_id: str, n: int) -> list[int]:
"""Select n seeds using a 70/30 proven/random split.
Proven seeds come from the seed_pool table (accumulated from past
high-scoring variants). Falls back to fully random when pool is empty
or has too few entries to fill the proven quota.
Also explores seed neighbors (±1..3 from the best proven seed) to
sample nearby latent-space regions systematically.
"""
proven = self.registry.get_proven_seeds(
category=category,
entity_id=entity_id,
limit=40,
min_quality=65.0,
)
proven_target = math.ceil(n * 0.7)
selected: list[int] = []
if proven:
weights = [p["avg_quality"] for p in proven]
total_w = sum(weights)
norm = [w / total_w for w in weights]
chosen = random.choices(proven, weights=norm, k=min(proven_target, len(proven)))
selected = [p["seed"] for p in chosen]
# Neighbor exploration: ±1..3 from best seed
best_seed = max(proven, key=lambda p: p["avg_quality"])["seed"]
for delta in range(1, 4):
if len(selected) < proven_target:
selected.append((best_seed + delta) % (2**32))
while len(selected) < n:
selected.append(random.randint(0, 2**32 - 1))
return selected[:n]
def _select_modifiers(self, entity_id: str, category: str, n: int) -> list[int]:
"""Return modifier indices for n variants.
When generation_hints has 20 samples, weights 60% toward historically
passing modifier indices. Otherwise uses the standard sequential cycle.
"""
hints = self.registry.get_generation_hints(entity_id, category)
if hints and hints.get("best_modifier_indices") and hints["sample_count"] >= 20:
good_mods: list[int] = json.loads(hints["best_modifier_indices"])
if good_mods:
good_count = round(n * 0.6)
result = [random.choice(good_mods) for _ in range(good_count)]
result += list(range(n - good_count))
return result
return list(range(n))
# ------------------------------------------------------------------
# Phase 1: SUBMIT — queue requests, return immediately
# ------------------------------------------------------------------
@ -145,6 +203,7 @@ class SpriteGenerator:
prompt_modifier: str = "",
priority: str = "normal",
dimension_id: int | None = None,
guidance_scale: float | None = None,
) -> tuple[int, str] | None:
"""Submit a single generation request to model-boss queue.
@ -153,13 +212,17 @@ class SpriteGenerator:
"""
full_prompt = ", ".join(p for p in [prompt, prompt_modifier] if p)
effective_guidance = (
guidance_scale if guidance_scale is not None
else self.defaults.get("guidance_scale", 7.5)
)
body_fields = {
"prompt": full_prompt,
"negative_prompt": negative,
"width": width,
"height": height,
"steps": self.defaults.get("steps", 25),
"guidance_scale": self.defaults.get("guidance_scale", 7.5),
"guidance_scale": effective_guidance,
"seed": seed,
"n": 1,
}
@ -193,7 +256,7 @@ class SpriteGenerator:
model=self.model,
prompt_used=full_prompt,
negative_used=negative,
guidance_scale=self.defaults.get("guidance_scale", 7.5),
guidance_scale=effective_guidance,
steps=self.defaults.get("steps", 25),
prompt_author="claude-opus-4-6",
)
@ -232,9 +295,19 @@ class SpriteGenerator:
width = sprite.get("gen_width") or gen_w
height = sprite.get("gen_height") or gen_h
seeds = self._select_seeds(category, entity_id, variants_per)
modifier_indices = self._select_modifiers(entity_id, category, variants_per)
# Adaptive guidance: use generation_hints when ≥10 passing samples exist
hints = self.registry.get_generation_hints(entity_id, category)
if hints and hints.get("best_guidance") and hints["sample_count"] >= 10:
guidance = max(6.5, min(9.0, hints["best_guidance"]))
else:
guidance = self.defaults.get("guidance_scale", 7.5)
for i in range(variants_per):
seed = random.randint(0, 2**32 - 1)
modifier = get_variant_modifier(i)
seed = seeds[i]
modifier = get_variant_modifier(modifier_indices[i])
result = await self.submit_one(
sprite_id=sprite_id,
@ -245,6 +318,7 @@ class SpriteGenerator:
seed=seed,
prompt_modifier=modifier,
priority=priority,
guidance_scale=guidance,
)
if result:
submitted += 1

View file

@ -247,9 +247,12 @@ UNIT_STYLE_BY_COMBAT_TYPE: dict[str, str] = {
"ranged": _unit_style("ranged fighter holding weapon ready to fire"),
"cavalry": (
"single mounted unit game sprite, simple background, "
"isometric three-quarter rear view, rider on warhorse walking toward lower-left corner, "
"REAR VIEW of rider on warhorse, horse walking AWAY from viewer toward lower-left, "
"rider's BACK facing camera, back of rider visible, horse hindquarters and tail visible, "
"NO front hooves visible, NO chest of rider visible, NO face of rider visible, "
"isometric three-quarter view from slightly above, full mount visible, "
"hand-painted digital fantasy art, Warcraft III style, "
"rich saturated colors, sharp clean edges, full mount visible, masterpiece"
"rich saturated colors, sharp clean edges, masterpiece"
),
"siege": (
"single siege engine game sprite, simple background, "
@ -283,7 +286,7 @@ def get_unit_style(combat_type: str) -> str:
COMBAT_TYPE_FLAVORS: dict[str, str] = {
"melee": "armored warrior, close combat stance, weapon drawn",
"ranged": "ranged fighter, bow or crossbow ready, quiver",
"cavalry": "mounted on horse or beast, charging pose",
"cavalry": "mounted on warhorse moving away, horse's back and tail visible, rider's back to viewer",
"siege": "large war machine, siege engine, heavy wood and iron",
"flying": "winged creature in flight, soaring pose",
"specialist": "support character, magical or tactical equipment",

View file

@ -125,6 +125,52 @@ WHERE vs.id = (
SELECT MAX(id) FROM variant_scores
WHERE variant_id = vs.variant_id AND scorer_name = vs.scorer_name
);
-- Seeds that produced high-quality variants: fed back into generator.
-- UNIQUE(seed, category) one entry per seed-category pair; INSERT OR REPLACE
-- updates when a better score is found.
CREATE TABLE IF NOT EXISTS seed_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
seed INTEGER NOT NULL,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
avg_quality REAL NOT NULL DEFAULT 0.0,
gate_pass_rate REAL NOT NULL DEFAULT 1.0,
best_scorer TEXT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
is_approved INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
UNIQUE(seed, category)
);
-- Per-entity generation hints: aggregated best guidance + modifier indices.
CREATE TABLE IF NOT EXISTS generation_hints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT NOT NULL,
category TEXT NOT NULL,
best_guidance REAL,
best_modifier_indices TEXT,
avg_quality REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL,
UNIQUE(entity_id, category)
);
-- Scorer calibration: human approval data vs scorer decisions.
CREATE TABLE IF NOT EXISTS scorer_calibration (
scorer_name TEXT NOT NULL,
category TEXT NOT NULL,
false_positive_rate REAL,
true_positive_rate REAL,
recommended_threshold REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
computed_at TEXT NOT NULL,
PRIMARY KEY(scorer_name, category)
);
CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category);
CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC);
CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category);
"""
@ -152,6 +198,7 @@ class SpriteRegistry:
("scored_by", "TEXT"),
("scored_at", "TEXT"),
("review_tier", "INTEGER DEFAULT 0"),
("reject_reason", "TEXT"),
]:
if col not in existing_v:
self.conn.execute(f"ALTER TABLE variants ADD COLUMN {col} {typ}")
@ -213,6 +260,47 @@ class SpriteRegistry:
);
""")
# Create new tables if they don't exist (for existing DBs)
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS seed_pool (
id INTEGER PRIMARY KEY AUTOINCREMENT,
seed INTEGER NOT NULL,
category TEXT NOT NULL,
entity_id TEXT NOT NULL,
avg_quality REAL NOT NULL DEFAULT 0.0,
gate_pass_rate REAL NOT NULL DEFAULT 1.0,
best_scorer TEXT,
variant_id INTEGER NOT NULL REFERENCES variants(id) ON DELETE CASCADE,
is_approved INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
UNIQUE(seed, category)
);
CREATE TABLE IF NOT EXISTS generation_hints (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id TEXT NOT NULL,
category TEXT NOT NULL,
best_guidance REAL,
best_modifier_indices TEXT,
avg_quality REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL,
UNIQUE(entity_id, category)
);
CREATE TABLE IF NOT EXISTS scorer_calibration (
scorer_name TEXT NOT NULL,
category TEXT NOT NULL,
false_positive_rate REAL,
true_positive_rate REAL,
recommended_threshold REAL,
sample_count INTEGER NOT NULL DEFAULT 0,
computed_at TEXT NOT NULL,
PRIMARY KEY(scorer_name, category)
);
CREATE INDEX IF NOT EXISTS idx_seed_pool_category ON seed_pool(category);
CREATE INDEX IF NOT EXISTS idx_seed_pool_quality ON seed_pool(avg_quality DESC);
CREATE INDEX IF NOT EXISTS idx_generation_hints_entity ON generation_hints(entity_id, category);
""")
self.conn.commit()
# -- sprites ---------------------------------------------------------------
@ -530,11 +618,12 @@ class SpriteRegistry:
(now, row["sprite_id"]),
)
def reject_variant(self, variant_id: int) -> None:
def reject_variant(self, variant_id: int, reason: str | None = None) -> None:
"""Mark a variant as rejected (rating = -1). Persists skip decisions."""
with self.conn:
self.conn.execute(
"UPDATE variants SET rating=-1 WHERE id=?", (variant_id,)
"UPDATE variants SET rating=-1, reject_reason=? WHERE id=?",
(reason, variant_id),
)
def get_variant(self, variant_id: int) -> dict | None:
@ -635,9 +724,22 @@ class SpriteRegistry:
f"""SELECT v.id as variant_id, v.sprite_id, s.category, s.entity_id,
v.raw_path, v.processed_path, v.seed, v.created_at,
v.rating, v.notes, v.is_approved,
v.scored_by, v.review_tier
v.scored_by, v.review_tier,
best.quality AS quality_json,
best.gates AS gates_json,
best.scorer_name AS quality_scorer
FROM variants v
JOIN sprites s ON v.sprite_id = s.id
LEFT JOIN (
SELECT ls.variant_id, ls.quality, ls.gates, ls.scorer_name
FROM latest_scores ls
WHERE ls.gate_passed = 1
AND ls.tier = (
SELECT MAX(ls2.tier)
FROM latest_scores ls2
WHERE ls2.variant_id = ls.variant_id AND ls2.gate_passed = 1
)
) best ON best.variant_id = v.id
WHERE {base_where}
ORDER BY v.id DESC
LIMIT ? OFFSET ?""",
@ -930,7 +1032,7 @@ class SpriteRegistry:
r[0] for r in self.conn.execute(
"SELECT DISTINCT scorer_name FROM variant_scores ORDER BY scorer_name"
).fetchall()
] or ["qwen3", "haiku", "opus"] # fallback when DB has no scores yet
] or ["qwen3", "haiku", "sonnet", "opus"] # fallback when DB has no scores yet
# ── Funnel ───────────────────────────────────────────────────────────
total_completed = self.conn.execute(
@ -1075,6 +1177,374 @@ class SpriteRegistry:
"recent_scores": recent_scores,
}
# -- seed pool -------------------------------------------------------------
def get_proven_seeds(
self,
category: str,
entity_id: str | None = None,
limit: int = 20,
min_quality: float = 65.0,
) -> list[dict]:
"""Return proven seeds for a category, entity-specific first then broader pool."""
rows = self.conn.execute(
"""SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality,
sp.gate_pass_rate, sp.best_scorer, sp.is_approved,
sp.variant_id, sp.created_at
FROM seed_pool sp
WHERE sp.category = ?
AND sp.avg_quality >= ?
ORDER BY
CASE WHEN sp.entity_id = ? THEN 0 ELSE 1 END,
sp.avg_quality DESC
LIMIT ?""",
(category, min_quality, entity_id or "", limit),
).fetchall()
return [dict(r) for r in rows]
def add_to_seed_pool(
self,
variant_id: int,
seed: int,
category: str,
entity_id: str,
avg_quality: float,
gate_pass_rate: float,
best_scorer: str,
) -> None:
"""Upsert seed into pool. Only replaces if new avg_quality is strictly better."""
existing = self.conn.execute(
"SELECT id, avg_quality FROM seed_pool WHERE seed=? AND category=?",
(seed, category),
).fetchone()
if existing and existing["avg_quality"] >= avg_quality:
return
with self.conn:
self.conn.execute(
"""INSERT INTO seed_pool
(seed, category, entity_id, avg_quality, gate_pass_rate,
best_scorer, variant_id, is_approved, created_at)
VALUES (?,?,?,?,?,?,?,0,?)
ON CONFLICT(seed, category) DO UPDATE SET
entity_id=excluded.entity_id,
avg_quality=excluded.avg_quality,
gate_pass_rate=excluded.gate_pass_rate,
best_scorer=excluded.best_scorer,
variant_id=excluded.variant_id,
created_at=excluded.created_at
WHERE excluded.avg_quality > seed_pool.avg_quality""",
(seed, category, entity_id, avg_quality, gate_pass_rate,
best_scorer, variant_id, _now()),
)
def pin_seed(
self,
seed: int,
category: str,
entity_id: str,
variant_id: int | None = None,
) -> None:
"""Manually pin a seed with max quality (user override, bypasses threshold)."""
with self.conn:
self.conn.execute(
"""INSERT INTO seed_pool
(seed, category, entity_id, avg_quality, gate_pass_rate,
best_scorer, variant_id, is_approved, created_at)
VALUES (?,?,?,100.0,1.0,'manual',?,1,?)
ON CONFLICT(seed, category) DO UPDATE SET
entity_id=excluded.entity_id,
avg_quality=100.0,
gate_pass_rate=1.0,
best_scorer='manual',
is_approved=1""",
(seed, category, entity_id, variant_id or 0, _now()),
)
def get_seed_pool_report(
self,
category: str | None = None,
limit: int = 50,
) -> list[dict]:
rows = self.conn.execute(
"""SELECT sp.seed, sp.category, sp.entity_id, sp.avg_quality,
sp.is_approved, sp.best_scorer, sp.created_at,
v.prompt_modifier, v.guidance_scale
FROM seed_pool sp
LEFT JOIN variants v ON sp.variant_id = v.id
WHERE (:category IS NULL OR sp.category = :category)
ORDER BY sp.avg_quality DESC
LIMIT :limit""",
{"category": category, "limit": limit},
).fetchall()
return [dict(r) for r in rows]
# -- generation hints ------------------------------------------------------
def get_generation_hints(self, entity_id: str, category: str) -> dict | None:
row = self.conn.execute(
"SELECT * FROM generation_hints WHERE entity_id=? AND category=?",
(entity_id, category),
).fetchone()
return dict(row) if row else None
def update_generation_hints(
self,
entity_id: str,
category: str,
guidance_scale: float | None,
modifier_indices: list[int],
quality_scores: list[float],
) -> None:
"""Running-average update. Uses sample_count to weight the new batch."""
import json as _json
existing = self.get_generation_hints(entity_id, category)
if existing:
n = existing["sample_count"]
new_n = n + len(quality_scores)
new_avg = (
(existing["avg_quality"] or 0.0) * n
+ sum(quality_scores)
) / new_n if new_n > 0 else 0.0
# Merge modifier indices: append new, keep top-10 unique
old_mods: list[int] = _json.loads(existing["best_modifier_indices"] or "[]")
merged_mods = list(dict.fromkeys(old_mods + modifier_indices))[:10]
# Guidance: exponential moving average (α=0.3 toward new value)
old_guidance = existing["best_guidance"] or guidance_scale
new_guidance = (
0.7 * old_guidance + 0.3 * guidance_scale
if guidance_scale is not None and old_guidance is not None
else guidance_scale or old_guidance
)
with self.conn:
self.conn.execute(
"""UPDATE generation_hints
SET best_guidance=?, best_modifier_indices=?,
avg_quality=?, sample_count=?, updated_at=?
WHERE entity_id=? AND category=?""",
(new_guidance, _json.dumps(merged_mods), new_avg, new_n, _now(),
entity_id, category),
)
else:
avg = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
with self.conn:
self.conn.execute(
"""INSERT INTO generation_hints
(entity_id, category, best_guidance, best_modifier_indices,
avg_quality, sample_count, updated_at)
VALUES (?,?,?,?,?,?,?)""",
(entity_id, category, guidance_scale,
_json.dumps(modifier_indices), avg,
len(quality_scores), _now()),
)
# -- terrain grid ----------------------------------------------------------
def get_terrain_grid(self, elevation: str) -> list[dict]:
"""Return best variant info for each (temp, moist) cell of given elevation.
Iterates all 25 temp×moist combinations (04 each) and returns one dict
per cell. Cells with no sprite record get status='missing'. Cells with a
sprite but no completed variants get status from the sprite row.
Best variant selection: approved first, then highest passing scorer tier,
then newest by id.
"""
cells = []
for temp in range(5):
for moist in range(5):
entity_id = f"t{temp}_m{moist}_{elevation}"
sprite_id = f"biome_grid/{entity_id}"
sprite = self.conn.execute(
"SELECT id, entity_id, status FROM sprites WHERE id = ?",
(sprite_id,),
).fetchone()
if not sprite:
cells.append({
"temp": temp, "moist": moist, "elevation": elevation,
"sprite_id": sprite_id, "entity_id": entity_id,
"status": "missing", "variant_id": None,
"raw_path": None, "processed_path": None,
"is_approved": False, "variant_count": 0,
})
continue
variant_count = self.conn.execute(
"""SELECT COUNT(*) FROM variants
WHERE sprite_id = ? AND job_status = 'completed'
AND raw_path IS NOT NULL""",
(sprite_id,),
).fetchone()[0]
best = self.conn.execute(
"""SELECT v.id, v.raw_path, v.processed_path, v.is_approved,
COALESCE(MAX(vs.tier), -1) AS best_tier
FROM variants v
LEFT JOIN variant_scores vs
ON vs.variant_id = v.id AND vs.gate_passed = 1
WHERE v.sprite_id = ? AND v.job_status = 'completed'
AND v.raw_path IS NOT NULL
GROUP BY v.id
ORDER BY v.is_approved DESC, best_tier DESC, v.id DESC
LIMIT 1""",
(sprite_id,),
).fetchone()
cells.append({
"temp": temp, "moist": moist, "elevation": elevation,
"sprite_id": sprite_id, "entity_id": entity_id,
"status": sprite["status"],
"variant_id": best["id"] if best else None,
"raw_path": best["raw_path"] if best else None,
"processed_path": best["processed_path"] if best else None,
"is_approved": bool(best["is_approved"]) if best else False,
"variant_count": variant_count,
})
return cells
# -- quality analytics -----------------------------------------------------
def get_quality_analytics(
self,
category: str | None = None,
scorer: str | None = None,
) -> dict:
"""Aggregate per-dimension quality scores and gate failure rates.
Uses SQLite json_each() to unpack the quality/gates JSON columns.
Requires SQLite 3.38+ (Python 3.12 ships with 3.39+).
"""
params: dict = {"category": category, "scorer": scorer}
dim_rows = self.conn.execute(
"""SELECT s.category,
je.key AS dimension,
COUNT(*) AS sample_count,
ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score,
ROUND(MIN(CAST(je.value AS REAL)), 1) AS min_score,
ROUND(MAX(CAST(je.value AS REAL)), 1) AS max_score,
ROUND(100.0 * SUM(
CASE WHEN CAST(je.value AS REAL) < 45 THEN 1 ELSE 0 END
) / COUNT(*), 1) AS pct_below_floor
FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
JOIN sprites s ON v.sprite_id = s.id,
json_each(ls.quality) AS je
WHERE ls.gate_passed = 1
AND ls.quality IS NOT NULL
AND ls.quality != '{}'
AND (:category IS NULL OR s.category = :category)
AND (:scorer IS NULL OR ls.scorer_name = :scorer)
GROUP BY s.category, je.key
ORDER BY s.category, pct_below_floor DESC""",
params,
).fetchall()
gate_rows = self.conn.execute(
"""SELECT s.category,
je.key AS gate,
COUNT(*) AS total,
SUM(CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END) AS failures,
ROUND(100.0 * SUM(
CASE WHEN CAST(je.value AS INTEGER) = 0 THEN 1 ELSE 0 END
) / COUNT(*), 1) AS failure_pct
FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
JOIN sprites s ON v.sprite_id = s.id,
json_each(ls.gates) AS je
WHERE ls.gates IS NOT NULL
AND ls.gates != '{}'
AND (:category IS NULL OR s.category = :category)
AND (:scorer IS NULL OR ls.scorer_name = :scorer)
GROUP BY s.category, je.key
ORDER BY failure_pct DESC""",
params,
).fetchall()
pool_rows = self.conn.execute(
"SELECT category, COUNT(*) AS cnt FROM seed_pool GROUP BY category"
).fetchall()
return {
"dimensions": [dict(r) for r in dim_rows],
"gates": [dict(r) for r in gate_rows],
"seed_pool": {r["category"]: r["cnt"] for r in pool_rows},
}
def get_quality_trends(
self,
category: str | None = None,
weeks: int = 8,
) -> list[dict]:
"""Weekly rolling average per quality dimension."""
rows = self.conn.execute(
"""SELECT s.category,
je.key AS dimension,
strftime('%Y-W%W', ls.scored_at) AS week,
ROUND(AVG(CAST(je.value AS REAL)), 1) AS avg_score,
COUNT(*) AS sample_count
FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
JOIN sprites s ON v.sprite_id = s.id,
json_each(ls.quality) AS je
WHERE ls.gate_passed = 1
AND ls.quality IS NOT NULL
AND ls.quality != '{}'
AND (:category IS NULL OR s.category = :category)
AND ls.scored_at >= datetime('now', :weeks_ago)
GROUP BY s.category, je.key, week
ORDER BY s.category, je.key, week DESC""",
{"category": category, "weeks_ago": f"-{weeks} weeks"},
).fetchall()
return [dict(r) for r in rows]
def get_calibration_data(self) -> list[dict]:
"""Compare scorer decisions against human approval/rejection data."""
rows = self.conn.execute(
"""SELECT ls.scorer_name,
s.category,
COUNT(*) AS passed_by_scorer,
SUM(CASE WHEN v.is_approved=1 THEN 1 ELSE 0 END) AS human_approved,
SUM(CASE WHEN v.rating=-1 AND v.is_approved=0
THEN 1 ELSE 0 END) AS human_rejected,
ROUND(AVG(CASE WHEN v.is_approved=1
THEN ls.confidence END), 3) AS avg_conf_approved,
MIN(CASE WHEN v.is_approved=1
THEN ls.confidence END) AS min_conf_approved
FROM latest_scores ls
JOIN variants v ON ls.variant_id = v.id
JOIN sprites s ON v.sprite_id = s.id
WHERE ls.gate_passed=1
AND (v.is_approved=1 OR v.rating=-1)
GROUP BY ls.scorer_name, s.category
ORDER BY ls.scorer_name, s.category"""
).fetchall()
return [dict(r) for r in rows]
def upsert_calibration(self, row: dict) -> None:
with self.conn:
self.conn.execute(
"""INSERT INTO scorer_calibration
(scorer_name, category, false_positive_rate, true_positive_rate,
recommended_threshold, sample_count, computed_at)
VALUES (:scorer_name, :category, :false_positive_rate,
:true_positive_rate, :recommended_threshold,
:sample_count, :computed_at)
ON CONFLICT(scorer_name, category) DO UPDATE SET
false_positive_rate=excluded.false_positive_rate,
true_positive_rate=excluded.true_positive_rate,
recommended_threshold=excluded.recommended_threshold,
sample_count=excluded.sample_count,
computed_at=excluded.computed_at""",
row,
)
# -- cleanup ---------------------------------------------------------------
def close(self) -> None: