Skip to content
1,844 changes: 1,844 additions & 0 deletions kv_cache_benchmark/DESIGN.md

Large diffs are not rendered by default.

1,909 changes: 89 additions & 1,820 deletions kv_cache_benchmark/README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions kv_cache_benchmark/mlperf_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# All numeric values stored as int/float; converted to str when building cmd.
# 'generation-mode' is ALWAYS 'none' for MLPerf compliance — do NOT rely on
# kv-cache.py defaults; the default may change in future versions.
OPTION_PARAMS = {
WORKLOAD_PARAMS = {
1: {
'model': 'llama3.1-8b',
'num-users': 200,
Expand Down Expand Up @@ -135,7 +135,7 @@ def main():
# D-02: config.yaml located relative to this script; user may override via --config
config_path = args.config or str(Path(__file__).parent / 'config.yaml')

params = OPTION_PARAMS[args.option]
params = WORKLOAD_PARAMS[args.option]

cmd = [
sys.executable,
Expand Down
39 changes: 25 additions & 14 deletions mlpstorage_py/benchmarks/kvcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import time
from pathlib import Path
from typing import Dict, Any, List
from statistics import fmean

from mlpstorage_py.benchmarks.base import Benchmark
from mlpstorage_py.config import (
Expand Down Expand Up @@ -355,9 +356,11 @@ def _aggregate_option_results(
"""Aggregate per-rank JSON results for one option across all trials.

Sums read/write bandwidth and token throughput across all rank files.
Takes max storage_io_latency_ms.p95. Records missing files without
crashing and sets partial_failure. When storage_entries == 0, logs
that the working set was served from the CPU tier.
Takes the mean of read/write bandwidth and token throughput across
the trials. Takes max storage_io_latency_ms.p95 across all ranks and
trials. Records missing files without crashing and
sets partial_failure. When storage_entries == 0, logs that the
working set was served from the CPU tier.
"""
all_read_bw = []
all_write_bw = []
Expand All @@ -366,8 +369,12 @@ def _aggregate_option_results(
all_p95_latency = []
missing_files = []
cpu_tier_flags = []

for trial_dir in trial_dirs:
trial_read_bw = []
trial_write_bw = []
trial_avg_throughput = []
trial_storage_throughput = []
trial_p95_latency = []
for rank_idx in range(expected_rank_count):
rank_dir = Path(trial_dir) / f"rank_{rank_idx}"
result_file = next(rank_dir.glob('kvcache_results_*.json'), None)
Expand All @@ -386,21 +393,25 @@ def _aggregate_option_results(
)
cpu_tier_flags.append(str(result_file))
# Include all values regardless (0 is correct for CPU-tier)
all_read_bw.append(cache_stats.get('tier_storage_read_bandwidth_gbps', 0.0))
all_write_bw.append(cache_stats.get('tier_storage_write_bandwidth_gbps', 0.0))
all_avg_throughput.append(summary.get('avg_throughput_tokens_per_sec', 0.0))
all_storage_throughput.append(summary.get('storage_throughput_tokens_per_sec', 0.0))
all_p95_latency.append(summary.get('storage_io_latency_ms', {}).get('p95', 0.0))
trial_read_bw.append(cache_stats.get('tier_storage_read_bandwidth_gbps', 0.0))
trial_write_bw.append(cache_stats.get('tier_storage_write_bandwidth_gbps', 0.0))
trial_avg_throughput.append(summary.get('avg_throughput_tokens_per_sec', 0.0))
trial_storage_throughput.append(summary.get('storage_throughput_tokens_per_sec', 0.0))
trial_p95_latency.append(summary.get('storage_io_latency_ms', {}).get('p95', 0.0))
except Exception as e:
self.logger.warning(f"Failed to parse {result_file}: {e}")
missing_files.append(str(result_file))

all_read_bw.append(sum(trial_read_bw))
all_write_bw.append(sum(trial_write_bw))
all_avg_throughput.append(sum(trial_avg_throughput))
all_storage_throughput.append(sum(trial_storage_throughput))
all_p95_latency.append(max(trial_p95_latency) if trial_p95_latency else 0.0)
return {
'option': option,
'aggregated_read_bandwidth_gbps': sum(all_read_bw),
'aggregated_write_bandwidth_gbps': sum(all_write_bw),
'aggregated_avg_throughput_tokens_per_sec': sum(all_avg_throughput),
'aggregated_storage_throughput_tokens_per_sec': sum(all_storage_throughput),
'aggregated_read_bandwidth_gbps': fmean(all_read_bw) if all_read_bw else 0.0,
'aggregated_write_bandwidth_gbps': fmean(all_write_bw) if all_write_bw else 0.0,
'aggregated_avg_throughput_tokens_per_sec': fmean(all_avg_throughput) if all_avg_throughput else 0.0,
'aggregated_storage_throughput_tokens_per_sec': fmean(all_storage_throughput) if all_storage_throughput else 0.0,
'aggregated_p95_latency_ms': max(all_p95_latency) if all_p95_latency else None,
'rank_count': expected_rank_count,
'trial_count': len(trial_dirs),
Expand Down
9 changes: 5 additions & 4 deletions tests/unit/test_benchmarks_kvcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,8 @@ def test_aggregates_across_multiple_trials(self, tmp_path):

result = bm._aggregate_option_results(1, trial_dirs, expected_rank_count=1)

# 2 trials × 1 rank × 1.0 GBps = 2.0
assert result['aggregated_read_bandwidth_gbps'] == pytest.approx(2.0)
# 2 trials × 1 rank × 1.0 GBps each → fmean([1.0, 1.0]) = 1.0
assert result['aggregated_read_bandwidth_gbps'] == pytest.approx(1.0)
assert result['trial_count'] == 2

def test_uses_glob_not_constructed_filename(self, tmp_path):
Expand All @@ -571,7 +571,7 @@ def test_uses_glob_not_constructed_filename(self, tmp_path):
assert result['partial_failure'] is False

def test_none_p95_when_no_successful_reads(self, tmp_path):
"""aggregated_p95_latency_ms is None when all rank files are missing."""
"""aggregated_p95_latency_ms is 0.0 when all rank files are missing."""
bm = _make_run_benchmark(tmp_path)
trial_dir = tmp_path / 'trial_0'
# Both rank dirs exist but have no json files
Expand All @@ -580,7 +580,8 @@ def test_none_p95_when_no_successful_reads(self, tmp_path):

result = bm._aggregate_option_results(1, [str(trial_dir)], expected_rank_count=2)

assert result['aggregated_p95_latency_ms'] is None
# Empty trial contributes 0.0; max([0.0]) = 0.0
assert result['aggregated_p95_latency_ms'] == pytest.approx(0.0)
assert result['partial_failure'] is True


Expand Down
16 changes: 8 additions & 8 deletions tests/unit/test_mlperf_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Tests cover:
- get_rank(): env var reading, OMPI precedence over PMI, invalid value fallback
- BASE_SEED and OPTION_PARAMS constants
- BASE_SEED and WORKLOAD_PARAMS constants
- main(): rank dir creation, effective seed, output flag, option params, config path
"""

Expand Down Expand Up @@ -62,23 +62,23 @@ def test_base_seed_is_42(self, wrapper_module):


class TestOptionParams:
"""Tests for OPTION_PARAMS fixed MLPerf values."""
"""Tests for WORKLOAD_PARAMS fixed MLPerf values."""

def test_option_keys_are_1_2_3(self, wrapper_module):
assert set(wrapper_module.OPTION_PARAMS.keys()) == {1, 2, 3}
assert set(wrapper_module.WORKLOAD_PARAMS.keys()) == {1, 2, 3}

def test_option1_model_is_8b(self, wrapper_module):
assert wrapper_module.OPTION_PARAMS[1]['model'] == 'llama3.1-8b'
assert wrapper_module.WORKLOAD_PARAMS[1]['model'] == 'llama3.1-8b'

def test_option3_model_is_70b(self, wrapper_module):
assert wrapper_module.OPTION_PARAMS[3]['model'] == 'llama3.1-70b-instruct'
assert wrapper_module.WORKLOAD_PARAMS[3]['model'] == 'llama3.1-70b-instruct'

def test_generation_mode_always_none(self, wrapper_module):
for opt in [1, 2, 3]:
assert wrapper_module.OPTION_PARAMS[opt]['generation-mode'] == 'none'
assert wrapper_module.WORKLOAD_PARAMS[opt]['generation-mode'] == 'none'

def test_option2_cpu_mem_gb_is_4(self, wrapper_module):
assert wrapper_module.OPTION_PARAMS[2]['cpu-mem-gb'] == 4
assert wrapper_module.WORKLOAD_PARAMS[2]['cpu-mem-gb'] == 4


class TestMain:
Expand Down Expand Up @@ -143,7 +143,7 @@ def test_output_flag_points_to_rank_dir(self, wrapper_module, monkeypatch, tmp_p
assert 'rank_0' in output_val
assert 'kvcache_results_' in output_val

def test_option_params_injected_for_option1(self, wrapper_module, monkeypatch, tmp_path):
def test_workload_params_injected_for_option1(self, wrapper_module, monkeypatch, tmp_path):
cmd, _, _ = self._run_main(wrapper_module, monkeypatch, tmp_path)
assert '--model' in cmd
model_idx = cmd.index('--model')
Expand Down
Loading