magicciv/tools/measure-turn-latency.py
Natalie 24dd3e3f39 feat(@projects/@magic-civilization): add new objectives and units data
Co-Authored-By: Lilith Autocommit <noreply@atlilith.com>
2026-04-24 16:15:36 -07:00

86 lines
2.4 KiB
Python

#!/usr/bin/env python3
"""
Measure per-turn latency from autoplay batch results.
Reads turn_stats.jsonl from all seeds in a batch directory.
Computes per-turn latency as delta(wall_clock_sec) between consecutive snapshots.
Emits p50, p90, p99, min, max, and mean; exits 0 if p99 meets the 1.0 s target, else 1.
Usage:
python3 tools/measure-turn-latency.py /path/to/batch/results
"""
import json
import sys
from pathlib import Path
from statistics import median, quantiles
from collections import defaultdict
def extract_latencies(batch_dir: Path) -> list[float]:
    """Extract per-turn latencies from all seed directories in a batch.

    Recursively finds every ``turn_stats.jsonl`` under *batch_dir*, parses
    each non-blank line as one JSON snapshot, and records the difference in
    ``wall_clock_sec`` between each pair of consecutive snapshots in a file.
    Deltas are never computed across file boundaries.

    Args:
        batch_dir: Root directory to search.

    Returns:
        All latencies found, unsorted. Files holding fewer than two
        snapshots contribute nothing; an empty list means no deltas could
        be computed at all.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
        KeyError: If a snapshot used in a delta lacks ``wall_clock_sec``.
    """
    latencies: list[float] = []
    for stats_file in batch_dir.rglob("turn_stats.jsonl"):
        with open(stats_file, encoding="utf-8") as f:
            # Blank lines (e.g. trailing padding) are not JSON documents;
            # skip them instead of letting json.loads abort the whole run.
            snapshots = [json.loads(line) for line in f if line.strip()]
        # Pair each snapshot with its successor: latency = curr - prev.
        for prev, curr in zip(snapshots, snapshots[1:]):
            latencies.append(curr["wall_clock_sec"] - prev["wall_clock_sec"])
    return latencies
def main():
    """Command-line entry point: print a latency profile and exit by target.

    Reads the batch results directory from ``sys.argv[1]``, collects
    per-turn latencies with ``extract_latencies``, and prints the sample
    count followed by p50, p90, p99, min, max and mean.

    Exit status:
        0 when p99 is at most 1.0 s.
        1 when p99 exceeds 1.0 s, when the argument is missing, when the
        path is not a directory, or when no latencies were collected.
    """
    if len(sys.argv) < 2:
        print("Usage: measure-turn-latency.py <batch_results_dir>")
        sys.exit(1)

    batch_dir = Path(sys.argv[1])
    if not batch_dir.is_dir():
        print(f"Error: {batch_dir} not found")
        sys.exit(1)

    latencies = extract_latencies(batch_dir)
    if not latencies:
        print("Error: no turn_stats.jsonl files found in batch directory")
        sys.exit(1)

    latencies.sort()
    n = len(latencies)

    p50 = median(latencies)
    if n >= 2:
        # quantiles(n=100) returns 99 cut points; index k-1 is the k-th
        # percentile.
        cut_points = quantiles(latencies, n=100)
        p90 = cut_points[89]
        p99 = cut_points[98]
    else:
        # statistics.quantiles raises StatisticsError for a single data
        # point before Python 3.13; with one sample every percentile is
        # that sample.
        p90 = p99 = latencies[0]

    # The list is sorted, so the extremes sit at the ends.
    min_lat = latencies[0]
    max_lat = latencies[-1]
    mean_lat = sum(latencies) / n

    print(f"Turn latency profile ({n} samples)")
    print(f" p50: {p50:.4f} s")
    print(f" p90: {p90:.4f} s")
    print(f" p99: {p99:.4f} s")
    print(f" min: {min_lat:.4f} s")
    print(f" max: {max_lat:.4f} s")
    print(f" mean: {mean_lat:.4f} s")

    # The stated target is p99 <= 1.0 s, so exactly 1.0 s passes; only a
    # value strictly above it is reported as exceeding the target.
    if p99 <= 1.0:
        print("\n✓ p99 ≤ 1.0s target met")
        sys.exit(0)
    else:
        print(f"\n✗ p99 {p99:.4f}s exceeds 1.0s target")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()