From 74512118ffc69813fe96eb1a7778d8435f5208c2 Mon Sep 17 00:00:00 2001 From: Aryan Date: Thu, 21 May 2026 16:40:36 -0700 Subject: [PATCH 1/5] feat(power): aggregate measured GPU power into agg result JSON MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds two new fields to agg_.json so the InferenceX-app dashboard can chart measured-energy metrics alongside the existing TDP-derived ones: - avg_power_w (mean per-GPU draw during the load window) - joules_per_output_token (avg_power_w * num_gpus * duration / total_output_tokens) How it works: 1. benchmark_serving.py now records benchmark_start_time_unix and benchmark_end_time_unix alongside the existing duration field so the aggregator knows exactly which slice of the long-running monitor CSV to read (the bracket-the-whole-job monitor includes server warmup and the optional eval phase, which would otherwise bias the average). 2. aggregate_power.py reads /workspace/gpu_metrics.csv (path overridable via GPU_METRICS_CSV, which benchmark_lib.sh now exports), detects the vendor schema by header regex (handles nvidia-smi "power.draw [W]" and amd-smi socket_power formats), filters samples to the bench window, and atomically patches the agg JSON. Best-effort: missing / empty / malformed CSV is logged to stderr and skipped without failing the run. 3. process_result.py invokes the aggregator right after writing the agg JSON — no workflow YAML change needed. The InferenceX-app ETL (benchmark-mapper.ts) auto-captures unknown numeric metrics into the metrics JSONB column, so no schema migration or downstream change is required for the data to land in the DB. A follow-up PR on InferenceX-app adds the two Y-axis options to the inference scatter chart. 26 unit tests covering NVIDIA + AMD CSV shapes, window filtering, multi-GPU per-sample aggregation, malformed-row resilience, missing files, division-by-zero guards, and atomic JSON patching. Co-Authored-By: Claude Opus 4.7 --- benchmarks/benchmark_lib.sh | 2 + utils/aggregate_power.py | 271 +++++++++++++++++ utils/bench_serving/benchmark_serving.py | 4 + utils/process_result.py | 25 +- utils/test_aggregate_power.py | 351 +++++++++++++++++++++++ 5 files changed, 652 insertions(+), 1 deletion(-) create mode 100644 utils/aggregate_power.py create mode 100644 utils/test_aggregate_power.py diff --git a/benchmarks/benchmark_lib.sh b/benchmarks/benchmark_lib.sh index cfd30cd04..f5e39b4cf 100644 --- a/benchmarks/benchmark_lib.sh +++ b/benchmarks/benchmark_lib.sh @@ -15,6 +15,7 @@ mkdir -p "$PYTHONPYCACHEPREFIX" 2>/dev/null || true GPU_MONITOR_PID="" GPU_METRICS_CSV="/workspace/gpu_metrics.csv" +export GPU_METRICS_CSV # Start background GPU monitoring that logs metrics every second to CSV. # Auto-detects NVIDIA (nvidia-smi) or AMD (amd-smi) GPUs. @@ -32,6 +33,7 @@ start_gpu_monitor() { done GPU_METRICS_CSV="$output" + export GPU_METRICS_CSV if command -v nvidia-smi &>/dev/null; then nvidia-smi --query-gpu=timestamp,index,power.draw,temperature.gpu,clocks.current.sm,clocks.current.memory,utilization.gpu,utilization.memory \ diff --git a/utils/aggregate_power.py b/utils/aggregate_power.py new file mode 100644 index 000000000..06f491480 --- /dev/null +++ b/utils/aggregate_power.py @@ -0,0 +1,271 @@ +"""Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON. + +Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi), +filters samples to the benchmark load window using start/end Unix timestamps +written by benchmark_serving.py, and patches two keys into the aggregated +result JSON consumed by InferenceX-app's ETL: + + - avg_power_w: mean per-GPU power draw (W) during the load window + - joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens + +The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric +field in the agg JSON into the `metrics` JSONB column, so no schema migration +is required. + +Vendor schema detection is regex-based: any timestamp-like column + any column +whose name contains "power" (excluding "limit"/"cap"/"max") is picked up. +NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version. Both are +handled. + +This script is best-effort. Missing or malformed CSV exits 0 without patching +so a monitoring hiccup never breaks the benchmark upload. +""" + +from __future__ import annotations + +import argparse +import csv +import json +import re +import sys +from datetime import datetime, timezone +from pathlib import Path +from statistics import mean + + +_POWER_COL_RE = re.compile(r"power", re.IGNORECASE) +_POWER_EXCLUDE_RE = re.compile(r"limit|cap|max|min", re.IGNORECASE) +_TIMESTAMP_COL_RE = re.compile(r"time", re.IGNORECASE) +_GPU_INDEX_COL_RE = re.compile(r"^(index|gpu|gpu_id|gpu_index|card|device)$", re.IGNORECASE) +_NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?") + + +def _parse_timestamp(value: str) -> float | None: + """Best-effort timestamp parse to Unix epoch seconds (local wall clock). + + Handles the formats observed in practice: + - nvidia-smi: "2025/01/15 12:34:56.789" (local time, no TZ) + - amd-smi: ISO 8601 "2025-01-15T12:34:56.789" or epoch seconds + - Plain numeric epoch (int or float, s or ms) + """ + value = value.strip() + if not value: + return None + # Plain epoch number — accept both seconds and milliseconds. + if _NUMBER_RE.fullmatch(value): + n = float(value) + return n / 1000.0 if n > 1e12 else n + # nvidia-smi: "YYYY/MM/DD HH:MM:SS.ffffff" + for fmt in ("%Y/%m/%d %H:%M:%S.%f", "%Y/%m/%d %H:%M:%S"): + try: + return datetime.strptime(value, fmt).timestamp() + except ValueError: + pass + # ISO 8601 (amd-smi variants). fromisoformat tolerates 'T' or space separator + # in Python 3.11+; older versions need 'T'. + iso_value = value.replace(" ", "T", 1) if " " in value and "T" not in value else value + try: + dt = datetime.fromisoformat(iso_value) + except ValueError: + return None + if dt.tzinfo is None: + # Treat naive timestamps as local time (matches nvidia-smi convention). + return dt.timestamp() + return dt.astimezone(timezone.utc).timestamp() + + +def _parse_power(value: str) -> float | None: + """Extract the first numeric value from a power cell. + + nvidia-smi formats power as "412.34 W"; some configurations report + "[N/A]" when power capping is disabled. AMD reports a bare number. + """ + value = value.strip() + if not value or value.lower() in {"[n/a]", "n/a", "na"}: + return None + m = _NUMBER_RE.search(value) + if not m: + return None + try: + return float(m.group(0)) + except ValueError: + return None + + +def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | None]: + """Return (timestamp_col, power_col, gpu_index_col) from a CSV header. + + Power column: contains "power" and not "limit"/"cap"/"max"/"min". + Timestamp column: contains "time". + GPU index column: optional — used to count distinct GPUs per sample. + """ + timestamp_col = next((c for c in header if _TIMESTAMP_COL_RE.search(c)), None) + power_col = next( + (c for c in header if _POWER_COL_RE.search(c) and not _POWER_EXCLUDE_RE.search(c)), + None, + ) + gpu_col = next((c for c in header if _GPU_INDEX_COL_RE.match(c.strip())), None) + return timestamp_col, power_col, gpu_col + + +def aggregate_power( + csv_path: Path, + start_unix: float, + end_unix: float, +) -> tuple[float, int] | None: + """Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end]. + + Returns None if the CSV is missing, empty, has no detectable power column, + or no rows fall in the window. + """ + if not csv_path.is_file() or csv_path.stat().st_size == 0: + return None + if end_unix <= start_unix: + return None + + try: + with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f: + reader = csv.DictReader(f, skipinitialspace=True) + header = [c.strip() for c in (reader.fieldnames or [])] + reader.fieldnames = header + timestamp_col, power_col, gpu_col = _detect_columns(header) + if not timestamp_col or not power_col: + return None + + # Group power readings by sample timestamp so per-sample total power + # (sum across GPUs) is computed correctly even if rows are interleaved. + per_sample_total: dict[float, float] = {} + per_sample_gpus: dict[float, set[str]] = {} + gpu_keys: set[str] = set() + + for row in reader: + ts_raw = (row.get(timestamp_col) or "").strip() + pw_raw = (row.get(power_col) or "").strip() + ts = _parse_timestamp(ts_raw) + pw = _parse_power(pw_raw) + if ts is None or pw is None: + continue + if ts < start_unix or ts > end_unix: + continue + # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift). + bucket = round(ts, 3) + per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw + gpu_id = (row.get(gpu_col) or "0").strip() if gpu_col else "0" + per_sample_gpus.setdefault(bucket, set()).add(gpu_id) + gpu_keys.add(gpu_id) + except (OSError, csv.Error): + return None + + if not per_sample_total: + return None + + # Number of distinct GPUs seen across the window. + num_gpus = max(len(gpu_keys), 1) + # Per-sample mean power per GPU = sum across GPUs at that timestamp / GPUs seen at that timestamp. + per_sample_mean_per_gpu = [ + total / max(len(per_sample_gpus[ts]), 1) for ts, total in per_sample_total.items() + ] + return mean(per_sample_mean_per_gpu), num_gpus + + +def _load_bench_window(bench_result_path: Path) -> tuple[float, float, float, int] | None: + """Read (start_unix, end_unix, duration_s, total_output_tokens) from the raw bench JSON.""" + try: + bench = json.loads(bench_result_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + start = bench.get("benchmark_start_time_unix") + end = bench.get("benchmark_end_time_unix") + duration = bench.get("duration") + total_output = bench.get("total_output_tokens") + if not all(isinstance(v, (int, float)) for v in (start, end, duration)): + return None + if not isinstance(total_output, int) or total_output <= 0: + return None + return float(start), float(end), float(duration), int(total_output) + + +def patch_agg_result( + agg_path: Path, + avg_power_w: float, + joules_per_output_token: float, +) -> None: + """Read the agg JSON, add the two power keys, and write it back atomically.""" + data = json.loads(agg_path.read_text(encoding="utf-8")) + data["avg_power_w"] = round(avg_power_w, 3) + data["joules_per_output_token"] = round(joules_per_output_token, 6) + tmp_path = agg_path.with_suffix(agg_path.suffix + ".tmp") + tmp_path.write_text(json.dumps(data, indent=2), encoding="utf-8") + tmp_path.replace(agg_path) + + +def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int: + window = _load_bench_window(bench_result) + if window is None: + print( + f"[aggregate_power] No bench window in {bench_result} — skipping power aggregation", + file=sys.stderr, + ) + return 0 + start, end, duration, total_output = window + + result = aggregate_power(csv_path, start, end) + if result is None: + print( + f"[aggregate_power] No usable power samples in {csv_path} for " + f"window [{start}, {end}] — skipping", + file=sys.stderr, + ) + return 0 + avg_power_w, num_gpus = result + + # Joules consumed by the system during the bench window / output tokens. + joules_per_output_token = (avg_power_w * num_gpus * duration) / total_output + + if not agg_result.is_file(): + print( + f"[aggregate_power] Agg result {agg_result} missing — cannot patch", + file=sys.stderr, + ) + return 0 + + try: + patch_agg_result(agg_result, avg_power_w, joules_per_output_token) + except (OSError, json.JSONDecodeError) as exc: + print(f"[aggregate_power] Failed to patch {agg_result}: {exc}", file=sys.stderr) + return 0 + + print( + f"[aggregate_power] avg_power_w={avg_power_w:.2f} (per GPU, n={num_gpus}) " + f"joules_per_output_token={joules_per_output_token:.4f} " + f"duration={duration:.1f}s output_tokens={total_output} -> {agg_result}" + ) + return 0 + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__.splitlines()[0]) + parser.add_argument( + "--csv", + type=Path, + default=Path("/workspace/gpu_metrics.csv"), + help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)", + ) + parser.add_argument( + "--bench-result", + type=Path, + required=True, + help="Path to the raw benchmark_serving.py result JSON (provides bench window + token counts)", + ) + parser.add_argument( + "--agg-result", + type=Path, + required=True, + help="Path to the agg_.json output of process_result.py (will be patched in place)", + ) + args = parser.parse_args() + return run(args.csv, args.bench_result, args.agg_result) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/utils/bench_serving/benchmark_serving.py b/utils/bench_serving/benchmark_serving.py index 011b413ac..c88a7308f 100644 --- a/utils/bench_serving/benchmark_serving.py +++ b/utils/bench_serving/benchmark_serving.py @@ -572,6 +572,7 @@ async def limited_request_func(request_func_input, pbar): print("Starting main benchmark run...") benchmark_start_time = time.perf_counter() + benchmark_start_time_unix = time.time() tasks: List[asyncio.Task] = [] async for request in get_request(input_requests, request_rate, burstiness): prompt, prompt_len, output_len, mm_content = request @@ -615,6 +616,7 @@ async def limited_request_func(request_func_input, pbar): pbar.close() benchmark_duration = time.perf_counter() - benchmark_start_time + benchmark_end_time_unix = time.time() metrics, actual_output_lens = calculate_metrics( input_requests=input_requests, @@ -645,6 +647,8 @@ async def limited_request_func(request_func_input, pbar): result = { "duration": benchmark_duration, + "benchmark_start_time_unix": benchmark_start_time_unix, + "benchmark_end_time_unix": benchmark_end_time_unix, "completed": metrics.completed, "total_input_tokens": metrics.total_input, "total_output_tokens": metrics.total_output, diff --git a/utils/process_result.py b/utils/process_result.py index 4603287bc..5fb059473 100644 --- a/utils/process_result.py +++ b/utils/process_result.py @@ -133,5 +133,28 @@ def get_required_env_vars(required_vars): print(json.dumps(data, indent=2)) -with open(f'agg_{result_filename}.json', 'w') as f: +agg_path = Path(f'agg_{result_filename}.json') +with open(agg_path, 'w') as f: json.dump(data, f, indent=2) + +# Best-effort: patch measured power into the agg JSON. Never fails the run. +try: + from aggregate_power import run as _aggregate_power_run + + _csv_candidates = [ + os.environ.get('GPU_METRICS_CSV'), + 'gpu_metrics.csv', + '/workspace/gpu_metrics.csv', + ] + _csv_path = next( + (Path(p) for p in _csv_candidates if p and Path(p).is_file()), + None, + ) + if _csv_path is not None: + _aggregate_power_run( + csv_path=_csv_path, + bench_result=Path(f'{result_filename}.json'), + agg_result=agg_path, + ) +except Exception as exc: # noqa: BLE001 — never block on telemetry + print(f'[process_result] power aggregation skipped: {exc}', file=sys.stderr) diff --git a/utils/test_aggregate_power.py b/utils/test_aggregate_power.py new file mode 100644 index 000000000..d03ed51a3 --- /dev/null +++ b/utils/test_aggregate_power.py @@ -0,0 +1,351 @@ +"""Tests for aggregate_power.py. + +Covers: + - NVIDIA CSV (nvidia-smi --query-gpu format with "X W" power cells) + - AMD CSV (amd-smi --csv with ISO/epoch timestamps and bare numeric power) + - Window filtering (samples outside [start, end] are excluded) + - Multi-GPU per-sample aggregation (sum across GPUs at each timestamp, + then mean over samples — yields per-GPU mean) + - Missing / empty / malformed CSV: returns None, no exception + - End-to-end run(): patches agg JSON with avg_power_w + joules_per_output_token + - Missing bench window keys: skips gracefully without patching +""" +from __future__ import annotations + +import json +import sys +from datetime import datetime +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent)) + +from aggregate_power import ( # noqa: E402 + _detect_columns, + _parse_power, + _parse_timestamp, + aggregate_power, + patch_agg_result, + run, +) + + +def _nvidia_ts(epoch: float) -> str: + return datetime.fromtimestamp(epoch).strftime("%Y/%m/%d %H:%M:%S.%f") + + +def _write_nvidia_csv(path: Path, samples: list[tuple[float, int, float]]) -> None: + """samples: list of (epoch_seconds, gpu_index, power_watts).""" + lines = ["timestamp, index, power.draw [W], temperature.gpu"] + for ts, idx, pw in samples: + lines.append(f"{_nvidia_ts(ts)}, {idx}, {pw:.2f} W, 65") + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def _write_amd_csv(path: Path, samples: list[tuple[float, int, float]]) -> None: + """AMD-style: ISO timestamp, bare numeric power.""" + lines = ["timestamp,gpu,socket_power,temperature"] + for ts, idx, pw in samples: + iso = datetime.fromtimestamp(ts).isoformat(timespec="milliseconds") + lines.append(f"{iso},{idx},{pw},65") + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +# --------------------------------------------------------------------------- # +# Column / cell parsers +# --------------------------------------------------------------------------- # + + +def test_detect_columns_nvidia(): + header = ["timestamp", "index", "power.draw [W]", "utilization.gpu"] + ts, pw, gpu = _detect_columns(header) + assert ts == "timestamp" + assert pw == "power.draw [W]" + assert gpu == "index" + + +def test_detect_columns_amd(): + header = ["timestamp", "gpu", "socket_power", "temperature"] + ts, pw, gpu = _detect_columns(header) + assert ts == "timestamp" + assert pw == "socket_power" + assert gpu == "gpu" + + +def test_detect_columns_excludes_power_limit(): + # power.limit must NOT be picked as the power column. + header = ["timestamp", "index", "power.limit [W]", "power.draw [W]"] + _, pw, _ = _detect_columns(header) + assert pw == "power.draw [W]" + + +def test_detect_columns_missing_power_returns_none(): + header = ["timestamp", "index", "temperature.gpu"] + _, pw, _ = _detect_columns(header) + assert pw is None + + +def test_parse_power_nvidia_with_units(): + assert _parse_power("412.34 W") == pytest.approx(412.34) + + +def test_parse_power_bare_number(): + assert _parse_power("412.34") == pytest.approx(412.34) + + +def test_parse_power_handles_na(): + assert _parse_power("[N/A]") is None + assert _parse_power("") is None + + +def test_parse_timestamp_nvidia_format(): + ts = _parse_timestamp("2025/01/15 12:34:56.789") + expected = datetime(2025, 1, 15, 12, 34, 56, 789_000).timestamp() + assert ts == pytest.approx(expected, abs=0.01) + + +def test_parse_timestamp_iso_format(): + ts = _parse_timestamp("2025-01-15T12:34:56.789") + expected = datetime(2025, 1, 15, 12, 34, 56, 789_000).timestamp() + assert ts == pytest.approx(expected, abs=0.01) + + +def test_parse_timestamp_epoch_seconds(): + assert _parse_timestamp("1736942096.789") == pytest.approx(1736942096.789) + + +def test_parse_timestamp_epoch_milliseconds(): + # Heuristic: values > 1e12 are treated as ms. + assert _parse_timestamp("1736942096789") == pytest.approx(1736942096.789) + + +def test_parse_timestamp_garbage_returns_none(): + assert _parse_timestamp("not-a-date") is None + assert _parse_timestamp("") is None + + +# --------------------------------------------------------------------------- # +# aggregate_power core +# --------------------------------------------------------------------------- # + + +def test_aggregate_power_nvidia_single_gpu(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + _write_nvidia_csv( + csv, + [ + (base + 1, 0, 400.0), + (base + 2, 0, 410.0), + (base + 3, 0, 420.0), + ], + ) + result = aggregate_power(csv, base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(410.0) + assert num_gpus == 1 + + +def test_aggregate_power_nvidia_multi_gpu_sums_per_sample(tmp_path: Path): + """8 GPUs, each drawing 500W at each sample → per-GPU mean is 500W.""" + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + samples: list[tuple[float, int, float]] = [] + for sample_idx in range(3): + for gpu in range(8): + samples.append((base + sample_idx, gpu, 500.0)) + _write_nvidia_csv(csv, samples) + result = aggregate_power(csv, base, base + 10) + assert result is not None + avg_power, num_gpus = result + assert avg_power == pytest.approx(500.0) + assert num_gpus == 8 + + +def test_aggregate_power_window_filters_out_warmup_and_eval(tmp_path: Path): + """Samples before start and after end must be ignored.""" + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + _write_nvidia_csv( + csv, + [ + (base, 0, 100.0), # warmup — excluded + (base + 50, 0, 500.0), # bench window + (base + 60, 0, 500.0), # bench window + (base + 100, 0, 100.0), # eval phase — excluded + ], + ) + result = aggregate_power(csv, base + 45, base + 65) + assert result is not None + avg_power, _ = result + assert avg_power == pytest.approx(500.0) + + +def test_aggregate_power_amd_csv(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + _write_amd_csv( + csv, + [ + (base + 1, 0, 350.0), + (base + 1, 1, 355.0), + (base + 2, 0, 360.0), + (base + 2, 1, 365.0), + ], + ) + result = aggregate_power(csv, base, base + 10) + assert result is not None + avg_power, num_gpus = result + # per-sample mean per GPU: (350+355)/2=352.5, (360+365)/2=362.5 → mean=357.5 + assert avg_power == pytest.approx(357.5) + assert num_gpus == 2 + + +def test_aggregate_power_missing_csv_returns_none(tmp_path: Path): + csv = tmp_path / "absent.csv" + assert aggregate_power(csv, 0.0, 100.0) is None + + +def test_aggregate_power_empty_csv_returns_none(tmp_path: Path): + csv = tmp_path / "empty.csv" + csv.write_text("", encoding="utf-8") + assert aggregate_power(csv, 0.0, 100.0) is None + + +def test_aggregate_power_no_rows_in_window_returns_none(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + # Window entirely before the only sample. + assert aggregate_power(csv, 1_500_000_000.0, 1_600_000_000.0) is None + + +def test_aggregate_power_skips_malformed_rows(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + content = ( + "timestamp, index, power.draw [W]\n" + f"{_nvidia_ts(base + 1)}, 0, 400 W\n" + f"garbage, 0, also-garbage\n" + f"{_nvidia_ts(base + 2)}, 0, [N/A]\n" + f"{_nvidia_ts(base + 3)}, 0, 420 W\n" + ) + csv.write_text(content, encoding="utf-8") + result = aggregate_power(csv, base, base + 10) + assert result is not None + avg_power, _ = result + # Only the two valid rows (400, 420) contribute. + assert avg_power == pytest.approx(410.0) + + +def test_aggregate_power_invalid_window_returns_none(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + assert aggregate_power(csv, 100.0, 100.0) is None + assert aggregate_power(csv, 200.0, 100.0) is None + + +# --------------------------------------------------------------------------- # +# End-to-end run() — patching the agg JSON +# --------------------------------------------------------------------------- # + + +def _write_bench_result(path: Path, *, start: float, end: float, duration: float, total_output: int) -> None: + path.write_text( + json.dumps( + { + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": duration, + "total_output_tokens": total_output, + } + ), + encoding="utf-8", + ) + + +def test_run_patches_agg_with_power_and_joules(tmp_path: Path): + base = 1_700_000_000.0 + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv( + csv, + [ + (base + 1 + sample_idx, gpu, 500.0) + for sample_idx in range(2) + for gpu in range(8) + ], + ) + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=base, end=base + 10, duration=10.0, total_output=20_000) + agg.write_text(json.dumps({"hw": "h200", "conc": 64}), encoding="utf-8") + + exit_code = run(csv, bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + # Pre-existing fields preserved. + assert patched["hw"] == "h200" + assert patched["conc"] == 64 + # Power: 500W per GPU. + assert patched["avg_power_w"] == pytest.approx(500.0) + # J/output_token = 500W × 8 GPUs × 10s / 20_000 tokens = 2.0 + assert patched["joules_per_output_token"] == pytest.approx(2.0) + + +def test_run_skips_when_bench_window_missing(tmp_path: Path): + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + bench = tmp_path / "bench.json" + bench.write_text(json.dumps({"duration": 10.0, "total_output_tokens": 1000}), encoding="utf-8") + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + + exit_code = run(csv, bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert "avg_power_w" not in patched + assert patched == {"hw": "h200"} + + +def test_run_skips_when_csv_missing(tmp_path: Path): + bench = tmp_path / "bench.json" + agg = tmp_path / "agg.json" + _write_bench_result(bench, start=0.0, end=10.0, duration=10.0, total_output=1000) + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + + exit_code = run(tmp_path / "absent.csv", bench, agg) + assert exit_code == 0 + + patched = json.loads(agg.read_text()) + assert "avg_power_w" not in patched + + +def test_run_skips_when_total_output_tokens_zero(tmp_path: Path): + """Guards against division by zero on failed runs.""" + csv = tmp_path / "gpu_metrics.csv" + _write_nvidia_csv(csv, [(1_700_000_000.0, 0, 400.0)]) + bench = tmp_path / "bench.json" + _write_bench_result( + bench, start=1_700_000_000.0, end=1_700_000_010.0, duration=10.0, total_output=0 + ) + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + + exit_code = run(csv, bench, agg) + assert exit_code == 0 + patched = json.loads(agg.read_text()) + assert "joules_per_output_token" not in patched + + +def test_patch_agg_result_is_atomic_via_tempfile(tmp_path: Path): + agg = tmp_path / "agg.json" + agg.write_text(json.dumps({"hw": "h200"}), encoding="utf-8") + patch_agg_result(agg, avg_power_w=400.0, joules_per_output_token=1.5) + data = json.loads(agg.read_text()) + assert data["avg_power_w"] == 400.0 + assert data["joules_per_output_token"] == 1.5 + # No .tmp leftover. + assert not (tmp_path / "agg.json.tmp").exists() From ef081cbf956252ced1b3a8146e35f07ebabfcdab Mon Sep 17 00:00:00 2001 From: Aryan Date: Thu, 21 May 2026 16:44:01 -0700 Subject: [PATCH 2/5] test(power): subprocess integration covering process_result + aggregator MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three new tests in TestPowerAggregationIntegration: - test_agg_json_gets_patched_with_power_and_joules: full pipeline. Stages a 1Hz nvidia-smi CSV with warmup/bench/eval phases, runs process_result.py as a subprocess with GPU_METRICS_CSV set, and verifies the agg JSON gets patched with avg_power_w (600W) and joules_per_output_token (9.6 J/tok = 600W * 8 GPUs * 60s / 30k tok). Warmup (100W) and eval (200W) samples must be excluded by the timestamp window — would otherwise bias the result downward. - test_missing_csv_does_not_break_process_result: production case for runs that ship without monitoring. process_result.py succeeds and writes the agg JSON sans power fields. - test_missing_bench_timestamps_does_not_patch: legacy bench JSON without benchmark_start_time_unix gracefully skips aggregation. Co-Authored-By: Claude Opus 4.7 --- utils/test_process_result.py | 116 +++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/utils/test_process_result.py b/utils/test_process_result.py index e3903c6e6..4037689ea 100644 --- a/utils/test_process_result.py +++ b/utils/test_process_result.py @@ -533,3 +533,119 @@ def test_conc_from_benchmark_result(self, tmp_path, single_node_env_vars): output_data = json.loads(result.stdout) assert output_data["conc"] == 128 + + +# ============================================================================= +# Integration: power aggregation patches the agg JSON +# ============================================================================= + +class TestPowerAggregationIntegration: + """End-to-end wiring: process_result.py invokes aggregate_power.py and + patches avg_power_w + joules_per_output_token into the agg JSON. + + Exercises the env-var path resolution (GPU_METRICS_CSV), the subprocess + boundary, and the best-effort try/except that wraps the aggregator call. + """ + + @staticmethod + def _write_nvidia_csv(path, start_unix, end_unix, watts_per_gpu=500.0, num_gpus=8): + """Stage a 1Hz nvidia-smi-style CSV bracketing the bench window with + warmup/eval phases that should be filtered out by the aggregator.""" + from datetime import datetime + + def ts(t): + return datetime.fromtimestamp(t).strftime("%Y/%m/%d %H:%M:%S.%f") + + lines = ["timestamp, index, power.draw [W], temperature.gpu"] + # 5s warmup at 100W (before start) — must be excluded. + for s in range(5): + for g in range(num_gpus): + lines.append(f"{ts(start_unix - 5 + s)}, {g}, 100.00 W, 50") + # Bench window samples at the requested wattage. + duration_s = int(end_unix - start_unix) + for s in range(duration_s + 1): + for g in range(num_gpus): + lines.append(f"{ts(start_unix + s)}, {g}, {watts_per_gpu:.2f} W, 75") + # 5s eval at 200W (after end) — must be excluded. + for s in range(5): + for g in range(num_gpus): + lines.append(f"{ts(end_unix + 1 + s)}, {g}, 200.00 W, 65") + path.write_text("\n".join(lines) + "\n") + + def test_agg_json_gets_patched_with_power_and_joules(self, tmp_path, single_node_env_vars): + """The full pipeline: process_result.py + aggregate_power.py.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 # 60s bench window + csv_path = tmp_path / "gpu_metrics.csv" + self._write_nvidia_csv(csv_path, start, end, watts_per_gpu=600.0, num_gpus=8) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + # Fields read by aggregate_power.py. + "benchmark_start_time_unix": start, + "benchmark_end_time_unix": end, + "duration": 60.0, + "total_output_tokens": 30_000, + } + env = {**single_node_env_vars, "GPU_METRICS_CSV": str(csv_path)} + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + assert agg_path.is_file() + patched = json.loads(agg_path.read_text()) + + # Pre-existing fields still present. + assert patched["hw"] == "mi300x" + assert patched["tp"] == 8 + assert patched["conc"] == 64 + # New power fields. + assert patched["avg_power_w"] == pytest.approx(600.0, abs=0.5) + # 600W × 8 GPUs × 60s / 30_000 tokens = 9.6 J/tok + assert patched["joules_per_output_token"] == pytest.approx(9.6, abs=0.05) + + def test_missing_csv_does_not_break_process_result(self, tmp_path, single_node_env_vars): + """Without GPU_METRICS_CSV (or with a missing file), process_result.py + still succeeds and writes the agg JSON — just without the power fields. + This is the production case for runs that ship without monitoring.""" + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + } + + result = run_script(tmp_path, single_node_env_vars, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + assert "avg_power_w" not in patched + assert "joules_per_output_token" not in patched + + def test_missing_bench_timestamps_does_not_patch(self, tmp_path, single_node_env_vars): + """A CSV is present but the bench JSON predates the timestamp fields + (legacy benchmark_serving.py). Aggregator should skip silently.""" + start, end = 1_700_000_100.0, 1_700_000_160.0 + csv_path = tmp_path / "gpu_metrics.csv" + self._write_nvidia_csv(csv_path, start, end, watts_per_gpu=600.0, num_gpus=1) + + benchmark_result = { + "model_id": "test-model", + "max_concurrency": 64, + "total_token_throughput": 1000.0, + "output_throughput": 500.0, + # NOTE: deliberately missing benchmark_start_time_unix/end/total_output_tokens. + } + env = {**single_node_env_vars, "GPU_METRICS_CSV": str(csv_path)} + + result = run_script(tmp_path, env, benchmark_result) + assert result.returncode == 0, f"Script failed: {result.stderr}" + + agg_path = tmp_path / "agg_benchmark_result.json" + patched = json.loads(agg_path.read_text()) + assert "avg_power_w" not in patched + assert "joules_per_output_token" not in patched From 7c29bdeff28fbddefe99cbe2153298fce9b55713 Mon Sep 17 00:00:00 2001 From: Aryan Date: Fri, 22 May 2026 13:33:14 -0700 Subject: [PATCH 3/5] chore(perf-changelog): trigger sweep for measured-power aggregation Appends an entry listing qwen3.5-fp8-h200-sglang so run-sweep.yml fires when the sweep-enabled label is added to PR #1551. The sweep will produce the first agg_.json containing avg_power_w and joules_per_output_token, validating the aggregator end-to-end on real GPU hardware. Cheap single-node H200 config picked to minimize runner-pool contention. --- perf-changelog.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/perf-changelog.yaml b/perf-changelog.yaml index 208a2da6f..375571875 100644 --- a/perf-changelog.yaml +++ b/perf-changelog.yaml @@ -3129,3 +3129,9 @@ description: - "Add --use-chat-template to run_benchmark_serving so prompts are formatted with the Qwen chat template (matching the other Qwen MTP recipes)" pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1555 + +- config-keys: + - qwen3.5-fp8-h200-sglang + description: + - "Smoke run validating measured-power aggregation pipeline. No config change; entry exists to trigger a sweep that produces the first agg_.json with avg_power_w + joules_per_output_token populated by aggregate_power.py." + pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558 From b33f2237a98cb8b732456a64f8225df5a886c480 Mon Sep 17 00:00:00 2001 From: Aryan Date: Fri, 22 May 2026 14:38:59 -0700 Subject: [PATCH 4/5] fix(aggregate_power): infer num_gpus from row count when GPU column absent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses review feedback on PR #1558. The original _detect_columns used a strict-anchored regex ^(index|gpu|gpu_id|gpu_index|card|device)$ for the GPU-index column while the power column uses a permissive r"power" match. That asymmetry meant a future SMI schema variant with header device_id, gpu_serial, or "GPU ID" would silently collapse every row to gpu_id="0", yielding system-total power instead of per-GPU mean. In-tree pipelines (nvidia-smi index, amd-smi gpu) match the regex and are unaffected — this is a latent bug, not a current-production one. But avg_power_w is the standalone headline new metric this PR adds; a future 4000W reading on the dashboard is the worst kind of regression (still plausible-looking, off by 8x). Fix: - Maintain per_sample_row_count independently of GPU-column detection. - When gpu_col is present, divisor stays len(per_sample_gpus[ts]) — same as before, behavior unchanged for the existing pipeline. - When gpu_col is absent, divisor is per_sample_row_count[ts] and num_gpus is the modal row count per sample. Both assume one row per GPU per sample, which is what every SMI tool we've encountered emits. New test (test_aggregate_power_no_gpu_column_infers_from_row_count) exercises a CSV header with device_id (regex miss) and asserts avg_power is the per-GPU mean (500W), not the system sum (2000W). Verified: - 27 unit tests pass (was 26) - 25 process_result tests pass (no change) --- utils/aggregate_power.py | 40 +++++++++++++++++++++++++++-------- utils/test_aggregate_power.py | 32 ++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/utils/aggregate_power.py b/utils/aggregate_power.py index 06f491480..bed1db33b 100644 --- a/utils/aggregate_power.py +++ b/utils/aggregate_power.py @@ -134,7 +134,17 @@ def aggregate_power( # Group power readings by sample timestamp so per-sample total power # (sum across GPUs) is computed correctly even if rows are interleaved. + # + # per_sample_row_count is the structural divisor: it's incremented for + # every contributing row regardless of whether a GPU-index column was + # detected. per_sample_gpus / gpu_keys are only populated when gpu_col + # is present and provide the canonical num_gpus via distinct-id count. + # When gpu_col is absent (vendor schema variant whose header doesn't + # match _GPU_INDEX_COL_RE), we fall back to inferring num_gpus from + # the modal row count per timestamp — assuming one row per GPU per + # sample, which is what every SMI tool we've seen actually emits. per_sample_total: dict[float, float] = {} + per_sample_row_count: dict[float, int] = {} per_sample_gpus: dict[float, set[str]] = {} gpu_keys: set[str] = set() @@ -150,21 +160,33 @@ def aggregate_power( # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift). bucket = round(ts, 3) per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw - gpu_id = (row.get(gpu_col) or "0").strip() if gpu_col else "0" - per_sample_gpus.setdefault(bucket, set()).add(gpu_id) - gpu_keys.add(gpu_id) + per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1 + if gpu_col: + gpu_id = (row.get(gpu_col) or "").strip() + if gpu_id: + per_sample_gpus.setdefault(bucket, set()).add(gpu_id) + gpu_keys.add(gpu_id) except (OSError, csv.Error): return None if not per_sample_total: return None - # Number of distinct GPUs seen across the window. - num_gpus = max(len(gpu_keys), 1) - # Per-sample mean power per GPU = sum across GPUs at that timestamp / GPUs seen at that timestamp. - per_sample_mean_per_gpu = [ - total / max(len(per_sample_gpus[ts]), 1) for ts, total in per_sample_total.items() - ] + # Per-sample divisor and overall num_gpus. + # - If a GPU column was detected, trust distinct GPU IDs (correct for any + # sampling pattern, including hot-swap or partial visibility). + # - Otherwise, infer from row count (one row per GPU per sample). + if gpu_col and gpu_keys: + num_gpus = len(gpu_keys) + per_sample_mean_per_gpu = [ + total / max(len(per_sample_gpus.get(ts, ())), 1) + for ts, total in per_sample_total.items() + ] + else: + num_gpus = max(per_sample_row_count.values()) + per_sample_mean_per_gpu = [ + total / per_sample_row_count[ts] for ts, total in per_sample_total.items() + ] return mean(per_sample_mean_per_gpu), num_gpus diff --git a/utils/test_aggregate_power.py b/utils/test_aggregate_power.py index d03ed51a3..31e9fb116 100644 --- a/utils/test_aggregate_power.py +++ b/utils/test_aggregate_power.py @@ -203,6 +203,38 @@ def test_aggregate_power_amd_csv(tmp_path: Path): assert num_gpus == 2 +def test_aggregate_power_no_gpu_column_infers_from_row_count(tmp_path: Path): + """Schema-variant safety: a vendor CSV whose GPU column header doesn't + match _GPU_INDEX_COL_RE (e.g. 'device_id', 'GPU ID', 'slot') must still + yield per-GPU mean — not system-total — for avg_power_w. Pre-fix, + aggregate_power collapsed all rows to gpu_id='0' and returned the SUM.""" + csv = tmp_path / "gpu_metrics.csv" + base = 1_700_000_000.0 + # Schema with a GPU column the regex doesn't recognize ('device_id'). + lines = ["timestamp,device_id,power.draw [W]"] + from datetime import datetime + + def ts(t: float) -> str: + return datetime.fromtimestamp(t).strftime("%Y/%m/%d %H:%M:%S.%f") + + # 4 GPUs at 500W, 3 samples. + for s in range(3): + for gpu in range(4): + lines.append(f"{ts(base + s)},{gpu},500.00 W") + csv.write_text("\n".join(lines) + "\n", encoding="utf-8") + + result = aggregate_power(csv, base, base + 10) + assert result is not None + avg_power, num_gpus = result + # Without the fix: avg_power = 2000 (sum across 4 GPUs), num_gpus = 1. + # With the fix: avg_power = 500 (per-GPU mean), num_gpus = 4. + assert avg_power == pytest.approx(500.0), ( + f"avg_power_w should be per-GPU mean (500.0), got {avg_power} — " + "the no-gpu-column path is summing instead of averaging" + ) + assert num_gpus == 4, f"num_gpus should be inferred from row count (4), got {num_gpus}" + + def test_aggregate_power_missing_csv_returns_none(tmp_path: Path): csv = tmp_path / "absent.csv" assert aggregate_power(csv, 0.0, 100.0) is None From 49aa761da926513e016c7b918d88326c555d15a6 Mon Sep 17 00:00:00 2001 From: Aryan Date: Fri, 22 May 2026 14:51:04 -0700 Subject: [PATCH 5/5] docs(perf-changelog): clarify why measured-power entry is kept past merge --- perf-changelog.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/perf-changelog.yaml b/perf-changelog.yaml index 375571875..385efcbe2 100644 --- a/perf-changelog.yaml +++ b/perf-changelog.yaml @@ -3133,5 +3133,5 @@ - config-keys: - qwen3.5-fp8-h200-sglang description: - - "Smoke run validating measured-power aggregation pipeline. No config change; entry exists to trigger a sweep that produces the first agg_.json with avg_power_w + joules_per_output_token populated by aggregate_power.py." + - "Validates measured-power aggregation pipeline (PR #1558). No config change. Entry intentionally kept past merge so run-sweep produces the first canonical agg JSON with avg_power_w + joules_per_output_token on main, seeding the dashboard's day-zero data." pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558