Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions benchmarks/benchmark_lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ mkdir -p "$PYTHONPYCACHEPREFIX" 2>/dev/null || true

GPU_MONITOR_PID=""
GPU_METRICS_CSV="/workspace/gpu_metrics.csv"
export GPU_METRICS_CSV

# Start background GPU monitoring that logs metrics every second to CSV.
# Auto-detects NVIDIA (nvidia-smi) or AMD (amd-smi) GPUs.
Expand All @@ -32,6 +33,7 @@ start_gpu_monitor() {
done

GPU_METRICS_CSV="$output"
export GPU_METRICS_CSV

if command -v nvidia-smi &>/dev/null; then
nvidia-smi --query-gpu=timestamp,index,power.draw,temperature.gpu,clocks.current.sm,clocks.current.memory,utilization.gpu,utilization.memory \
Expand Down
6 changes: 6 additions & 0 deletions perf-changelog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3129,3 +3129,9 @@
description:
- "Add --use-chat-template to run_benchmark_serving so prompts are formatted with the Qwen chat template (matching the other Qwen MTP recipes)"
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1555

- config-keys:
- qwen3.5-fp8-h200-sglang
description:
- "Validates measured-power aggregation pipeline (PR #1558). No config change. Entry intentionally kept past merge so run-sweep produces the first canonical agg JSON with avg_power_w + joules_per_output_token on main, seeding the dashboard's day-zero data."
pr-link: https://github.com/SemiAnalysisAI/InferenceX/pull/1558
293 changes: 293 additions & 0 deletions utils/aggregate_power.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
"""Aggregate measured GPU power from a vendor SMI CSV into the agg result JSON.

Reads a GPU-metrics CSV produced by `start_gpu_monitor` (nvidia-smi or amd-smi),
filters samples to the benchmark load window using start/end Unix timestamps
written by benchmark_serving.py, and patches two keys into the aggregated
result JSON consumed by InferenceX-app's ETL:

- avg_power_w: mean per-GPU power draw (W) during the load window
- joules_per_output_token: (avg_power_w * num_gpus * duration_s) / total_output_tokens

The ETL (`packages/db/src/etl/benchmark-mapper.ts`) auto-captures any numeric
field in the agg JSON into the `metrics` JSONB column, so no schema migration
is required.

Vendor schema detection is regex-based: the first column whose name contains
"time" is used as the timestamp, and the first column whose name contains
"power" (excluding names containing "limit"/"cap"/"max"/"min") is used as the
power reading. NVIDIA emits "power.draw [W]"; AMD's amd-smi varies by version.
Both are handled.

This script is best-effort. Missing or malformed CSV exits 0 without patching
so a monitoring hiccup never breaks the benchmark upload.
"""

from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean


# Header-name patterns used by _detect_columns to locate columns in the CSV.
# All header patterns are case-insensitive.
#
# Power column candidate: any header containing "power" ...
_POWER_COL_RE = re.compile(r"power", re.IGNORECASE)
# ... unless the header also contains one of these substrings, so that
# limit/cap/max/min style columns are not picked as the power reading.
_POWER_EXCLUDE_RE = re.compile(r"limit|cap|max|min", re.IGNORECASE)
# Timestamp column: any header containing "time".
_TIMESTAMP_COL_RE = re.compile(r"time", re.IGNORECASE)
# GPU index column: the pattern is anchored at both ends, so the header must be
# exactly one of these names (e.g. "device_id" does NOT match).
_GPU_INDEX_COL_RE = re.compile(r"^(index|gpu|gpu_id|gpu_index|card|device)$", re.IGNORECASE)
# A plain decimal number with optional leading minus and optional fraction,
# e.g. "412" or "-3.5". No exponent form and no bare ".5".
_NUMBER_RE = re.compile(r"-?\d+(?:\.\d+)?")


def _parse_timestamp(value: str) -> float | None:
    """Best-effort parse of a CSV timestamp cell to Unix epoch seconds.

    Formats handled:
      - Plain numeric epoch (int or float), in seconds or milliseconds.
      - nvidia-smi style "2025/01/15 12:34:56.789" (no TZ; taken as local time).
      - ISO 8601 such as "2025-01-15T12:34:56.789", with a "T" or a space
        separator and an optional UTC offset or trailing "Z".

    Returns None for an empty cell, an unrecognised format, or a date that
    the platform cannot convert to an epoch value. Never raises for bad input.
    """
    value = value.strip()
    if not value:
        return None
    # Plain epoch number. Anything above 1e12 is treated as milliseconds.
    if _NUMBER_RE.fullmatch(value):
        n = float(value)
        return n / 1000.0 if n > 1e12 else n
    # nvidia-smi: "YYYY/MM/DD HH:MM:SS[.ffffff]". Naive values are interpreted
    # as local time by datetime.timestamp().
    for fmt in ("%Y/%m/%d %H:%M:%S.%f", "%Y/%m/%d %H:%M:%S"):
        try:
            return datetime.strptime(value, fmt).timestamp()
        except (ValueError, OverflowError, OSError):
            # Wrong format, or a date outside the platform's epoch range.
            pass
    # ISO 8601. Normalise a space separator to "T".
    iso_value = value.replace(" ", "T", 1) if " " in value and "T" not in value else value
    # datetime.fromisoformat only understands a trailing "Z" from Python 3.11
    # onwards; spell it as an explicit UTC offset so older interpreters parse
    # it too instead of silently dropping every row.
    if iso_value.endswith(("Z", "z")):
        iso_value = iso_value[:-1] + "+00:00"
    try:
        dt = datetime.fromisoformat(iso_value)
    except ValueError:
        return None
    try:
        # Aware datetimes convert exactly; naive ones are treated as local
        # time, matching the nvidia-smi branch above.
        return dt.timestamp()
    except (ValueError, OverflowError, OSError):
        return None


def _parse_power(value: str) -> float | None:
    """Pull the first number out of a power cell, or return None.

    Cells such as "412.34 W" yield 412.34; a bare number is returned as-is.
    Empty cells, the placeholders "[N/A]" / "N/A" / "NA" (any case), and cells
    containing no digits yield None.
    """
    cell = value.strip()
    if cell == "" or cell.lower() in ("[n/a]", "n/a", "na"):
        return None
    found = _NUMBER_RE.search(cell)
    if found is None:
        return None
    try:
        reading = float(found.group(0))
    except ValueError:
        return None
    return reading


def _detect_columns(header: list[str]) -> tuple[str | None, str | None, str | None]:
    """Choose the timestamp, power and GPU-index columns from a CSV header.

    For each role the first matching header (in header order) wins:
      - timestamp: name contains "time"
      - power: name contains "power" and none of "limit"/"cap"/"max"/"min"
      - GPU index: name (stripped) is exactly one of the accepted index names

    Returns (timestamp_col, power_col, gpu_index_col); any element is None
    when no header matched that role.
    """
    timestamp_col: str | None = None
    power_col: str | None = None
    gpu_col: str | None = None
    for name in header:
        if timestamp_col is None and _TIMESTAMP_COL_RE.search(name):
            timestamp_col = name
        if (
            power_col is None
            and _POWER_COL_RE.search(name)
            and not _POWER_EXCLUDE_RE.search(name)
        ):
            power_col = name
        if gpu_col is None and _GPU_INDEX_COL_RE.match(name.strip()):
            gpu_col = name
    return timestamp_col, power_col, gpu_col


def aggregate_power(
    csv_path: Path,
    start_unix: float,
    end_unix: float,
) -> tuple[float, int] | None:
    """Return (per_gpu_avg_power_w, num_gpus) for samples in [start, end].

    Rows are grouped by timestamp (rounded to the millisecond). For each group
    the per-GPU mean is the summed power divided by the number of rows that
    contributed to the sum; the result is the mean of those per-group means.

    num_gpus is the number of distinct GPU ids seen in the window when a GPU
    index column is present and populated, otherwise the largest number of
    rows sharing one timestamp (assuming one row per GPU per sample).

    Returns None if the window is empty or inverted, the CSV is missing,
    empty or unreadable, no timestamp/power column can be detected, or no
    parseable row falls inside the window.
    """
    if end_unix <= start_unix:
        return None

    # Per-bucket accumulators. The divisor is always the row count, so it
    # stays consistent with the total even when a row has a blank GPU id or
    # a GPU id repeats within the same bucket.
    per_sample_total: dict[float, float] = {}
    per_sample_row_count: dict[float, int] = {}
    gpu_keys: set[str] = set()

    try:
        # Probe inside the try: is_file()/stat() can raise OSError (e.g. a
        # permission problem), and this function must not raise.
        if not csv_path.is_file() or csv_path.stat().st_size == 0:
            return None
        with csv_path.open("r", newline="", encoding="utf-8", errors="replace") as f:
            reader = csv.DictReader(f, skipinitialspace=True)
            # Strip header names so detection and row lookups use clean keys.
            header = [c.strip() for c in (reader.fieldnames or [])]
            reader.fieldnames = header
            timestamp_col, power_col, gpu_col = _detect_columns(header)
            if not timestamp_col or not power_col:
                return None

            for row in reader:
                # Short rows give None for missing cells; treat as empty.
                ts = _parse_timestamp((row.get(timestamp_col) or "").strip())
                pw = _parse_power((row.get(power_col) or "").strip())
                if ts is None or pw is None:
                    continue
                if ts < start_unix or ts > end_unix:
                    continue
                # Bucket by sample timestamp (rounded to ms to absorb sub-ms drift).
                bucket = round(ts, 3)
                per_sample_total[bucket] = per_sample_total.get(bucket, 0.0) + pw
                per_sample_row_count[bucket] = per_sample_row_count.get(bucket, 0) + 1
                if gpu_col:
                    gpu_id = (row.get(gpu_col) or "").strip()
                    if gpu_id:
                        gpu_keys.add(gpu_id)
    except (OSError, csv.Error):
        return None

    if not per_sample_total:
        return None

    # Distinct GPU ids are the preferred device count; without them, infer the
    # count from the fullest bucket.
    num_gpus = len(gpu_keys) if gpu_keys else max(per_sample_row_count.values())
    per_sample_mean_per_gpu = [
        total / per_sample_row_count[bucket]
        for bucket, total in per_sample_total.items()
    ]
    return mean(per_sample_mean_per_gpu), num_gpus
Comment on lines +151 to +190
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Latent robustness bug in aggregate_power (utils/aggregate_power.py:141-168): when _detect_columns can't find a GPU index column (strict regex ^(index|gpu|gpu_id|gpu_index|card|device)$), every row collapses to gpu_id='0', causing avg_power_w to report system-total power (N × W) instead of per-GPU mean. Today's nvidia-smi (index) and amd-smi (gpu) headers match the regex so the in-tree pipeline is safe, but any future amd-smi schema variant (e.g. device_id, GPU ID) would silently break the headline new metric by a factor of N. Suggested fix: when gpu_col is None, infer num_gpus from the distinct row count per timestamp.

Extended reasoning...

What the bug is

In utils/aggregate_power.py, _detect_columns matches a GPU-index column using a strict regex anchored at both ends: r"^(index|gpu|gpu_id|gpu_index|card|device)$". When no header matches (gpu_col is None), the row-processing loop at line 153 falls back to a literal '0':

gpu_id = (row.get(gpu_col) or "0").strip() if gpu_col else "0"

Every row at every timestamp now gets bucketed under the single key '0'. Three downstream values are affected:

  1. per_sample_total[bucket] accumulates power across all N GPUs at that timestamp (e.g. 8 × 500 W = 4000)
  2. per_sample_gpus[bucket] = {'0'} — a single-element set, since every row contributes the same '0'
  3. gpu_keys = {'0'} → num_gpus = max(1, 1) = 1

Then at line 166:

per_sample_mean_per_gpu = [total / max(len(per_sample_gpus[ts]), 1) for ts, total in per_sample_total.items()]

becomes total / 1 — the SUM, not the per-GPU mean. aggregate_power returns (N*W, 1) where it should return (W, N).

Step-by-step proof

CSV with 8 GPUs at 500 W, headers timestamp,device_id,power.draw [W] (device_id does NOT match ^(index|gpu|gpu_id|gpu_index|card|device)$ — note the trailing _id):

step value
_detect_columns returns ('timestamp', 'power.draw [W]', None)
Row 1 (ts=T, gpu 0, 500W) → gpu_id '0'
Row 2 (ts=T, gpu 1, 500W) → gpu_id '0'
…rows 3-8 all '0'
per_sample_total[T] 500+500+500+500+500+500+500+500 = 4000
per_sample_gpus[T] {'0'}
num_gpus max(len({'0'}), 1) = 1
per_sample_mean_per_gpu for T 4000 / 1 = 4000
Returned tuple (4000.0, 1)

run() then computes joules_per_output_token = 4000 × 1 × duration / tokens, which coincidentally equals the correct 500 × 8 × duration / tokens because num_gpus=1 cancels the inflation. But avg_power_w is written to the agg JSON as 4000 W — a value impossible for a single H100/H200 (TDP ~700 W) that will mislead the dashboard.

Why existing code doesn't prevent it

The regex ^(index|gpu|gpu_id|gpu_index|card|device)$ is anchored at both ends, so common alternatives like device_id, gpu_serial, GPU ID (with space), or slot all fail to match. The power-column regex by contrast is permissive (just r"power" with an excludelist) — the asymmetry means the function tolerates power-column name variation but not GPU-column variation.

Impact

In this PR's pipeline: zero. benchmarks/benchmark_lib.sh invokes nvidia-smi --query-gpu=timestamp,index,... (index matches) or amd-smi metric --csv (gpu matches), so the bug is latent. The submitter and all verifiers acknowledge this.

Outside this PR: the module docstring of utils/aggregate_power.py notes that "AMD's amd-smi varies by version" — a future amd-smi release that renames gpu to gpu_id would still match, but a release that uses device_id or GPU ID would silently produce a factor-N error in the headline avg_power_w field. This is the worst kind of regression: the number stays plausible-looking (within an order of magnitude) but is systematically wrong.

Addressing the refutation

A refuting verifier correctly notes that (a) the CSV producer is in-tree, (b) the joules-per-token field stays correct due to the cancellation, and (c) this is defensive coding rather than an actionable production bug. All true today. The reason to flag it anyway is that avg_power_w is the standalone headline new metric introduced by this PR — it's surfaced directly to the dashboard, not just used as an intermediate. A silent factor-N error in a charted value is exactly the failure mode regex-based schema detection is supposed to avoid, and the fix is a one-liner.

Suggested fix

When gpu_col is None, count distinct rows per timestamp to recover num_gpus:

if gpu_col is None:
    # Each row at the same timestamp must be a distinct GPU.
    per_sample_gpus.setdefault(bucket, set()).add(str(per_sample_total.get(bucket, 0)))  # unique key per row

Or more cleanly, track row counts per bucket separately and use total / row_count as the per-GPU mean for that timestamp. Either approach keeps the per-GPU mean honest while the existing gpu_col-based path remains unchanged.



def _load_bench_window(bench_result_path: Path) -> tuple[float, float, float, int] | None:
    """Read (start_unix, end_unix, duration_s, total_output_tokens) from the raw bench JSON.

    The values come from the keys "benchmark_start_time_unix",
    "benchmark_end_time_unix", "duration" and "total_output_tokens".

    Returns None, and never raises, when the file cannot be read or decoded,
    is not valid JSON, does not hold a JSON object at the top level, when any
    of the three time fields is missing or not a number, or when
    total_output_tokens is not a positive integer.
    """
    try:
        bench = json.loads(bench_result_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return None
    if not isinstance(bench, dict):
        # A top-level list/str/number has no .get(); treat it as "no window".
        return None

    start = bench.get("benchmark_start_time_unix")
    end = bench.get("benchmark_end_time_unix")
    duration = bench.get("duration")
    total_output = bench.get("total_output_tokens")

    # bool is a subclass of int, so JSON true/false must be rejected explicitly.
    if not all(
        isinstance(v, (int, float)) and not isinstance(v, bool)
        for v in (start, end, duration)
    ):
        return None
    if isinstance(total_output, bool) or not isinstance(total_output, int):
        return None
    if total_output <= 0:
        # Zero tokens would make joules-per-token a division by zero.
        return None
    return float(start), float(end), float(duration), int(total_output)


def patch_agg_result(
    agg_path: Path,
    avg_power_w: float,
    joules_per_output_token: float,
) -> None:
    """Add the two power metrics to the agg JSON file at *agg_path*.

    "avg_power_w" is stored rounded to 3 decimals and
    "joules_per_output_token" rounded to 6. The updated document is first
    written to a sibling file named "<agg file name>.tmp" and then moved over
    the original with Path.replace, so the target is swapped in one step
    rather than rewritten in place.

    Raises whatever reading, JSON decoding or writing raises; the caller is
    expected to handle those.
    """
    payload = json.loads(agg_path.read_text(encoding="utf-8"))
    payload["avg_power_w"] = round(avg_power_w, 3)
    payload["joules_per_output_token"] = round(joules_per_output_token, 6)

    staging_path = agg_path.with_suffix(f"{agg_path.suffix}.tmp")
    serialized = json.dumps(payload, indent=2)
    staging_path.write_text(serialized, encoding="utf-8")
    staging_path.replace(agg_path)


def run(csv_path: Path, bench_result: Path, agg_result: Path) -> int:
    """Compute measured power for one benchmark run and patch it into the agg JSON.

    Args:
        csv_path: GPU-metrics CSV to aggregate.
        bench_result: raw benchmark result JSON providing the load window,
            duration and output-token count.
        agg_result: aggregated result JSON to patch in place.

    Returns:
        Always 0. Every failure mode (no bench window, no usable samples,
        missing or unpatchable agg file) is reported on stderr and skipped so
        that power telemetry never fails the surrounding pipeline.
    """
    window = _load_bench_window(bench_result)
    if window is None:
        print(
            f"[aggregate_power] No bench window in {bench_result} — skipping power aggregation",
            file=sys.stderr,
        )
        return 0
    start, end, duration, total_output = window

    result = aggregate_power(csv_path, start, end)
    if result is None:
        print(
            f"[aggregate_power] No usable power samples in {csv_path} for "
            f"window [{start}, {end}] — skipping",
            file=sys.stderr,
        )
        return 0
    avg_power_w, num_gpus = result

    # Joules consumed by the system during the bench window / output tokens:
    # per-GPU watts x GPU count x seconds gives total energy.
    joules_per_output_token = (avg_power_w * num_gpus * duration) / total_output

    if not agg_result.is_file():
        print(
            f"[aggregate_power] Agg result {agg_result} missing — cannot patch",
            file=sys.stderr,
        )
        return 0

    try:
        patch_agg_result(agg_result, avg_power_w, joules_per_output_token)
    except (OSError, ValueError, TypeError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError is raised when the agg JSON's top-level value is not an
        # object. All are "cannot patch", not reasons to fail the run.
        print(f"[aggregate_power] Failed to patch {agg_result}: {exc}", file=sys.stderr)
        return 0

    print(
        f"[aggregate_power] avg_power_w={avg_power_w:.2f} (per GPU, n={num_gpus}) "
        f"joules_per_output_token={joules_per_output_token:.4f} "
        f"duration={duration:.1f}s output_tokens={total_output} -> {agg_result}"
    )
    return 0


def main() -> int:
    """Command-line entry point: parse arguments and delegate to run().

    Returns the process exit code produced by run().
    """
    # The CLI description is the first line of the module docstring. Under
    # `python -OO` docstrings are stripped and __doc__ is None, so fall back
    # to a fixed description instead of crashing on None.splitlines().
    doc_lines = (__doc__ or "").strip().splitlines()
    description = (
        doc_lines[0]
        if doc_lines
        else "Aggregate measured GPU power into the agg result JSON."
    )
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        "--csv",
        type=Path,
        default=Path("/workspace/gpu_metrics.csv"),
        help="Path to gpu_metrics.csv from start_gpu_monitor (default: /workspace/gpu_metrics.csv)",
    )
    parser.add_argument(
        "--bench-result",
        type=Path,
        required=True,
        help="Path to the raw benchmark_serving.py result JSON (provides bench window + token counts)",
    )
    parser.add_argument(
        "--agg-result",
        type=Path,
        required=True,
        help="Path to the agg_<run>.json output of process_result.py (will be patched in place)",
    )
    args = parser.parse_args()
    return run(args.csv, args.bench_result, args.agg_result)


if __name__ == "__main__":
    # Script entry: use main()'s return value as the process exit status.
    raise SystemExit(main())
4 changes: 4 additions & 0 deletions utils/bench_serving/benchmark_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ async def limited_request_func(request_func_input, pbar):
print("Starting main benchmark run...")

benchmark_start_time = time.perf_counter()
benchmark_start_time_unix = time.time()
tasks: List[asyncio.Task] = []
async for request in get_request(input_requests, request_rate, burstiness):
prompt, prompt_len, output_len, mm_content = request
Expand Down Expand Up @@ -615,6 +616,7 @@ async def limited_request_func(request_func_input, pbar):
pbar.close()

benchmark_duration = time.perf_counter() - benchmark_start_time
benchmark_end_time_unix = time.time()

metrics, actual_output_lens = calculate_metrics(
input_requests=input_requests,
Expand Down Expand Up @@ -645,6 +647,8 @@ async def limited_request_func(request_func_input, pbar):

result = {
"duration": benchmark_duration,
"benchmark_start_time_unix": benchmark_start_time_unix,
"benchmark_end_time_unix": benchmark_end_time_unix,
"completed": metrics.completed,
"total_input_tokens": metrics.total_input,
"total_output_tokens": metrics.total_output,
Expand Down
25 changes: 24 additions & 1 deletion utils/process_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,28 @@ def get_required_env_vars(required_vars):

print(json.dumps(data, indent=2))

with open(f'agg_{result_filename}.json', 'w') as f:
agg_path = Path(f'agg_{result_filename}.json')
with open(agg_path, 'w') as f:
json.dump(data, f, indent=2)

# Best-effort: patch measured power into the agg JSON. Never fails the run.
try:
from aggregate_power import run as _aggregate_power_run

_csv_candidates = [
os.environ.get('GPU_METRICS_CSV'),
'gpu_metrics.csv',
'/workspace/gpu_metrics.csv',
]
_csv_path = next(
(Path(p) for p in _csv_candidates if p and Path(p).is_file()),
None,
)
if _csv_path is not None:
_aggregate_power_run(
csv_path=_csv_path,
bench_result=Path(f'{result_filename}.json'),
agg_result=agg_path,
)
except Exception as exc: # noqa: BLE001 — never block on telemetry
print(f'[process_result] power aggregation skipped: {exc}', file=sys.stderr)
Loading