diff --git a/deployment/manual_dispatch_release.py b/deployment/manual_dispatch_release.py index 9f24380..e51b0d2 100644 --- a/deployment/manual_dispatch_release.py +++ b/deployment/manual_dispatch_release.py @@ -1063,7 +1063,17 @@ def _finalize_remote_staged_dir( + sh_quote(stage_dir_s) + " " + sh_quote(dst_dir_s) - + " && rm -rf \"$backup\"" + + " && " + + "if [ -n \"${backup:-}\" ] && [ -e \"$backup\" -o -L \"$backup\" ]; then " + + "rm -rf \"$backup\" || " + + "{ " + + "echo " + + sh_quote( + "[manual_dispatch_release] warning: failed to remove old staged backup; keep it for later cleanup: " + ) + + " \"$backup\" 1>&2; " + + "}; " + + "fi" ) ), ) diff --git a/deployment/tests/test_gen_bare_deploy_bash.py b/deployment/tests/test_gen_bare_deploy_bash.py index 21f11a6..299ba1c 100644 --- a/deployment/tests/test_gen_bare_deploy_bash.py +++ b/deployment/tests/test_gen_bare_deploy_bash.py @@ -64,6 +64,7 @@ def _build_checks(selected_test_id: Optional[str]) -> List[Tuple[str, Callable[[ ("bootstrap_start_reuses_already_present_selection", test_bootstrap_start_reuses_already_present_selection), ("bare_start_fails_when_child_exits_within_startup_window", test_bare_start_fails_when_child_exits_within_startup_window), ("pid_ready_check_requires_full_stable_window_after_first_child_observation", test_pid_ready_check_requires_full_stable_window_after_first_child_observation), + ("pid_ready_check_ignores_nested_selection_supervisor_children", test_pid_ready_check_ignores_nested_selection_supervisor_children), ("atomic_group_start_does_not_auto_stop_on_failure", test_atomic_group_start_does_not_auto_stop_on_failure), ("atomic_group_preserves_nested_heredoc_terminator", test_atomic_group_preserves_nested_heredoc_terminator), ("atomic_group_stop_script_is_shell_valid", test_atomic_group_stop_script_is_shell_valid), @@ -812,6 +813,142 @@ def _shutdown(_signum, _frame): print("PASS: test_pid_ready_check_requires_full_stable_window_after_first_child_observation") +def test_pid_ready_check_ignores_nested_selection_supervisor_children() -> None: + proc_lifecycle = _load_python_module( + module_name="test_proc_lifecycle_codegen_nested_supervisor_runtime", + path=DEPLOYMENT_DIR / "utils" / "proc_lifecycle_codegen.py", + ) + helpers = proc_lifecycle.render_bash_proc_lifecycle_funcs_pid_tree( + timeouts=proc_lifecycle.StopTimeouts(term_seconds=60, kill_seconds=10, supersede_seconds=30) + ) + with tempfile.TemporaryDirectory(prefix="test_proc_lifecycle_nested_supervisor_") as td: + tmpdir = Path(td) + shell_script = tmpdir / "probe.sh" + root_script = tmpdir / "root_supervisor.py" + child_script = tmpdir / "real_child.py" + nested_supervisor_script = tmpdir / "selection_supervisor.py" + + child_script.write_text( + textwrap.dedent( + """ + #!/usr/bin/env python3 + import signal + import time + + def _shutdown(_signum, _frame): + raise SystemExit(0) + + signal.signal(signal.SIGTERM, _shutdown) + signal.signal(signal.SIGINT, _shutdown) + + while True: + time.sleep(0.2) + """ + ).strip() + + "\n", + encoding="utf-8", + ) + nested_supervisor_script.write_text( + textwrap.dedent( + """ + #!/usr/bin/env python3 + import signal + import time + + def _shutdown(_signum, _frame): + raise SystemExit(0) + + signal.signal(signal.SIGTERM, _shutdown) + signal.signal(signal.SIGINT, _shutdown) + + while True: + time.sleep(0.2) + """ + ).strip() + + "\n", + encoding="utf-8", + ) + root_script.write_text( + textwrap.dedent( + f""" + #!/usr/bin/env python3 + import signal + import subprocess + import sys + import time + from pathlib import Path + + procs = [] + + def _shutdown(_signum, _frame): + for proc in procs: + if proc.poll() is None: + proc.terminate() + deadline = time.time() + 5 + for proc in procs: + if proc.poll() is None: + try: + proc.wait(timeout=max(0.0, deadline - time.time())) + except subprocess.TimeoutExpired: + proc.kill() + raise SystemExit(0) + + signal.signal(signal.SIGTERM, _shutdown) + signal.signal(signal.SIGINT, _shutdown) + + procs.append(subprocess.Popen([sys.executable, str(Path({str(child_script)!r}))])) + procs.append(subprocess.Popen([sys.executable, str(Path({str(nested_supervisor_script)!r}))])) + while True: + for proc in procs: + if proc.poll() is not None: + raise SystemExit(proc.returncode or 0) + time.sleep(0.2) + """ + ).strip() + + "\n", + encoding="utf-8", + ) + + shell_script.write_text( + textwrap.dedent( + f"""\ + #!/usr/bin/env bash + set -euo pipefail + {helpers} + python3 {shlex.quote(str(root_script))} & + root_pid="$!" + startup_deadline_seconds=6 + if ! wait_service_probably_ready_pid_tree "svc_plain" "$root_pid" 4 "$startup_deadline_seconds" "[test]"; then + wait_rc="$?" + kill "$root_pid" >/dev/null 2>&1 || true + wait "$root_pid" >/dev/null 2>&1 || true + exit "$wait_rc" + fi + kill "$root_pid" >/dev/null 2>&1 || true + wait "$root_pid" >/dev/null 2>&1 || true + exit 0 + """ + ), + encoding="utf-8", + ) + shell_script.chmod(0o755) + + result = subprocess.run( + ["bash", str(shell_script)], + check=False, + capture_output=True, + text=True, + cwd=str(DEPLOYMENT_DIR.parent), + timeout=20, + ) + assert result.returncode == 0, ( + f"expected startup gate success rc={result.returncode} stdout={result.stdout!r} stderr={result.stderr!r}" + ) + assert "multiple direct child pids" not in result.stdout, result.stdout + assert "probable-ready: ok" in result.stdout, result.stdout + print("PASS: test_pid_ready_check_ignores_nested_selection_supervisor_children") + + def test_atomic_group_preserves_nested_heredoc_terminator() -> None: with tempfile.TemporaryDirectory(prefix="test_gen_bare_deploy_bash_atomic_heredoc_") as td: tmpdir = Path(td) diff --git a/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py b/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py index d8148f4..1e2ca67 100644 --- a/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py +++ b/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py @@ -30,6 +30,33 @@ def _load_module(): class TestManualDispatchReleaseTestRscContract(unittest.TestCase): + def test_finalize_remote_staged_dir_keeps_backup_when_cleanup_fails(self) -> None: + captured: list[tuple[str | None, str]] = [] + + def _fake_check_call_bash_with_optional_password(*, password: str | None, cmd: str) -> None: + captured.append((password, cmd)) + + with mock.patch.object( + _DISPATCH, + "_check_call_bash_with_optional_password", + side_effect=_fake_check_call_bash_with_optional_password, + ): + _DISPATCH._finalize_remote_staged_dir( + stage_dir_s="/remote/.fluxon_release.stage.abc123", + dst_dir_s="/remote/fluxon_release", + ssh_user="root", + ip="203.0.113.7", + ssh_port=30245, + ssh_password=None, + ) + + self.assertEqual(len(captured), 1) + password, cmd = captured[0] + self.assertIsNone(password) + self.assertIn('mv \'"\'"\'/remote/.fluxon_release.stage.abc123\'"\'"\' \'"\'"\'/remote/fluxon_release\'"\'"\'', cmd) + self.assertIn('rm -rf "$backup" || {', cmd) + self.assertIn("[manual_dispatch_release] warning: failed to remove old staged backup; keep it for later cleanup:", cmd) + def test_deploy_and_profiles_dispatches_test_rsc_tree(self) -> None: with tempfile.TemporaryDirectory() as td: release_dir = Path(td) diff --git a/deployment/tests/test_start_test_bed_bootstrap_log.py b/deployment/tests/test_start_test_bed_bootstrap_log.py index 2bd6b00..01fffc6 100644 --- a/deployment/tests/test_start_test_bed_bootstrap_log.py +++ b/deployment/tests/test_start_test_bed_bootstrap_log.py @@ -353,6 +353,41 @@ def test_parse_cluster_nodes_accepts_local_execution_mode() -> None: print("PASS: test_parse_cluster_nodes_accepts_local_execution_mode") +def test_resolve_local_node_cfg_accepts_remote_only_controller_host() -> None: + module = _load_start_test_bed_module() + cluster_nodes = { + "gpu-a": { + "hostname": "gpu-a", + "ip": "10.233.111.42", + "hostworkdir": "/srv/gpu-a", + "execution_mode": "ssh", + "ssh_host": "116.238.240.2", + "ssh_user": "root", + "ssh_port": 30245, + }, + "gpu-b": { + "hostname": "gpu-b", + "ip": "10.233.114.86", + "hostworkdir": "/srv/gpu-b", + "execution_mode": "ssh", + "ssh_host": "116.238.240.2", + "ssh_user": "root", + "ssh_port": 31408, + }, + } + original_check_output = module.subprocess.check_output + try: + module.subprocess.check_output = lambda *args, **kwargs: "infra44-ThinkStation-PX\n" + resolved = module._resolve_local_node_cfg( + cluster_nodes, + controller_url="http://10.233.111.42:53180/r/ops/fluxon_gpu_monitor_remote", + ) + finally: + module.subprocess.check_output = original_check_output + assert resolved is cluster_nodes["gpu-a"], resolved + print("PASS: test_resolve_local_node_cfg_accepts_remote_only_controller_host") + + def test_run_bare_waves_treats_local_execution_mode_node_as_local() -> None: module = _load_start_test_bed_module() cluster_nodes = { @@ -444,6 +479,99 @@ def test_run_bare_waves_treats_local_execution_mode_node_as_local() -> None: print("PASS: test_run_bare_waves_treats_local_execution_mode_node_as_local") +def test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch() -> None: + module = _load_start_test_bed_module() + cluster_nodes = { + "logic-a": { + "hostname": "logic-a", + "ip": "127.0.0.1", + "hostworkdir": "/tmp/logic-a", + "execution_mode": "local", + "ssh_user": "tester", + "ssh_port": 22, + }, + } + deployconf = { + "name_prefix": "fluxon-testbed", + "service": { + "master": {"node_bind": {"node": ["logic-a"]}}, + "owner": {"node_bind": {"node": ["logic-a"]}}, + "ops_controller": {"node_bind": {"node": ["logic-a"]}}, + "ops_agent": {"node_bind": {"node": ["logic-a"]}}, + }, + "atomic_groups": { + "fluxon_core_controller": { + "phase": 1, + "nodes": ["logic-a"], + "services": ["master", "owner", "ops_controller", "ops_agent"], + } + }, + } + stop_calls: list[str] = [] + spawn_calls: list[str] = [] + original_run_local_stop = module._run_local_stop + original_spawn_local = module._spawn_local_start + original_join = module._join_bare_launch + original_collect = module._collect_bare_runtime_statuses + original_bare_script_name = module._selection_bare_script_name + original_service_names = module._selection_service_names_for_target_node + original_log_path = module._bare_wave_bootstrap_log_path + try: + module._run_local_stop = lambda *, local_node_cfg, service_name: stop_calls.append( + f"{local_node_cfg['hostname']}:{service_name}" + ) + module._spawn_local_start = lambda **kwargs: spawn_calls.append(kwargs["selection_name"]) or { + "mode": "local", + "node_name": kwargs["local_node_cfg"]["hostname"], + "selection_name": kwargs["selection_name"], + "bare_script_name": kwargs["bare_script_name"], + "bootstrap_log_path": kwargs["bootstrap_log_path"], + "expected_service_names": kwargs["expected_service_names"], + "launch_error": None, + "launcher_rc": 0, + "runtime_statuses": [], + } + module._join_bare_launch = lambda result: None + module._collect_bare_runtime_statuses = lambda **kwargs: [] + module._selection_bare_script_name = lambda **kwargs: "fluxon_core_controller" + module._selection_service_names_for_target_node = ( + lambda **kwargs: ["master", "owner", "ops_controller", "ops_agent"] + ) + module._bare_wave_bootstrap_log_path = ( + lambda **kwargs: Path("/tmp") / f"{kwargs['node_name']}_{kwargs['selection_name']}.log" + ) + module._run_bare_waves( + workdir=Path("/tmp"), + deployconf=deployconf, + cluster_nodes=cluster_nodes, + local_node_cfg=cluster_nodes["logic-a"], + waves=[ + { + "launches": [ + {"node": "logic-a", "selection_name": "fluxon_core_controller"}, + ] + } + ], + bootstrap_bare_services=set(), + ) + finally: + module._run_local_stop = original_run_local_stop + module._spawn_local_start = original_spawn_local + module._join_bare_launch = original_join + module._collect_bare_runtime_statuses = original_collect + module._selection_bare_script_name = original_bare_script_name + module._selection_service_names_for_target_node = original_service_names + module._bare_wave_bootstrap_log_path = original_log_path + assert stop_calls == [ + "logic-a:master", + "logic-a:owner", + "logic-a:ops_controller", + "logic-a:ops_agent", + ], stop_calls + assert spawn_calls == ["fluxon_core_controller"], spawn_calls + print("PASS: test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch") + + def test_local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection() -> None: module = _load_start_test_bed_module() deployconf = { @@ -1094,7 +1222,7 @@ def test_bare_then_apply_success_path_does_not_run_post_apply_stop() -> None: encoding="utf-8", ) - original_read_local_release_manifest_sha256 = module._read_local_release_manifest_sha256 + original_read_release_manifest_sha256 = module._read_release_manifest_sha256 original_with_release_manifest_sha256_env = module._with_release_manifest_sha256_env original_generate_daemonset_artifacts = module._generate_daemonset_artifacts original_refresh_cluster_bare_deploy_scripts = module._refresh_cluster_bare_deploy_scripts @@ -1120,7 +1248,7 @@ def test_bare_then_apply_success_path_does_not_run_post_apply_stop() -> None: call_sequence: list[str] = [] try: - module._read_local_release_manifest_sha256 = lambda **_: "sha256" + module._read_release_manifest_sha256 = lambda **_: "sha256" module._with_release_manifest_sha256_env = lambda **kwargs: kwargs["deployconf"] module._generate_daemonset_artifacts = lambda **_: None module._refresh_cluster_bare_deploy_scripts = lambda **_: None @@ -1194,7 +1322,7 @@ def _fail_remote_stop(*, node_name: str, service_name: str, **_: object) -> None finally: sys.argv = original_argv finally: - module._read_local_release_manifest_sha256 = original_read_local_release_manifest_sha256 + module._read_release_manifest_sha256 = original_read_release_manifest_sha256 module._with_release_manifest_sha256_env = original_with_release_manifest_sha256_env module._generate_daemonset_artifacts = original_generate_daemonset_artifacts module._refresh_cluster_bare_deploy_scripts = original_refresh_cluster_bare_deploy_scripts @@ -1321,7 +1449,7 @@ def test_bare_only_stops_after_controller_ready() -> None: encoding="utf-8", ) - original_read_local_release_manifest_sha256 = module._read_local_release_manifest_sha256 + original_read_release_manifest_sha256 = module._read_release_manifest_sha256 original_with_release_manifest_sha256_env = module._with_release_manifest_sha256_env original_generate_daemonset_artifacts = module._generate_daemonset_artifacts original_refresh_cluster_bare_deploy_scripts = module._refresh_cluster_bare_deploy_scripts @@ -1339,7 +1467,7 @@ def test_bare_only_stops_after_controller_ready() -> None: run_calls: list[tuple[str, object]] = [] try: - module._read_local_release_manifest_sha256 = lambda **_: "sha256" + module._read_release_manifest_sha256 = lambda **_: "sha256" module._with_release_manifest_sha256_env = lambda **kwargs: kwargs["deployconf"] module._generate_daemonset_artifacts = lambda **_: run_calls.append(("generate", None)) module._refresh_cluster_bare_deploy_scripts = lambda **_: run_calls.append(("refresh_bare", None)) @@ -1390,7 +1518,7 @@ def _fail_deploy_or_wait(**_: object) -> dict[str, str]: finally: sys.argv = original_argv finally: - module._read_local_release_manifest_sha256 = original_read_local_release_manifest_sha256 + module._read_release_manifest_sha256 = original_read_release_manifest_sha256 module._with_release_manifest_sha256_env = original_with_release_manifest_sha256_env module._generate_daemonset_artifacts = original_generate_daemonset_artifacts module._refresh_cluster_bare_deploy_scripts = original_refresh_cluster_bare_deploy_scripts @@ -1518,11 +1646,19 @@ def main() -> int: "run_bare_waves_treats_local_execution_mode_node_as_local", test_run_bare_waves_treats_local_execution_mode_node_as_local, ), + ( + "run_bare_waves_stops_legacy_plain_services_before_atomic_launch", + test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch, + ), ( "local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection", test_local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection, ), ("parse_test_runner_ui_config_resolves_paths", test_parse_test_runner_ui_config_resolves_paths), + ( + "resolve_local_node_cfg_accepts_remote_only_controller_host", + test_resolve_local_node_cfg_accepts_remote_only_controller_host, + ), ( "normalize_bootstrap_deployconf_strips_legacy_master_p2p_listen_port", test_normalize_bootstrap_deployconf_strips_legacy_master_p2p_listen_port, diff --git a/deployment/utils/proc_lifecycle_codegen.py b/deployment/utils/proc_lifecycle_codegen.py index 197829f..d93ece6 100644 --- a/deployment/utils/proc_lifecycle_codegen.py +++ b/deployment/utils/proc_lifecycle_codegen.py @@ -164,6 +164,52 @@ def render_bash_proc_lifecycle_funcs_pid_tree(*, timeouts: StopTimeouts) -> str: ' }} +_cmdline_contains_selection_supervisor_entry() {{ + pid="$1" + if [[ ! "$pid" =~ ^[0-9]+$ ]]; then + return 1 + fi + cmdline_path="/proc/$pid/cmdline" + if [ ! -r "$cmdline_path" ]; then + return 1 + fi + tr '\\0' '\\n' < "$cmdline_path" 2>/dev/null | awk ' + {{ + if ($0 ~ /(^|\\/)selection_supervisor\\.py$/) {{ + found=1; + }} + }} + END {{ + exit(found ? 0 : 1); + }} + ' +}} + +_pid_tree_ready_candidate_child_pids() {{ + root_pid="$1" + if [[ ! "$root_pid" =~ ^[0-9]+$ ]]; then + return 1 + fi + if ! _pid_exists "$root_pid"; then + return 1 + fi + + direct_child_pids="$(_pid_tree_direct_child_pids "$root_pid" 2>/dev/null || true)" + if [ -z "$direct_child_pids" ]; then + echo "" + return 0 + fi + + ready_child_pids="" + for child_pid in $direct_child_pids; do + if _cmdline_contains_selection_supervisor_entry "$child_pid"; then + continue + fi + ready_child_pids="$ready_child_pids $child_pid" + done + echo "${{ready_child_pids# }}" +}} + _now_monotonic_ms() {{ python3 - <<'__FLUXON_MONOTONIC_MS__' import time @@ -206,7 +252,7 @@ def render_bash_proc_lifecycle_funcs_pid_tree(*, timeouts: StopTimeouts) -> str: return 1 fi - current_child_pids="$(_pid_tree_direct_child_pids "$root_pid" 2>/dev/null || true)" + current_child_pids="$(_pid_tree_ready_candidate_child_pids "$root_pid" 2>/dev/null || true)" current_child_pid="" if [ -n "$current_child_pids" ]; then set -- $current_child_pids diff --git "a/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md" "b/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md" new file mode 100644 index 0000000..9a6d545 --- /dev/null +++ "b/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md" @@ -0,0 +1,138 @@ +# Observ 设计 0 - 监控面板 Metric Trends + +## 0. 总起 + +本文只定义 KV 监控面板里的 `Metric Trends` 区域。对应页面模板在 `fluxon_rs/fluxon_cli/templates/monitor_table.html`,页面上显示为: + +```html +Metric Trends (KV aggregate + member drill-down) +``` + +稳定结论: + +- `Metric Trends` 是 KV 面板里的趋势图区域,负责展示聚合指标卡片和 per owner 展开视图。 +- 指标卡片支持多线折线图。容量型指标必须把用量和总量放在同一张图里。 +- 用户可以多选指标卡片,同时展开多个 owner drilldown block。 +- 折线 hover 时必须显示 tooltip、垂直辅助线和折线上的对齐辅助点。 +- 周期刷新必须复用已有 DOM,避免页面跳动、展开状态丢失和 hover 中断。 + +## 1. 区域结构 + +`Metric Trends` 区域由三层组成: + +| 层级 | 页面元素 | 职责 | +| --- | --- | --- | +| 顶部控制 | window selector、role filters | 控制趋势窗口和可见成员角色 | +| 聚合卡片 | `#metric_grid` 下的 `.metric_card` | 展示每个指标的最新值和聚合曲线 | +| 展开视图 | `#member_metric_sections` 下的 owner blocks | 展示选中指标的 per owner 曲线和成员行 | + +用户进入 KV 面板时先看到聚合卡片。点击一个指标卡片后,该指标会进入选中集合,并在下方生成一个 owner drilldown block;再次点击同一卡片会关闭该指标的展开视图。 + +## 2. 指标卡片 + +每个 `.metric_card` 展示三类信息: + +- 指标名,例如 `Node CPU`、`Node Memory`、`GPU Memory`。 +- 最新值。多线指标用 `主线 / 对比线 / 附加线` 的顺序展示。 +- 一张 sparkline 折线图。 + +当前卡片按以下语义渲染: + +| 指标 | 曲线要求 | +| --- | --- | +| `Node CPU` | `Used`、`Capacity`、`Process CPU` 三条线 | +| `Node Memory` | `Used`、`Total`、`Process RSS` 三条线 | +| `Segment Usage` | `Used`、`Capacity` 两条线 | +| `GPU Memory` | `Used`、`Total` 两条线 | +| `Process Network` | `TX`、`RX` 两条线 | +| `Node Network` | `TX`、`RX` 两条线 | +| `Cache Hit %` | 一条命中率曲线,选择后可看 per owner 命中率 | +| 其他单值指标 | 一条主曲线 | + +CPU 指标按核堆叠展示,GPU 百分比指标按设备聚合展示;这类按资源实例求和的百分比聚合值都可以超过 `100%`。折线图的 Y 轴起点固定为 `0`,避免资源曲线因为局部波动被视觉放大。 + +## 3. 多线折线图 + +折线图由 `buildSparklineSvg(...)` 生成。输入统一归并为 `data-lines`: + +```text +primary series +comparison series +additional series... +``` + +渲染规则: + +- 第一条线是主线。 +- comparison line 用于容量、总量或反方向指标。 +- additional line 用于同图补充进程资源,例如 `Process CPU`、`Process RSS`。 +- 多线图必须显示 legend,legend 文案使用 `series_label`。 +- 没有有效 series 时显示 `N/A`,不生成空白 SVG。 + +这个规则保证 `Node Memory` 这类指标能在一张图里同时看节点用量、节点总量和 Fluxon 进程 RSS。 + +## 4. Hover 交互 + +鼠标悬浮在折线图上时,UI 必须显示: + +- 垂直 hover 辅助线:`.metric_chart_hover_line` +- 每条曲线的对齐辅助点:`.metric_chart_hover_point_ring` 和 `.metric_chart_hover_point` +- tooltip:时间戳和当前 x 位置上每条曲线的格式化数值 + +辅助点和 tooltip 都从同一份 `data-lines` 取值。这样点位、颜色、legend 和 tooltip 数值保持一致。 + +离开图表时,tooltip 和所有辅助点隐藏。 + +## 5. 多选展开 + +`Metric Trends` 支持同时展开多个指标。状态保存在: + +```text +selectedMetricKeys: string[] +``` + +交互规则: + +- 点击未选中的 metric card:加入 `selectedMetricKeys`,创建对应 owner drilldown block。 +- 点击已选中的 metric card:从 `selectedMetricKeys` 删除,同时删除该指标的 owner 展开状态。 +- 多个选中指标按 `selectedMetricKeys` 顺序逐个渲染,不互相覆盖。 + +这意味着用户可以同时查看 `Node CPU`、`Node Memory`、`Cache Hit %` 等多个指标的 per owner 视图。 + +## 6. Owner Drilldown + +每个选中指标对应一个 owner drilldown block。block 里每个 owner 用 `
` 渲染。 + +owner card 的内容: + +- owner id 和 node key。 +- owner 汇总最新值。 +- owner 汇总折线图。 +- 展开后的成员行。 + +owner 展开状态按指标分别保存: + +```text +expandedOwnersByMetric[metric_key] = [owner_id, ...] +``` + +因此,同一个 owner 在 `Node CPU` 里展开,不会强制影响 `Node Memory` 里的展开状态。 + +## 7. 刷新稳定性 + +`Metric Trends` 会随页面周期刷新。刷新时必须满足: + +- 不清空整个 `#metric_grid` 后重建。 +- 不清空整个 owner drilldown section 后重建。 +- 已存在卡片按 `data-patch-key` 复用 DOM。 +- 已展开的 owner 继续保持展开。 +- 初次加载后不反复写回 `Loading metric panel...`,避免高度跳动。 + +当前实现用 `patchChildrenByKey(...)` 复用 metric card 和 owner card。刷新只更新必要 HTML,保留卡片节点本身。 + +## 8. 关键结论 + +- `Metric Trends` 的核心 contract 是“多线趋势 + 多选 owner drilldown + 稳定刷新”。 +- `Node CPU`、`Node Memory`、`Segment Usage`、`GPU Memory`、网络指标必须保持多线展示;CPU/GPU 这类资源实例聚合百分比允许超过 `100%`。 +- hover 辅助点是趋势图可读性的一部分,不能只显示 tooltip。 +- 多选展开状态和 owner 展开状态必须分别持久化,避免用户刷新或轮询后丢失上下文。 diff --git a/fluxon_doc_cn/roadmap.md b/fluxon_doc_cn/roadmap.md index ddbd735..6af4d6e 100644 --- a/fluxon_doc_cn/roadmap.md +++ b/fluxon_doc_cn/roadmap.md @@ -11,6 +11,7 @@ ### 0.2.1 - [PERF] 优化 `RPC`、`KV`、`FS` 性能 +- [TOOL] KV 监控面板支持 GPU 资源列和趋势曲线 - [MQ] 修复 `MQ` 控制面可扩展性问题 - [ETCD] 修复 `etcd` 前缀获取时 `gRPC` 限制大小问题 - [OSS] 完善开源相关工作 diff --git "a/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md" "b/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md" index 9a8c8e1..fea34dc 100644 --- "a/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md" +++ "b/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md" @@ -200,6 +200,29 @@ master_ui: http://:18080/view?cluster_name=demo-kv-cluster&member_kind=kv ``` +KV Web UI 会直接展示 KV 成员表和 `Metric Trends` 曲线面板。成员表的 `gpu` 列展示每个采样节点上 GPU 的显存、利用率、温度和 GPU 进程摘要;`Metric Trends` 里可以查看 `GPU Memory Used`、`GPU Memory Total`、`GPU Util %`、`GPU Temp`、`GPU Proc Count`、`GPU Proc SM %` 和 `GPU Proc Mem %` 的可视化曲线。 + +GPU 监控使用 Fluxon 的标准监控链路: + +```text +owner/master 系统指标采样 + -> nvidia-smi + -> Prometheus remote-write + -> Greptime / Prometheus query API + -> KV Web UI +``` + +使用 GPU 曲线需要满足这些前置条件: + +| 项 | 要求 | +| --- | --- | +| GPU 可见性 | 采样进程所在机器能执行 `nvidia-smi`,并能看到目标 GPU。 | +| 监控写入 | `monitoring.prom_remote_write_url` 指向可写的 Greptime/Prometheus remote-write 接口。 | +| 监控查询 | `monitoring.prometheus_base_url` 指向可查询的 Prometheus API,例如 Greptime 的 `/v1/prometheus`。 | +| 采样范围 | 当前 GPU 指标跟随系统指标采样角色,覆盖 master 和 owner/client 进程可见的节点资源;external client 不重复采样系统资源。 | + +如果机器没有 GPU、没有 `nvidia-smi`,或进程没有访问 GPU 的权限,KV 进程仍会继续运行;对应成员的 `gpu` 列显示 `N/A`,GPU 趋势卡片没有样本。 + `owner` 把共享内存池和 `shared.json` 准备好之后,再运行下面的业务最小示例。 ### 生命周期与调用流程(Call Flow) diff --git a/fluxon_doc_en/roadmap.md b/fluxon_doc_en/roadmap.md index d2f79c6..e23f233 100644 --- a/fluxon_doc_en/roadmap.md +++ b/fluxon_doc_en/roadmap.md @@ -11,6 +11,7 @@ ### 0.2.1 - [PERF] Optimize `RPC`, `KV`, and `FS` performance +- [TOOL] Add GPU resource columns and trend curves to the KV monitoring panel - [MQ] Fix MQ control-plane scalability issues - [ETCD] Fix the gRPC size limit issue when listing etcd prefixes - [OSS] Improve open-source readiness and related workflows diff --git a/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md b/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md index b3f003d..65b6213 100644 --- a/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md +++ b/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md @@ -160,6 +160,35 @@ python3 examples/start_master_owner.py python3 examples/start_master_owner.py --without-master ``` +The `master_ui` block starts the KV Web UI from the master process. With the example above, open: + +```text +http://:18080/view?cluster_name=demo-kv-cluster&member_kind=kv +``` + +The KV Web UI shows the KV member table and the `Metric Trends` chart panel. The table's `gpu` column summarizes GPU memory, utilization, temperature, and GPU process activity for each sampled node. `Metric Trends` can plot `GPU Memory Used`, `GPU Memory Total`, `GPU Util %`, `GPU Temp`, `GPU Proc Count`, `GPU Proc SM %`, and `GPU Proc Mem %`. + +GPU monitoring uses the standard Fluxon observability path: + +```text +owner/master system-metric sampling + -> nvidia-smi + -> Prometheus remote-write + -> Greptime / Prometheus query API + -> KV Web UI +``` + +GPU curves require: + +| Item | Requirement | +| --- | --- | +| GPU visibility | The sampling process can run `nvidia-smi` and can see the target GPUs. | +| Metric write path | `monitoring.prom_remote_write_url` points to a writable Greptime/Prometheus remote-write endpoint. | +| Metric query path | `monitoring.prometheus_base_url` points to a queryable Prometheus API, for example Greptime's `/v1/prometheus`. | +| Sampling scope | GPU metrics follow the system-metric sampling roles: master and owner/client processes report visible node resources; external clients do not duplicate system-resource sampling. | + +If a machine has no GPU, lacks `nvidia-smi`, or the process cannot access the GPU, the KV process keeps running. The member table shows `N/A` for `gpu`, and GPU trend cards have no samples. + ### Lifecycle and Call Flow ```text diff --git a/fluxon_rs/fluxon_cli/src/lib.rs b/fluxon_rs/fluxon_cli/src/lib.rs index 11c9fcd..2adb85d 100644 --- a/fluxon_rs/fluxon_cli/src/lib.rs +++ b/fluxon_rs/fluxon_cli/src/lib.rs @@ -30,7 +30,7 @@ pub const OPS_PANEL_SERVICE_NAME: &str = "ops"; use crate::config::{AVAILABLE_MEMBER_KINDS, MemberKind, MonitorConfig}; use crate::model::{ - ClusterMember, ClusterSnapshot, ClustersResponse, MemberRdmaDeviceSnapshot, + ClusterMember, ClusterSnapshot, ClustersResponse, GpuSnapshot, MemberRdmaDeviceSnapshot, MemberRdmaPortSnapshot, MemberSnapshot, NodeSnapshot, RdmaNetdevRateSnapshot, TransferEngineEdge, }; @@ -1359,6 +1359,7 @@ async fn build_fs_cluster_snapshot( node_memory_total_bytes: None, container_memory_usage_bytes: None, container_memory_limit_bytes: None, + gpus: Vec::new(), process_resident_memory_bytes: None, process_cpu_usage_percent: None, tokio_num_workers: None, @@ -1864,6 +1865,38 @@ pub async fn build_cluster_snapshot_with_prom_query_time( let seg_used_bytes_by_node = sum_segment_bytes_by_node(&prom_maps.seg_used_bytes_by_node_device); + fn gpu_snapshots_for_member( + prom_maps: &crate::prom::PromSnapshotMaps, + member_id: &str, + ) -> Vec { + let mut out = Vec::new(); + for ((node, index, name), gpu) in &prom_maps.gpu_by_node_index_name { + if node != member_id { + continue; + } + out.push(GpuSnapshot { + index: index.clone(), + name: name.clone(), + memory_used_bytes: gpu.memory_used_bytes, + memory_total_bytes: gpu.memory_total_bytes, + utilization_percent: gpu.utilization_percent, + temperature_celsius: gpu.temperature_celsius, + process_count: gpu.process_count, + process_sm_utilization_percent: gpu.process_sm_utilization_percent, + process_memory_utilization_percent: gpu.process_memory_utilization_percent, + }); + } + out.sort_by(|a, b| { + let ai = a.index.parse::(); + let bi = b.index.parse::(); + match (ai, bi) { + (Ok(ai), Ok(bi)) => ai.cmp(&bi), + _ => a.index.cmp(&b.index), + } + }); + out + } + async fn prom_scalar_best_effort( warnings: &mut Vec, prom: &PromClient, @@ -2068,6 +2101,7 @@ pub async fn build_cluster_snapshot_with_prom_query_time( .container_memory_limit_bytes .get(&member_id) .copied(), + gpus: gpu_snapshots_for_member(&prom_maps, &member_id), process_resident_memory_bytes: prom_maps .process_resident_memory_bytes .get(&member_id) diff --git a/fluxon_rs/fluxon_cli/src/model.rs b/fluxon_rs/fluxon_cli/src/model.rs index 4b9e14a..934a09f 100644 --- a/fluxon_rs/fluxon_cli/src/model.rs +++ b/fluxon_rs/fluxon_cli/src/model.rs @@ -56,6 +56,26 @@ pub struct RdmaNetdevRateSnapshot { pub rx_mbps: Option, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct GpuSnapshot { + pub index: String, + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_used_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub memory_total_bytes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub utilization_percent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub temperature_celsius: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub process_count: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub process_sm_utilization_percent: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub process_memory_utilization_percent: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KvTopologyOwnerExternalMaxSnapshot { pub owner_id: String, @@ -379,6 +399,79 @@ fn fmt_bytes_per_sec_from_mbps(v_mbps: Option) -> (String, UiPillStatus) { fmt_bytes_auto(Some(bytes_per_sec), true) } +fn fmt_bytes_gib_short(v: Option) -> String { + match v { + Some(bytes) => format!("{:.1}G", bytes / 1024.0 / 1024.0 / 1024.0), + None => "-".to_string(), + } +} + +fn fmt_percent_short(v: Option) -> String { + match v { + Some(v) => format!("{:.0}%", v), + None => "-".to_string(), + } +} + +fn fmt_temp_short(v: Option) -> String { + match v { + Some(v) => format!("{:.0}C", v), + None => "-".to_string(), + } +} + +fn fmt_count_short(v: Option) -> String { + match v { + Some(v) => format!("{:.0}", v), + None => "-".to_string(), + } +} + +fn render_gpu_summary(gpus: &[GpuSnapshot]) -> String { + if gpus.is_empty() { + return "N/A".to_string(); + } + + let mut rows: Vec<&GpuSnapshot> = gpus.iter().collect(); + rows.sort_by(|a, b| { + let ai = a.index.parse::(); + let bi = b.index.parse::(); + match (ai, bi) { + (Ok(ai), Ok(bi)) => ai.cmp(&bi), + _ => a.index.cmp(&b.index), + } + }); + + rows.into_iter() + .map(|g| { + format!( + "{}: {}/{} util={} temp={} p={} sm={} pmem={}", + g.index, + fmt_bytes_gib_short(g.memory_used_bytes), + fmt_bytes_gib_short(g.memory_total_bytes), + fmt_percent_short(g.utilization_percent), + fmt_temp_short(g.temperature_celsius), + fmt_count_short(g.process_count), + fmt_percent_short(g.process_sm_utilization_percent), + fmt_percent_short(g.process_memory_utilization_percent), + ) + }) + .collect::>() + .join(" | ") +} + +fn max_gpu_memory_used(gpus: &[GpuSnapshot]) -> Option { + gpus.iter() + .filter_map(|g| g.memory_used_bytes) + .fold(None, |acc, v| Some(acc.map(|a| a.max(v)).unwrap_or(v))) +} + +fn max_gpu_utilization(gpus: &[GpuSnapshot]) -> Option { + gpus.iter() + .filter_map(|g| g.utilization_percent) + .fold(None, |acc, v| Some(acc.map(|a| a.max(v)).unwrap_or(v))) +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct NetworkConfigSnapshot { pub subnet_whitelist: Vec, @@ -528,6 +621,7 @@ pub struct MemberSnapshot { pub node_memory_total_bytes: Option, pub container_memory_usage_bytes: Option, pub container_memory_limit_bytes: Option, + pub gpus: Vec, pub process_resident_memory_bytes: Option, pub process_cpu_usage_percent: Option, pub tokio_num_workers: Option, @@ -1531,6 +1625,9 @@ pub struct MemberTableRowView { pub shared_mem_dir_text: String, pub p2p_listen_port_text: String, pub rdma_text: String, + pub gpu_text: String, + pub gpu_memory_used_sort: String, + pub gpu_utilization_sort: String, pub search_text: String, pub cpu_text: String, pub cpu_sort: String, @@ -1937,6 +2034,7 @@ pub fn build_member_table_rows(snapshot: &ClusterSnapshot) -> Vec Vec Vec, } +#[derive(Debug, Clone, Default)] +pub struct GpuPromSnapshot { + pub memory_used_bytes: Option, + pub memory_total_bytes: Option, + pub utilization_percent: Option, + pub temperature_celsius: Option, + pub process_count: Option, + pub process_sm_utilization_percent: Option, + pub process_memory_utilization_percent: Option, +} + pub struct PromSnapshotMaps { pub node_cpu_usage_percent: HashMap, pub node_cpu_logical_cores: HashMap, @@ -241,6 +256,7 @@ pub struct PromSnapshotMaps { pub node_memory_total_bytes: HashMap, pub container_memory_usage_bytes: HashMap, pub container_memory_limit_bytes: HashMap, + pub gpu_by_node_index_name: HashMap<(String, String, String), GpuPromSnapshot>, pub process_resident_memory_bytes: HashMap, pub process_cpu_usage_percent: HashMap, pub tokio_num_workers: HashMap, @@ -291,6 +307,7 @@ impl PromSnapshotMaps { node_memory_total_bytes: HashMap::new(), container_memory_usage_bytes: HashMap::new(), container_memory_limit_bytes: HashMap::new(), + gpu_by_node_index_name: HashMap::new(), process_resident_memory_bytes: HashMap::new(), process_cpu_usage_percent: HashMap::new(), tokio_num_workers: HashMap::new(), @@ -459,6 +476,39 @@ fn take_node_device_metric(samples: &[PromSample]) -> HashMap<(String, String), out } +fn take_gpu_metric(samples: &[PromSample]) -> HashMap<(String, String, String), f64> { + let mut out = HashMap::new(); + for s in samples { + let Some(node) = s.metric.get(PROM_LABEL_NODE) else { + continue; + }; + let Some(gpu_index) = s.metric.get(PROM_LABEL_GPU_INDEX) else { + continue; + }; + let Some(gpu_name) = s.metric.get(PROM_LABEL_GPU_NAME) else { + continue; + }; + let Some(v) = s.value_f64() else { + continue; + }; + out.insert((node.clone(), gpu_index.clone(), gpu_name.clone()), v); + } + out +} + +fn merge_gpu_metric( + dst: &mut HashMap<(String, String, String), GpuPromSnapshot>, + src: HashMap<(String, String, String), f64>, + mut set: F, +) where + F: FnMut(&mut GpuPromSnapshot, f64), +{ + for (key, value) in src { + let gpu = dst.entry(key).or_default(); + set(gpu, value); + } +} + pub fn role_from_member_metadata(meta: &BTreeMap) -> MemberRole { if meta.get("master").map(|v| v == "true").unwrap_or(false) { return MemberRole::Master; @@ -1090,6 +1140,97 @@ pub async fn collect_prom_snapshot( ) .await, ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_MEMORY_USED_BYTES, + PROM_METRIC_GPU_MEMORY_USED_BYTES, + ) + .await, + ), + |gpu, v| gpu.memory_used_bytes = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_MEMORY_TOTAL_BYTES, + PROM_METRIC_GPU_MEMORY_TOTAL_BYTES, + ) + .await, + ), + |gpu, v| gpu.memory_total_bytes = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_UTILIZATION_PERCENT, + PROM_METRIC_GPU_UTILIZATION_PERCENT, + ) + .await, + ), + |gpu, v| gpu.utilization_percent = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_TEMPERATURE_CELSIUS, + PROM_METRIC_GPU_TEMPERATURE_CELSIUS, + ) + .await, + ), + |gpu, v| gpu.temperature_celsius = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_PROCESS_COUNT, + PROM_METRIC_GPU_PROCESS_COUNT, + ) + .await, + ), + |gpu, v| gpu.process_count = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_PROCESS_SM_UTILIZATION_PERCENT, + PROM_METRIC_GPU_PROCESS_SM_UTILIZATION_PERCENT, + ) + .await, + ), + |gpu, v| gpu.process_sm_utilization_percent = Some(v), + ); + merge_gpu_metric( + &mut out.gpu_by_node_index_name, + take_gpu_metric( + &q( + prom, + warnings, + PROM_METRIC_GPU_PROCESS_MEMORY_UTILIZATION_PERCENT, + PROM_METRIC_GPU_PROCESS_MEMORY_UTILIZATION_PERCENT, + ) + .await, + ), + |gpu, v| gpu.process_memory_utilization_percent = Some(v), + ); out.process_resident_memory_bytes = take_node_metric( &q( prom, @@ -1168,7 +1309,7 @@ pub async fn collect_prom_snapshot( prom, warnings, "process_network_tx_mbps", - "sum by (node) (rate(client_network_bytes_total{direction=\"tx\"}[2m])) * 8 / 1000000", + &format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{direction=\"tx\"}}"), ) .await, ); @@ -1177,7 +1318,7 @@ pub async fn collect_prom_snapshot( prom, warnings, "process_network_rx_mbps", - "sum by (node) (rate(client_network_bytes_total{direction=\"rx\"}[2m])) * 8 / 1000000", + &format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{direction=\"rx\"}}"), ) .await, ); diff --git a/fluxon_rs/fluxon_cli/src/server.rs b/fluxon_rs/fluxon_cli/src/server.rs index ed6baec..e1dee90 100644 --- a/fluxon_rs/fluxon_cli/src/server.rs +++ b/fluxon_rs/fluxon_cli/src/server.rs @@ -19,11 +19,13 @@ use hyper::Uri; use hyper::client::HttpConnector; use hyper_rustls::HttpsConnectorBuilder; use serde::Serialize; +use std::collections::{BTreeMap, BTreeSet}; use std::net::SocketAddr; use std::pin::Pin; use std::sync::Arc; use tokio::sync::{RwLock, watch}; +use fluxon_observability::keys::PROM_METRIC_CLIENT_NETWORK_MBPS; use fluxon_util::{ FluxonCliProxyDescriptorV2, FluxonCliProxyTransportV2, fluxon_cli_proxy_desc_etcd_key_v2, fluxon_cli_proxy_desc_etcd_service_prefix_v2, @@ -245,6 +247,8 @@ const FLUXON_CLI_AUTO_REFRESH_TOOL_JS: &str = r#" if (inFlightRef.inFlight) return; inFlightRef.inFlight = true; const state = hooks.captureState(); + const scrollX = window.scrollX; + const scrollY = window.scrollY; try { const resp = await fetch(cfg.url, { cache: 'no-store' }); if (!resp.ok) { @@ -262,6 +266,7 @@ const FLUXON_CLI_AUTO_REFRESH_TOOL_JS: &str = r#" curApp.innerHTML = nextApp.innerHTML; hooks.restoreState(state); hooks.afterReplace(); + window.scrollTo(scrollX, scrollY); } catch (e) { console.warn('auto_refresh: refresh failed:', e); } finally { @@ -479,16 +484,30 @@ struct KvMetricPanelResponse { #[serde(rename_all = "snake_case")] struct KvMetricMembersResponse { metric: KvMetricMetaWire, + comparison_metric: Option, + additional_metrics: Vec, range: KvMetricRangeWire, members: Vec, warnings: Vec, } +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +struct KvMetricOwnersResponse { + metric: KvMetricMetaWire, + comparison_metric: Option, + additional_metrics: Vec, + range: KvMetricRangeWire, + owners: Vec, + warnings: Vec, +} + #[derive(Debug, Clone, Serialize)] #[serde(rename_all = "snake_case")] struct KvMetricMetaWire { key: String, label: String, + series_label: String, unit: String, aggregate: String, } @@ -504,8 +523,20 @@ struct KvMetricRangeWire { #[serde(rename_all = "snake_case")] struct KvAggregateMetricCardWire { metric: KvMetricMetaWire, + comparison_metric: Option, latest: Option, aggregate_series: Vec<(f64, f64)>, + comparison_latest: Option, + comparison_series: Vec<(f64, f64)>, + additional_series: Vec, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +struct KvMetricAdditionalSeriesWire { + metric: KvMetricMetaWire, + latest: Option, + series: Vec<(f64, f64)>, } #[derive(Debug, Clone, Serialize)] @@ -516,12 +547,29 @@ struct KvMemberSeriesWire { node_key: String, latest: Option, series: Vec<(f64, f64)>, + comparison_latest: Option, + comparison_series: Vec<(f64, f64)>, + additional_series: Vec, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "snake_case")] +struct KvOwnerSeriesWire { + owner_id: String, + node_key: String, + latest: Option, + series: Vec<(f64, f64)>, + comparison_latest: Option, + comparison_series: Vec<(f64, f64)>, + additional_series: Vec, + members: Vec, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum KvMetricAggregate { Sum, Max, + Mean, } impl KvMetricAggregate { @@ -529,6 +577,7 @@ impl KvMetricAggregate { match self { Self::Sum => "sum", Self::Max => "max", + Self::Mean => "mean", } } } @@ -537,93 +586,427 @@ impl KvMetricAggregate { enum KvMetricValueField { PutRps, GetRps, + GetCacheHitRatePercent, PutBps, GetBps, + ProcessCpuUsagePercent, + NodeCpuUsagePercent, + NodeCpuCapacityPercent, + ProcessNetworkTxMbps, + ProcessNetworkRxMbps, + NodeMemoryUsageBytes, + NodeMemoryTotalBytes, + NodeNetworkTxMbps, + NodeNetworkRxMbps, ProcessRss, + GpuMemoryUsed, + GpuMemoryTotal, + GpuUtilizationPercent, + GpuTemperatureCelsius, + GpuProcessCount, + GpuProcessSmUtilizationPercent, + GpuProcessMemoryUtilizationPercent, SegUsedBytes, + SegCapacityBytes, TokioGlobalQueueDepth, TokioBusyPercent, } +#[derive(Debug, Clone, Copy)] +struct KvMetricSeriesSpec { + key: &'static str, + label: &'static str, + series_label: &'static str, + unit: &'static str, + aggregate: KvMetricAggregate, + field: KvMetricValueField, + roles: Option<&'static [MemberRole]>, +} + +impl KvMetricSeriesSpec { + const fn new( + key: &'static str, + label: &'static str, + series_label: &'static str, + unit: &'static str, + aggregate: KvMetricAggregate, + field: KvMetricValueField, + ) -> Self { + Self { + key, + label, + series_label, + unit, + aggregate, + field, + roles: None, + } + } + + const fn new_with_roles( + key: &'static str, + label: &'static str, + series_label: &'static str, + unit: &'static str, + aggregate: KvMetricAggregate, + field: KvMetricValueField, + roles: &'static [MemberRole], + ) -> Self { + Self { + key, + label, + series_label, + unit, + aggregate, + field, + roles: Some(roles), + } + } +} + #[derive(Debug, Clone, Copy)] struct KvMetricSpec { key: &'static str, label: &'static str, + series_label: &'static str, unit: &'static str, aggregate: KvMetricAggregate, field: KvMetricValueField, roles: &'static [MemberRole], + comparison: Option, + additional: &'static [KvMetricSeriesSpec], +} + +impl KvMetricSpec { + const fn single( + key: &'static str, + label: &'static str, + unit: &'static str, + aggregate: KvMetricAggregate, + field: KvMetricValueField, + roles: &'static [MemberRole], + ) -> Self { + Self::single_with_series_label(key, label, label, unit, aggregate, field, roles) + } + + const fn single_with_series_label( + key: &'static str, + label: &'static str, + series_label: &'static str, + unit: &'static str, + aggregate: KvMetricAggregate, + field: KvMetricValueField, + roles: &'static [MemberRole], + ) -> Self { + Self { + key, + label, + series_label, + unit, + aggregate, + field, + roles, + comparison: None, + additional: &[], + } + } + + const fn paired( + primary: KvMetricSeriesSpec, + comparison: KvMetricSeriesSpec, + roles: &'static [MemberRole], + ) -> Self { + Self { + key: primary.key, + label: primary.label, + series_label: primary.series_label, + unit: primary.unit, + aggregate: primary.aggregate, + field: primary.field, + roles, + comparison: Some(comparison), + additional: &[], + } + } + + const fn paired_with_additional( + primary: KvMetricSeriesSpec, + comparison: KvMetricSeriesSpec, + additional: &'static [KvMetricSeriesSpec], + roles: &'static [MemberRole], + ) -> Self { + Self { + key: primary.key, + label: primary.label, + series_label: primary.series_label, + unit: primary.unit, + aggregate: primary.aggregate, + field: primary.field, + roles, + comparison: Some(comparison), + additional, + } + } } const KV_METRIC_OWNER_AND_EXTERNAL_ROLES: &[MemberRole] = &[MemberRole::OwnerClient, MemberRole::ExternalClient]; +const KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES: &[MemberRole] = &[ + MemberRole::OwnerClient, + MemberRole::ExternalClient, + MemberRole::SideTransferWorker, +]; +const KV_METRIC_SYSTEM_ROLES: &[MemberRole] = &[MemberRole::Master, MemberRole::OwnerClient]; const KV_METRIC_OWNER_ONLY_ROLES: &[MemberRole] = &[MemberRole::OwnerClient]; +const KV_NODE_MEMORY_ADDITIONAL_SPECS: &[KvMetricSeriesSpec] = + &[KvMetricSeriesSpec::new_with_roles( + "process_rss", + "Process RSS", + "Process RSS", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessRss, + KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES, + )]; +const KV_NODE_CPU_ADDITIONAL_SPECS: &[KvMetricSeriesSpec] = &[KvMetricSeriesSpec::new_with_roles( + "process_cpu_usage_percent", + "Process CPU", + "Process CPU", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessCpuUsagePercent, + KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES, +)]; const KV_METRIC_SPECS: &[KvMetricSpec] = &[ - KvMetricSpec { - key: "put_rps", - label: "Put RPS", - unit: "rps", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::PutRps, - roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES, - }, - KvMetricSpec { - key: "get_rps", - label: "Get RPS", - unit: "rps", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::GetRps, - roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES, - }, - KvMetricSpec { - key: "put_bps", - label: "Put B/s", - unit: "B/s", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::PutBps, - roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES, - }, - KvMetricSpec { - key: "get_bps", - label: "Get B/s", - unit: "B/s", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::GetBps, - roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES, - }, - KvMetricSpec { - key: "process_rss", - label: "Process RSS", - unit: "bytes", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::ProcessRss, - roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES, - }, - KvMetricSpec { - key: "seg_used_bytes", - label: "Segment Used", - unit: "bytes", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::SegUsedBytes, - roles: KV_METRIC_OWNER_ONLY_ROLES, - }, - KvMetricSpec { - key: "tokio_global_queue_depth", - label: "Tokio Queue Depth", - unit: "count", - aggregate: KvMetricAggregate::Sum, - field: KvMetricValueField::TokioGlobalQueueDepth, - roles: KV_METRIC_OWNER_ONLY_ROLES, - }, - KvMetricSpec { - key: "tokio_busy_percent", - label: "Tokio Busy %", - unit: "percent", - aggregate: KvMetricAggregate::Max, - field: KvMetricValueField::TokioBusyPercent, - roles: KV_METRIC_OWNER_ONLY_ROLES, - }, + KvMetricSpec::single( + "put_rps", + "Put RPS", + "rps", + KvMetricAggregate::Sum, + KvMetricValueField::PutRps, + KV_METRIC_OWNER_AND_EXTERNAL_ROLES, + ), + KvMetricSpec::single( + "get_rps", + "Get RPS", + "rps", + KvMetricAggregate::Sum, + KvMetricValueField::GetRps, + KV_METRIC_OWNER_AND_EXTERNAL_ROLES, + ), + KvMetricSpec::single( + "get_cache_hit_rate_percent", + "Cache Hit %", + "percent", + KvMetricAggregate::Mean, + KvMetricValueField::GetCacheHitRatePercent, + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::single( + "put_bps", + "Put B/s", + "B/s", + KvMetricAggregate::Sum, + KvMetricValueField::PutBps, + KV_METRIC_OWNER_AND_EXTERNAL_ROLES, + ), + KvMetricSpec::single( + "get_bps", + "Get B/s", + "B/s", + KvMetricAggregate::Sum, + KvMetricValueField::GetBps, + KV_METRIC_OWNER_AND_EXTERNAL_ROLES, + ), + KvMetricSpec::single( + "process_cpu_usage_percent", + "CPU Util %", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessCpuUsagePercent, + KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES, + ), + KvMetricSpec::paired_with_additional( + KvMetricSeriesSpec::new( + "node_cpu_usage_percent", + "Node CPU", + "Used", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::NodeCpuUsagePercent, + ), + KvMetricSeriesSpec::new( + "node_cpu_capacity_percent", + "Capacity", + "Capacity", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::NodeCpuCapacityPercent, + ), + KV_NODE_CPU_ADDITIONAL_SPECS, + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::paired( + KvMetricSeriesSpec::new( + "process_network_tx_mbps", + "Process Network", + "TX", + "mbps", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessNetworkTxMbps, + ), + KvMetricSeriesSpec::new( + "process_network_rx_mbps", + "RX", + "RX", + "mbps", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessNetworkRxMbps, + ), + KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES, + ), + KvMetricSpec::paired_with_additional( + KvMetricSeriesSpec::new( + "node_memory_usage_bytes", + "Node Memory", + "Used", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::NodeMemoryUsageBytes, + ), + KvMetricSeriesSpec::new( + "node_memory_total_bytes", + "Total", + "Total", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::NodeMemoryTotalBytes, + ), + KV_NODE_MEMORY_ADDITIONAL_SPECS, + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::paired( + KvMetricSeriesSpec::new( + "node_network_tx_mbps", + "Node Network", + "TX", + "mbps", + KvMetricAggregate::Sum, + KvMetricValueField::NodeNetworkTxMbps, + ), + KvMetricSeriesSpec::new( + "node_network_rx_mbps", + "RX", + "RX", + "mbps", + KvMetricAggregate::Sum, + KvMetricValueField::NodeNetworkRxMbps, + ), + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::single( + "process_rss", + "Process RSS", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::ProcessRss, + KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES, + ), + KvMetricSpec::paired( + KvMetricSeriesSpec::new( + "seg_used_bytes", + "Segment Usage", + "Used", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::SegUsedBytes, + ), + KvMetricSeriesSpec::new( + "seg_capacity_bytes", + "Capacity", + "Capacity", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::SegCapacityBytes, + ), + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::paired( + KvMetricSeriesSpec::new( + "gpu_memory_used", + "GPU Memory", + "Used", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::GpuMemoryUsed, + ), + KvMetricSeriesSpec::new( + "gpu_memory_total", + "Total", + "Total", + "bytes", + KvMetricAggregate::Sum, + KvMetricValueField::GpuMemoryTotal, + ), + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "gpu_utilization_percent", + "GPU Util %", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::GpuUtilizationPercent, + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "gpu_temperature_celsius", + "GPU Temp", + "celsius", + KvMetricAggregate::Max, + KvMetricValueField::GpuTemperatureCelsius, + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "gpu_process_count", + "GPU Proc Count", + "count", + KvMetricAggregate::Sum, + KvMetricValueField::GpuProcessCount, + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "gpu_process_sm_utilization_percent", + "GPU Proc SM %", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::GpuProcessSmUtilizationPercent, + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "gpu_process_memory_utilization_percent", + "GPU Proc Mem %", + "percent", + KvMetricAggregate::Sum, + KvMetricValueField::GpuProcessMemoryUtilizationPercent, + KV_METRIC_SYSTEM_ROLES, + ), + KvMetricSpec::single( + "tokio_global_queue_depth", + "Tokio Queue Depth", + "count", + KvMetricAggregate::Sum, + KvMetricValueField::TokioGlobalQueueDepth, + KV_METRIC_OWNER_ONLY_ROLES, + ), + KvMetricSpec::single( + "tokio_busy_percent", + "Tokio Busy %", + "percent", + KvMetricAggregate::Max, + KvMetricValueField::TokioBusyPercent, + KV_METRIC_OWNER_ONLY_ROLES, + ), ]; fn kv_metric_spec_by_key(key: &str) -> Option { @@ -634,11 +1017,34 @@ fn kv_metric_meta(spec: KvMetricSpec) -> KvMetricMetaWire { KvMetricMetaWire { key: spec.key.to_string(), label: spec.label.to_string(), + series_label: spec.series_label.to_string(), + unit: spec.unit.to_string(), + aggregate: spec.aggregate.as_str().to_string(), + } +} + +fn kv_metric_meta_from_series(spec: KvMetricSeriesSpec) -> KvMetricMetaWire { + KvMetricMetaWire { + key: spec.key.to_string(), + label: spec.label.to_string(), + series_label: spec.series_label.to_string(), unit: spec.unit.to_string(), aggregate: spec.aggregate.as_str().to_string(), } } +fn kv_metric_comparison_meta(spec: KvMetricSpec) -> Option { + spec.comparison.map(kv_metric_meta_from_series) +} + +fn kv_metric_additional_meta(spec: KvMetricSpec) -> Vec { + spec.additional + .iter() + .copied() + .map(kv_metric_meta_from_series) + .collect() +} + fn parse_kv_metric_window(raw: Option<&str>) -> Result<(String, f64, u64), String> { match raw.unwrap_or("15m") { "5m" => Ok(("5m".to_string(), 5.0 * 60.0, 5)), @@ -683,26 +1089,86 @@ fn parse_member_roles_list(raw: Option<&Vec>) -> Result String { - match spec.field { - KvMetricValueField::PutRps => format!( - "sum_over_time(kv_op_end_event{{node={member_id:?},op=\"put\",status=\"success\"}}[1s])" - ), +fn kv_metric_promql_for_field(field: KvMetricValueField, member_id: &str) -> String { + match field { + KvMetricValueField::PutRps => { + format!("sum(kv_op_end_event_rps{{node={member_id:?},op=\"put\",status=\"success\"}})") + } KvMetricValueField::GetRps => format!( - "sum_over_time(kv_op_end_event{{node={member_id:?},op=\"get\",status=~\"hit|success\"}}[1s])" + "sum(kv_op_end_event_rps{{node={member_id:?},op=\"get\",status=~\"hit|success\"}})" ), + KvMetricValueField::GetCacheHitRatePercent => { + format!("kv_get_cache_hit_rate_percent{{node={member_id:?}}}") + } KvMetricValueField::PutBps => format!( - "sum_over_time(kv_op_end_bytes{{node={member_id:?},op=\"put\",status=\"success\"}}[1s])" + "sum(kv_op_end_bytes_per_sec{{node={member_id:?},op=\"put\",status=\"success\"}})" ), KvMetricValueField::GetBps => format!( - "sum_over_time(kv_op_end_bytes{{node={member_id:?},op=\"get\",status=~\"hit|success\"}}[1s])" + "sum(kv_op_end_bytes_per_sec{{node={member_id:?},op=\"get\",status=~\"hit|success\"}})" ), + KvMetricValueField::ProcessCpuUsagePercent => { + format!("process_cpu_usage_percent{{node={member_id:?}}}") + } + KvMetricValueField::NodeCpuUsagePercent => { + format!( + "node_cpu_usage_percent{{node={member_id:?}}} * node_cpu_logical_cores{{node={member_id:?}}}" + ) + } + KvMetricValueField::NodeCpuCapacityPercent => { + format!("node_cpu_logical_cores{{node={member_id:?}}} * 100") + } + KvMetricValueField::ProcessNetworkTxMbps => { + format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{node={member_id:?},direction=\"tx\"}}") + } + KvMetricValueField::ProcessNetworkRxMbps => { + format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{node={member_id:?},direction=\"rx\"}}") + } + KvMetricValueField::NodeMemoryUsageBytes => { + format!("node_memory_usage_bytes{{node={member_id:?}}}") + } + KvMetricValueField::NodeMemoryTotalBytes => { + format!("node_memory_total_bytes{{node={member_id:?}}}") + } + KvMetricValueField::NodeNetworkTxMbps => { + format!( + "sum(rate(node_network_transmit_bytes_total{{node={member_id:?}}}[2m])) * 8 / 1000000" + ) + } + KvMetricValueField::NodeNetworkRxMbps => { + format!( + "sum(rate(node_network_receive_bytes_total{{node={member_id:?}}}[2m])) * 8 / 1000000" + ) + } KvMetricValueField::ProcessRss => { format!("process_resident_memory_bytes{{node={member_id:?}}}") } + KvMetricValueField::GpuMemoryUsed => { + format!("sum(gpu_memory_used_bytes{{node={member_id:?}}})") + } + KvMetricValueField::GpuMemoryTotal => { + format!("sum(gpu_memory_total_bytes{{node={member_id:?}}})") + } + KvMetricValueField::GpuUtilizationPercent => { + format!("sum(gpu_utilization_percent{{node={member_id:?}}})") + } + KvMetricValueField::GpuTemperatureCelsius => { + format!("max(gpu_temperature_celsius{{node={member_id:?}}})") + } + KvMetricValueField::GpuProcessCount => { + format!("sum(gpu_process_count{{node={member_id:?}}})") + } + KvMetricValueField::GpuProcessSmUtilizationPercent => { + format!("sum(gpu_process_sm_utilization_percent{{node={member_id:?}}})") + } + KvMetricValueField::GpuProcessMemoryUtilizationPercent => { + format!("sum(gpu_process_memory_utilization_percent{{node={member_id:?}}})") + } KvMetricValueField::SegUsedBytes => { format!("sum(kvcache_segment_used_bytes{{node={member_id:?}}})") } + KvMetricValueField::SegCapacityBytes => { + format!("sum(kvcache_segment_capacity_bytes{{node={member_id:?}}})") + } KvMetricValueField::TokioGlobalQueueDepth => { format!("tokio_global_queue_depth{{node={member_id:?}}}") } @@ -712,6 +1178,10 @@ fn kv_metric_promql_for_member(spec: KvMetricSpec, member_id: &str) -> String { } } +fn kv_metric_promql_for_member(spec: KvMetricSpec, member_id: &str) -> String { + kv_metric_promql_for_field(spec.field, member_id) +} + #[derive(Debug, Clone)] struct KvMetricMemberRef { member_id: String, @@ -719,6 +1189,58 @@ struct KvMetricMemberRef { node_key: String, } +#[derive(Debug, Clone)] +struct KvMetricOwnerRef { + owner_id: String, + node_key: String, + members: Vec, +} + +fn kv_metric_member_matches( + member: &crate::model::MemberSnapshot, + spec: KvMetricSpec, + visible_roles: Option<&Vec>, +) -> bool { + if !spec.roles.contains(&member.role) { + return false; + } + if let Some(v) = visible_roles { + if !v.contains(&member.role) { + return false; + } + } + true +} + +fn kv_metric_series_member_matches( + member: &crate::model::MemberSnapshot, + spec: KvMetricSeriesSpec, + visible_roles: Option<&Vec>, +) -> bool { + if let Some(roles) = spec.roles { + if !roles.contains(&member.role) { + return false; + } + } + if let Some(v) = visible_roles { + if !v.contains(&member.role) { + return false; + } + } + true +} + +fn kv_metric_member_ref( + member: &crate::model::MemberSnapshot, + node_key: &str, +) -> KvMetricMemberRef { + KvMetricMemberRef { + member_id: member.member_id.clone(), + role: member.role, + node_key: node_key.to_string(), + } +} + fn select_kv_metric_members( snapshot: &crate::model::ClusterSnapshot, spec: KvMetricSpec, @@ -727,22 +1249,70 @@ fn select_kv_metric_members( let mut out = Vec::new(); for node in &snapshot.nodes { for member in &node.members { - if !spec.roles.contains(&member.role) { + if !kv_metric_member_matches(member, spec, visible_roles) { continue; } - if let Some(v) = visible_roles { - if !v.contains(&member.role) { - continue; - } + out.push(kv_metric_member_ref(member, &node.node_key)); + } + } + out +} + +fn select_kv_metric_owner_groups( + snapshot: &crate::model::ClusterSnapshot, + spec: KvMetricSpec, + visible_roles: Option<&Vec>, +) -> (Vec, Vec) { + let mut out = Vec::new(); + let mut warnings = Vec::new(); + + for node in &snapshot.nodes { + let mut owners = node + .members + .iter() + .filter(|m| m.role == MemberRole::OwnerClient) + .collect::>(); + owners.sort_by(|a, b| a.member_id.cmp(&b.member_id)); + if owners.is_empty() { + continue; + } + + if owners.len() == 1 { + let members = node + .members + .iter() + .filter(|member| kv_metric_member_matches(member, spec, visible_roles)) + .map(|member| kv_metric_member_ref(member, &node.node_key)) + .collect::>(); + if members.is_empty() { + continue; } - out.push(KvMetricMemberRef { - member_id: member.member_id.clone(), - role: member.role, + out.push(KvMetricOwnerRef { + owner_id: owners[0].member_id.clone(), node_key: node.node_key.clone(), + members, + }); + continue; + } + + warnings.push(format!( + "metric {} owner view: multiple owner_client members under node_key={}, grouping owner_client members only", + spec.key, node.node_key + )); + for owner in owners { + if !kv_metric_member_matches(owner, spec, visible_roles) { + continue; + } + out.push(KvMetricOwnerRef { + owner_id: owner.member_id.clone(), + node_key: node.node_key.clone(), + members: vec![kv_metric_member_ref(owner, &node.node_key)], }); } } - out + + out.sort_by(|a, b| a.owner_id.cmp(&b.owner_id)); + (out, warnings) } fn prom_regex_escape_literal_local(s: &str) -> String { @@ -775,28 +1345,104 @@ fn prom_regex_union_exact_local(ids: &[String]) -> Option { } } -fn kv_metric_aggregate_promql(spec: KvMetricSpec, member_ids: &[String]) -> Result { +fn kv_metric_aggregate_promql_for_field( + field: KvMetricValueField, + aggregate: KvMetricAggregate, + member_ids: &[String], +) -> Result { let member_regex = prom_regex_union_exact_local(member_ids) .ok_or_else(|| "no visible members for metric".to_string())?; - let promql = match spec.field { + let promql = match field { KvMetricValueField::PutRps => format!( - "sum(sum_over_time(kv_op_end_event{{node=~{member_regex:?},op=\"put\",status=\"success\"}}[1s]))" + "sum(kv_op_end_event_rps{{node=~{member_regex:?},op=\"put\",status=\"success\"}})" ), KvMetricValueField::GetRps => format!( - "sum(sum_over_time(kv_op_end_event{{node=~{member_regex:?},op=\"get\",status=~\"hit|success\"}}[1s]))" + "sum(kv_op_end_event_rps{{node=~{member_regex:?},op=\"get\",status=~\"hit|success\"}})" ), + KvMetricValueField::GetCacheHitRatePercent => { + format!("avg(kv_get_cache_hit_rate_percent{{node=~{member_regex:?}}})") + } KvMetricValueField::PutBps => format!( - "sum(sum_over_time(kv_op_end_bytes{{node=~{member_regex:?},op=\"put\",status=\"success\"}}[1s]))" + "sum(kv_op_end_bytes_per_sec{{node=~{member_regex:?},op=\"put\",status=\"success\"}})" ), KvMetricValueField::GetBps => format!( - "sum(sum_over_time(kv_op_end_bytes{{node=~{member_regex:?},op=\"get\",status=~\"hit|success\"}}[1s]))" + "sum(kv_op_end_bytes_per_sec{{node=~{member_regex:?},op=\"get\",status=~\"hit|success\"}})" ), + KvMetricValueField::ProcessCpuUsagePercent => match aggregate { + KvMetricAggregate::Mean => { + format!("avg(process_cpu_usage_percent{{node=~{member_regex:?}}})") + } + KvMetricAggregate::Max => { + format!("max(process_cpu_usage_percent{{node=~{member_regex:?}}})") + } + KvMetricAggregate::Sum => { + format!("sum(process_cpu_usage_percent{{node=~{member_regex:?}}})") + } + }, + KvMetricValueField::NodeCpuUsagePercent => { + format!( + "sum(node_cpu_usage_percent{{node=~{member_regex:?}}} * node_cpu_logical_cores{{node=~{member_regex:?}}})" + ) + } + KvMetricValueField::NodeCpuCapacityPercent => { + format!("sum(node_cpu_logical_cores{{node=~{member_regex:?}}} * 100)") + } + KvMetricValueField::ProcessNetworkTxMbps => { + format!( + "sum({PROM_METRIC_CLIENT_NETWORK_MBPS}{{node=~{member_regex:?},direction=\"tx\"}})" + ) + } + KvMetricValueField::ProcessNetworkRxMbps => { + format!( + "sum({PROM_METRIC_CLIENT_NETWORK_MBPS}{{node=~{member_regex:?},direction=\"rx\"}})" + ) + } + KvMetricValueField::NodeMemoryUsageBytes => { + format!("sum(node_memory_usage_bytes{{node=~{member_regex:?}}})") + } + KvMetricValueField::NodeMemoryTotalBytes => { + format!("sum(node_memory_total_bytes{{node=~{member_regex:?}}})") + } + KvMetricValueField::NodeNetworkTxMbps => { + format!( + "sum(rate(node_network_transmit_bytes_total{{node=~{member_regex:?}}}[2m])) * 8 / 1000000" + ) + } + KvMetricValueField::NodeNetworkRxMbps => { + format!( + "sum(rate(node_network_receive_bytes_total{{node=~{member_regex:?}}}[2m])) * 8 / 1000000" + ) + } KvMetricValueField::ProcessRss => { format!("sum(process_resident_memory_bytes{{node=~{member_regex:?}}})") } + KvMetricValueField::GpuMemoryUsed => { + format!("sum(gpu_memory_used_bytes{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuMemoryTotal => { + format!("sum(gpu_memory_total_bytes{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuUtilizationPercent => { + format!("sum(gpu_utilization_percent{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuTemperatureCelsius => { + format!("max(gpu_temperature_celsius{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuProcessCount => { + format!("sum(gpu_process_count{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuProcessSmUtilizationPercent => { + format!("sum(gpu_process_sm_utilization_percent{{node=~{member_regex:?}}})") + } + KvMetricValueField::GpuProcessMemoryUtilizationPercent => { + format!("sum(gpu_process_memory_utilization_percent{{node=~{member_regex:?}}})") + } KvMetricValueField::SegUsedBytes => { format!("sum(kvcache_segment_used_bytes{{node=~{member_regex:?}}})") } + KvMetricValueField::SegCapacityBytes => { + format!("sum(kvcache_segment_capacity_bytes{{node=~{member_regex:?}}})") + } KvMetricValueField::TokioGlobalQueueDepth => { format!("sum(tokio_global_queue_depth{{node=~{member_regex:?}}})") } @@ -807,6 +1453,194 @@ fn kv_metric_aggregate_promql(spec: KvMetricSpec, member_ids: &[String]) -> Resu Ok(promql) } +fn kv_metric_aggregate_promql(spec: KvMetricSpec, member_ids: &[String]) -> Result { + kv_metric_aggregate_promql_for_field(spec.field, spec.aggregate, member_ids) +} + +fn kv_metric_series_from_range(range: Vec) -> Vec<(f64, f64)> { + range + .into_iter() + .flat_map(|series| { + series + .values + .into_iter() + .filter_map(|(ts, value)| value.parse::().ok().map(|v| (ts, v))) + }) + .collect::>() +} + +async fn query_kv_metric_member_series( + prom: &PromClient, + spec: KvMetricSpec, + member: KvMetricMemberRef, + start_s: f64, + end_s: f64, + step: &str, + warnings: &mut Vec, +) -> Option { + let promql = kv_metric_promql_for_member(spec, &member.member_id); + let range = match prom.query_range(&promql, start_s, end_s, step).await { + Ok(v) => v, + Err(e) => { + warnings.push(format!( + "metric {} member {} query_range failed: {}", + spec.key, member.member_id, e + )); + return None; + } + }; + let series = kv_metric_series_from_range(range); + Some(KvMemberSeriesWire { + member_id: member.member_id, + role: member.role.as_str().to_string(), + node_key: member.node_key, + latest: series.last().map(|(_, v)| *v), + series, + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), + }) +} + +async fn query_kv_metric_series_for_field( + prom: &PromClient, + field: KvMetricValueField, + member_id: &str, + start_s: f64, + end_s: f64, + step: &str, +) -> Result, String> { + let promql = kv_metric_promql_for_field(field, member_id); + prom.query_range(&promql, start_s, end_s, step) + .await + .map(kv_metric_series_from_range) + .map_err(|e| e.to_string()) +} + +async fn query_kv_metric_additional_aggregate_series( + prom: &PromClient, + spec: KvMetricSeriesSpec, + member_ids: &[String], + start_s: f64, + end_s: f64, + step: &str, +) -> Result { + let promql = kv_metric_aggregate_promql_for_field(spec.field, spec.aggregate, member_ids)?; + let series = prom + .query_range(&promql, start_s, end_s, step) + .await + .map(kv_metric_series_from_range) + .map_err(|e| e.to_string())?; + Ok(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(spec), + latest: series.last().map(|(_, v)| *v), + series, + }) +} + +async fn query_kv_metric_additional_member_series( + prom: &PromClient, + spec: KvMetricSeriesSpec, + member_id: &str, + start_s: f64, + end_s: f64, + step: &str, +) -> Result { + let series = + query_kv_metric_series_for_field(prom, spec.field, member_id, start_s, end_s, step).await?; + Ok(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(spec), + latest: series.last().map(|(_, v)| *v), + series, + }) +} + +fn kv_metric_additional_members_for_node( + snapshot: &crate::model::ClusterSnapshot, + node_key: &str, + spec: KvMetricSeriesSpec, + visible_roles: Option<&Vec>, +) -> Vec { + let mut out = Vec::new(); + let mut seen = BTreeSet::new(); + for node in &snapshot.nodes { + if node.node_key != node_key { + continue; + } + for member in &node.members { + if !kv_metric_series_member_matches(member, spec, visible_roles) { + continue; + } + if seen.insert(member.member_id.clone()) { + out.push(kv_metric_member_ref(member, &node.node_key)); + } + } + } + out +} + +fn sort_kv_member_series_rows(rows: &mut [KvMemberSeriesWire]) { + rows.sort_by(|a, b| { + let av = a.latest.unwrap_or(f64::NEG_INFINITY); + let bv = b.latest.unwrap_or(f64::NEG_INFINITY); + bv.partial_cmp(&av) + .unwrap_or(std::cmp::Ordering::Equal) + .then_with(|| a.member_id.cmp(&b.member_id)) + }); +} + +fn aggregate_kv_member_series( + aggregate: KvMetricAggregate, + members: &[KvMemberSeriesWire], +) -> Vec<(f64, f64)> { + let mut by_ts: BTreeMap = BTreeMap::new(); + let mut counts_by_ts: BTreeMap = BTreeMap::new(); + for member in members { + for (ts, value) in &member.series { + if !ts.is_finite() || !value.is_finite() { + continue; + } + let ts_ms = (*ts * 1000.0).round() as i64; + match aggregate { + KvMetricAggregate::Sum => { + *by_ts.entry(ts_ms).or_insert(0.0) += *value; + } + KvMetricAggregate::Max => { + by_ts + .entry(ts_ms) + .and_modify(|cur| { + if *value > *cur { + *cur = *value; + } + }) + .or_insert(*value); + } + KvMetricAggregate::Mean => { + *by_ts.entry(ts_ms).or_insert(0.0) += *value; + *counts_by_ts.entry(ts_ms).or_insert(0) += 1; + } + } + } + } + by_ts + .into_iter() + .map(|(ts_ms, value)| { + let value = match aggregate { + KvMetricAggregate::Mean => { + let count = counts_by_ts.get(&ts_ms).copied().unwrap_or(0); + if count == 0 { + value + } else { + value / count as f64 + } + } + _ => value, + }; + (ts_ms as f64 / 1000.0, value) + }) + .collect() +} + async fn kv_metric_panel( State(st): State>, Query(q): Query, @@ -863,8 +1697,12 @@ async fn kv_metric_panel( if members.is_empty() { cards.push(KvAggregateMetricCardWire { metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), latest: None, aggregate_series: Vec::new(), + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), }); continue; } @@ -878,8 +1716,12 @@ async fn kv_metric_panel( warnings.push(format!("metric {} unavailable: {}", spec.key, e)); cards.push(KvAggregateMetricCardWire { metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), latest: None, aggregate_series: Vec::new(), + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), }); continue; } @@ -890,26 +1732,114 @@ async fn kv_metric_panel( warnings.push(format!("metric {} query_range failed: {}", spec.key, e)); cards.push(KvAggregateMetricCardWire { metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), latest: None, aggregate_series: Vec::new(), + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), }); continue; } }; - let aggregate_series = range - .into_iter() - .flat_map(|series| { - series - .values - .into_iter() - .filter_map(|(ts, value)| value.parse::().ok().map(|v| (ts, v))) - }) - .collect::>(); + let aggregate_series = kv_metric_series_from_range(range); let latest = aggregate_series.last().map(|(_, v)| *v); + let member_node_keys = members + .iter() + .map(|m| m.node_key.clone()) + .collect::>(); + let (comparison_latest, comparison_series) = match spec.comparison { + Some(comparison) => { + let comparison_promql = match kv_metric_aggregate_promql_for_field( + comparison.field, + comparison.aggregate, + &member_ids, + ) { + Ok(v) => v, + Err(e) => { + warnings.push(format!("metric {} comparison unavailable: {}", spec.key, e)); + String::new() + } + }; + if comparison_promql.is_empty() { + (None, Vec::new()) + } else { + match prom + .query_range(&comparison_promql, start_s, end_s, &step) + .await + { + Ok(v) => { + let series = kv_metric_series_from_range(v); + (series.last().map(|(_, v)| *v), series) + } + Err(e) => { + warnings.push(format!( + "metric {} comparison query_range failed: {}", + spec.key, e + )); + (None, Vec::new()) + } + } + } + } + None => (None, Vec::new()), + }; + let mut additional_series = Vec::with_capacity(spec.additional.len()); + for additional in spec.additional.iter().copied() { + let mut additional_member_ids = Vec::new(); + let mut seen = BTreeSet::new(); + for node_key in &member_node_keys { + for member in kv_metric_additional_members_for_node( + &snapshot, + node_key, + additional, + visible_member_roles.as_ref(), + ) { + if seen.insert(member.member_id.clone()) { + additional_member_ids.push(member.member_id); + } + } + } + if additional_member_ids.is_empty() { + additional_series.push(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(additional), + latest: None, + series: Vec::new(), + }); + continue; + } + match query_kv_metric_additional_aggregate_series( + &prom, + additional, + &additional_member_ids, + start_s, + end_s, + &step, + ) + .await + { + Ok(row) => additional_series.push(row), + Err(e) => { + warnings.push(format!( + "metric {} additional {} query_range failed: {}", + spec.key, additional.key, e + )); + additional_series.push(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(additional), + latest: None, + series: Vec::new(), + }); + } + } + } cards.push(KvAggregateMetricCardWire { metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), latest, aggregate_series, + comparison_latest, + comparison_series, + additional_series, }); } @@ -1000,51 +1930,301 @@ async fn kv_metric_members( let mut warnings = snapshot.warnings.clone(); let mut rows = Vec::with_capacity(members.len()); for member in members { - let promql = kv_metric_promql_for_member(spec, &member.member_id); - let range = match prom.query_range(&promql, start_s, end_s, &step).await { - Ok(v) => v, - Err(e) => { - warnings.push(format!( - "metric {} member {} query_range failed: {}", - spec.key, member.member_id, e - )); - continue; + if let Some(mut row) = + query_kv_metric_member_series(&prom, spec, member, start_s, end_s, &step, &mut warnings) + .await + { + if let Some(comparison) = spec.comparison { + match query_kv_metric_series_for_field( + &prom, + comparison.field, + &row.member_id, + start_s, + end_s, + &step, + ) + .await + { + Ok(series) => { + row.comparison_latest = series.last().map(|(_, v)| *v); + row.comparison_series = series; + } + Err(e) => warnings.push(format!( + "metric {} member {} comparison query_range failed: {}", + spec.key, row.member_id, e + )), + } + } + for additional in spec.additional.iter().copied() { + match query_kv_metric_additional_member_series( + &prom, + additional, + &row.member_id, + start_s, + end_s, + &step, + ) + .await + { + Ok(series) => row.additional_series.push(series), + Err(e) => warnings.push(format!( + "metric {} member {} additional {} query_range failed: {}", + spec.key, row.member_id, additional.key, e + )), + } + } + rows.push(row); + } + } + sort_kv_member_series_rows(&mut rows); + + let mut resp = ( + StatusCode::OK, + Json(KvMetricMembersResponse { + metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), + additional_metrics: kv_metric_additional_meta(spec), + range: KvMetricRangeWire { + window: window_label, + step_s, + }, + members: rows, + warnings, + }), + ) + .into_response(); + resp.headers_mut().insert( + header::CONTENT_TYPE, + "application/json; charset=utf-8".parse().unwrap(), + ); + resp +} + +async fn kv_metric_owners( + State(st): State>, + Query(q): Query, +) -> Response { + let Some(cluster_name) = q.cluster_name.as_ref() else { + return text_response( + StatusCode::BAD_REQUEST, + "missing query param: cluster_name".to_string(), + ); + }; + let Some(metric_key) = q.metric_key.as_ref() else { + return text_response( + StatusCode::BAD_REQUEST, + "missing query param: metric_key".to_string(), + ); + }; + let spec = match kv_metric_spec_by_key(metric_key) { + Some(v) => v, + None => { + return text_response( + StatusCode::BAD_REQUEST, + format!("invalid metric_key: {}", metric_key), + ); + } + }; + let visible_member_roles = match parse_member_roles_list(q.member_roles.as_ref()) { + Ok(v) => v, + Err(e) => return text_response(StatusCode::BAD_REQUEST, e), + }; + let (window_label, window_secs, step_s) = match parse_kv_metric_window(q.window.as_deref()) { + Ok(v) => v, + Err(e) => return text_response(StatusCode::BAD_REQUEST, e), + }; + let cfg = MonitorConfig { + etcd_endpoints: st.cfg.etcd_endpoints.clone(), + prometheus_base_url: st.cfg.prometheus_base_url.clone(), + cluster_name: cluster_name.clone(), + member_kind: MemberKind::Kv, + output: OutputFormat::Web, + mq_unique_key_prefixes: st.cfg.mq_unique_key_prefixes.clone(), + http_listen_addr: st.cfg.http_listen_addr.clone(), + greptime_sql: st.cfg.greptime_sql.clone(), + }; + let snapshot = match crate::build_cluster_snapshot(&cfg).await { + Ok(v) => v, + Err(e) => { + return text_response( + StatusCode::BAD_GATEWAY, + format!("snapshot build failed: {}", e), + ); + } + }; + let (groups, mut owner_warnings) = + select_kv_metric_owner_groups(&snapshot, spec, visible_member_roles.as_ref()); + let prom = PromClient::new(st.cfg.prometheus_base_url.clone()); + let end_s = match prom.effective_query_time_s() { + Ok(v) => v, + Err(e) => { + return text_response( + StatusCode::BAD_GATEWAY, + format!("resolve query time failed: {}", e), + ); + } + }; + let start_s = (end_s - window_secs).max(0.0); + let step = format!("{}s", step_s); + let mut warnings = snapshot.warnings.clone(); + warnings.append(&mut owner_warnings); + + let mut owners = Vec::with_capacity(groups.len()); + for group in groups { + let mut members = Vec::with_capacity(group.members.len()); + for member in group.members.iter().cloned() { + if let Some(mut row) = query_kv_metric_member_series( + &prom, + spec, + member, + start_s, + end_s, + &step, + &mut warnings, + ) + .await + { + if let Some(comparison) = spec.comparison { + match query_kv_metric_series_for_field( + &prom, + comparison.field, + &row.member_id, + start_s, + end_s, + &step, + ) + .await + { + Ok(series) => { + row.comparison_latest = series.last().map(|(_, v)| *v); + row.comparison_series = series; + } + Err(e) => warnings.push(format!( + "metric {} member {} comparison query_range failed: {}", + spec.key, row.member_id, e + )), + } + } + members.push(row); } + } + sort_kv_member_series_rows(&mut members); + let series = aggregate_kv_member_series(spec.aggregate, &members); + let (comparison_latest, comparison_series) = match spec.comparison { + Some(comparison) => { + let comparison_members = members + .iter() + .map(|member| KvMemberSeriesWire { + member_id: member.member_id.clone(), + role: member.role.clone(), + node_key: member.node_key.clone(), + latest: member.comparison_latest, + series: member.comparison_series.clone(), + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), + }) + .collect::>(); + let comparison_series = + aggregate_kv_member_series(comparison.aggregate, &comparison_members); + (comparison_series.last().map(|(_, v)| *v), comparison_series) + } + None => (None, Vec::new()), }; - let series = range - .into_iter() - .flat_map(|series| { - series - .values - .into_iter() - .filter_map(|(ts, value)| value.parse::().ok().map(|v| (ts, v))) - }) - .collect::>(); - rows.push(KvMemberSeriesWire { - member_id: member.member_id, - role: member.role.as_str().to_string(), - node_key: member.node_key, + for member in &mut members { + for additional in spec.additional.iter().copied() { + match query_kv_metric_additional_member_series( + &prom, + additional, + &member.member_id, + start_s, + end_s, + &step, + ) + .await + { + Ok(row) => member.additional_series.push(row), + Err(e) => warnings.push(format!( + "metric {} member {} additional {} query_range failed: {}", + spec.key, member.member_id, additional.key, e + )), + } + } + } + let mut owner_additional_series = Vec::with_capacity(spec.additional.len()); + for additional in spec.additional.iter().copied() { + let additional_members = kv_metric_additional_members_for_node( + &snapshot, + &group.node_key, + additional, + visible_member_roles.as_ref(), + ); + let additional_member_ids = additional_members + .into_iter() + .map(|m| m.member_id) + .collect::>(); + if additional_member_ids.is_empty() { + owner_additional_series.push(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(additional), + latest: None, + series: Vec::new(), + }); + continue; + } + match query_kv_metric_additional_aggregate_series( + &prom, + additional, + &additional_member_ids, + start_s, + end_s, + &step, + ) + .await + { + Ok(row) => owner_additional_series.push(row), + Err(e) => { + warnings.push(format!( + "metric {} owner {} additional {} query_range failed: {}", + spec.key, group.owner_id, additional.key, e + )); + owner_additional_series.push(KvMetricAdditionalSeriesWire { + metric: kv_metric_meta_from_series(additional), + latest: None, + series: Vec::new(), + }); + } + } + } + owners.push(KvOwnerSeriesWire { + owner_id: group.owner_id, + node_key: group.node_key, latest: series.last().map(|(_, v)| *v), series, + comparison_latest, + comparison_series, + additional_series: owner_additional_series, + members, }); } - rows.sort_by(|a, b| { + owners.sort_by(|a, b| { let av = a.latest.unwrap_or(f64::NEG_INFINITY); let bv = b.latest.unwrap_or(f64::NEG_INFINITY); bv.partial_cmp(&av) .unwrap_or(std::cmp::Ordering::Equal) - .then_with(|| a.member_id.cmp(&b.member_id)) + .then_with(|| a.owner_id.cmp(&b.owner_id)) }); let mut resp = ( StatusCode::OK, - Json(KvMetricMembersResponse { + Json(KvMetricOwnersResponse { metric: kv_metric_meta(spec), + comparison_metric: kv_metric_comparison_meta(spec), + additional_metrics: kv_metric_additional_meta(spec), range: KvMetricRangeWire { window: window_label, step_s, }, - members: rows, + owners, warnings, }), ) @@ -2771,6 +3951,7 @@ fn build_router(st: Arc) -> Router { .route("/api/clusters", get(api_clusters)) .route("/api/kv_metric_panel", get(kv_metric_panel)) .route("/api/kv_metric_members", get(kv_metric_members)) + .route("/api/kv_metric_owners", get(kv_metric_owners)) .route("/view", get(view)) .route("/topology", get(topology_page)) .route("/cli", get(cli)) @@ -3434,3 +4615,360 @@ where .await .with_context(|| format!("http serve failed at {}", listen_addr)) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::MemberKind; + use crate::model::{ClusterSnapshot, MemberSnapshot, NodeSnapshot}; + + fn test_member(member_id: &str, role: MemberRole, node_start_time: i64) -> MemberSnapshot { + MemberSnapshot { + member_id: member_id.to_string(), + role, + is_p2p_relay: false, + is_side_transfer_worker: false, + node_start_time, + hostname: None, + accessible_ip: None, + shared_mem_dir: None, + p2p_listen_port: None, + rdma_runtime_reported: false, + rdma_probe_error: None, + rdma_devices: Vec::new(), + rdma_ports: Vec::new(), + rdma_transfer_engine: None, + pid: None, + cmd: None, + sub_cluster: None, + product_uuid: None, + node_cpu_usage_percent: None, + node_cpu_logical_cores: None, + node_memory_usage_bytes: None, + node_memory_total_bytes: None, + container_memory_usage_bytes: None, + container_memory_limit_bytes: None, + gpus: Vec::new(), + process_resident_memory_bytes: None, + process_cpu_usage_percent: None, + tokio_num_workers: None, + tokio_alive_tasks: None, + tokio_global_queue_depth: None, + tokio_busy_percent: None, + tokio_max_worker_busy_percent: None, + tokio_park_unpark_rate_hz: None, + process_net_tx_mbps: None, + process_net_rx_mbps: None, + kv_put_rps: None, + kv_get_rps: None, + kv_put_bps: None, + kv_get_bps: None, + kv_put_latency_mean_us: None, + kv_put_latency_p95_us: None, + kv_put_latency_p99_us: None, + kv_get_latency_mean_us: None, + kv_get_latency_p95_us: None, + kv_get_latency_p99_us: None, + seg_capacity_bytes: None, + seg_used_bytes: None, + fs_read_rps: None, + fs_write_rps: None, + } + } + + fn test_snapshot(nodes: Vec) -> ClusterSnapshot { + ClusterSnapshot { + cluster_name: "test_cluster".to_string(), + member_kind: MemberKind::Kv, + etcd_endpoints: Vec::new(), + prometheus_base_url: "http://example.invalid".to_string(), + warnings: Vec::new(), + visible_member_roles: None, + master_id: None, + master_network: None, + transfer_engine_edges: Vec::new(), + kv_peer_network: Vec::new(), + rdma_netdev_network: Vec::new(), + fs_mount_fs: Vec::new(), + shm_files: Vec::new(), + fs_export_registry: Vec::new(), + fs_mount_registry: Vec::new(), + kv_topology_owner_external_max: Vec::new(), + kv_topology_machine_external_max: Vec::new(), + kv_topology_sub_cluster_owner_owner_max: Vec::new(), + nodes, + mq: None, + total_put_rps: None, + total_get_rps: None, + total_put_bps: None, + total_get_bps: None, + total_put_latency_mean_us: None, + total_put_latency_p95_us: None, + total_put_latency_p99_us: None, + total_get_latency_mean_us: None, + total_get_latency_p95_us: None, + total_get_latency_p99_us: None, + } + } + + #[test] + fn cache_hit_metric_uses_windowed_gauge_queries() { + let spec = kv_metric_spec_by_key("get_cache_hit_rate_percent").unwrap(); + let member_promql = kv_metric_promql_for_member(spec, "owner_a"); + let aggregate_promql = + kv_metric_aggregate_promql(spec, &[String::from("owner_a")]).unwrap(); + assert_eq!( + member_promql, + "kv_get_cache_hit_rate_percent{node=\"owner_a\"}" + ); + assert_eq!( + aggregate_promql, + "avg(kv_get_cache_hit_rate_percent{node=~\"^(?:owner_a)$\"})" + ); + } + + #[test] + fn gpu_percent_metrics_use_sum_aggregation_so_totals_can_exceed_100() { + let spec = kv_metric_spec_by_key("gpu_utilization_percent").unwrap(); + assert_eq!(spec.aggregate, KvMetricAggregate::Sum); + let member_promql = kv_metric_promql_for_member(spec, "owner_a"); + let aggregate_promql = + kv_metric_aggregate_promql(spec, &[String::from("owner_a"), String::from("owner_b")]) + .unwrap(); + assert_eq!( + member_promql, + "sum(gpu_utilization_percent{node=\"owner_a\"})" + ); + assert_eq!( + aggregate_promql, + "sum(gpu_utilization_percent{node=~\"^(?:owner_a|owner_b)$\"})" + ); + } + + #[test] + fn owner_grouping_keeps_single_owner_node_members_together() { + let spec = kv_metric_spec_by_key("get_cache_hit_rate_percent").unwrap(); + let snapshot = test_snapshot(vec![ + NodeSnapshot { + node_key: "node_a".to_string(), + hostname: None, + accessible_ip: None, + shared_mem_dir: None, + is_p2p_relay: false, + node_cpu_usage_percent: None, + node_cpu_logical_cores: None, + node_memory_usage_bytes: None, + node_memory_total_bytes: None, + container_memory_usage_bytes: None, + container_memory_limit_bytes: None, + members: vec![ + test_member("owner_a", MemberRole::OwnerClient, 1), + test_member("master_a", MemberRole::Master, 1), + ], + segment_devices: Vec::new(), + }, + NodeSnapshot { + node_key: "node_b".to_string(), + hostname: None, + accessible_ip: None, + shared_mem_dir: None, + is_p2p_relay: false, + node_cpu_usage_percent: None, + node_cpu_logical_cores: None, + node_memory_usage_bytes: None, + node_memory_total_bytes: None, + container_memory_usage_bytes: None, + container_memory_limit_bytes: None, + members: vec![test_member("owner_b", MemberRole::OwnerClient, 1)], + segment_devices: Vec::new(), + }, + ]); + let (groups, warnings) = select_kv_metric_owner_groups(&snapshot, spec, None); + assert!(warnings.is_empty()); + assert_eq!(groups.len(), 2); + assert_eq!(groups[0].owner_id, "owner_a"); + assert_eq!(groups[0].members.len(), 1); + assert_eq!(groups[0].members[0].member_id, "owner_a"); + assert_eq!(groups[1].owner_id, "owner_b"); + } + + #[test] + fn aggregate_mean_series_is_computed_per_timestamp() { + let members = vec![ + KvMemberSeriesWire { + member_id: "a".to_string(), + role: "owner_client".to_string(), + node_key: "node_a".to_string(), + latest: Some(30.0), + series: vec![(1.0, 10.0), (2.0, 30.0)], + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), + }, + KvMemberSeriesWire { + member_id: "b".to_string(), + role: "owner_client".to_string(), + node_key: "node_b".to_string(), + latest: Some(50.0), + series: vec![(1.0, 20.0), (2.0, 50.0)], + comparison_latest: None, + comparison_series: Vec::new(), + additional_series: Vec::new(), + }, + ]; + let out = aggregate_kv_member_series(KvMetricAggregate::Mean, &members); + assert_eq!(out, vec![(1.0, 15.0), (2.0, 40.0)]); + } + + #[test] + fn cpu_metric_uses_sum_aggregation_so_totals_can_exceed_100() { + let spec = kv_metric_spec_by_key("process_cpu_usage_percent").unwrap(); + assert_eq!(spec.aggregate, KvMetricAggregate::Sum); + assert!(spec.roles.contains(&MemberRole::OwnerClient)); + assert!(spec.roles.contains(&MemberRole::ExternalClient)); + assert!(spec.roles.contains(&MemberRole::SideTransferWorker)); + let member_promql = kv_metric_promql_for_member(spec, "owner_a"); + let aggregate_promql = kv_metric_aggregate_promql( + spec, + &[String::from("owner_a"), String::from("external_a")], + ) + .unwrap(); + assert_eq!(member_promql, "process_cpu_usage_percent{node=\"owner_a\"}"); + assert_eq!( + aggregate_promql, + "sum(process_cpu_usage_percent{node=~\"^(?:owner_a|external_a)$\"})" + ); + + let process_network_spec = kv_metric_spec_by_key("process_network_tx_mbps").unwrap(); + let process_network_rx = process_network_spec + .comparison + .expect("process network rx comparison"); + assert!( + process_network_spec + .roles + .contains(&MemberRole::OwnerClient) + ); + assert!( + process_network_spec + .roles + .contains(&MemberRole::ExternalClient) + ); + assert!( + process_network_spec + .roles + .contains(&MemberRole::SideTransferWorker) + ); + assert_eq!( + kv_metric_promql_for_member(process_network_spec, "external_a"), + "client_network_mbps{node=\"external_a\",direction=\"tx\"}" + ); + assert_eq!( + kv_metric_aggregate_promql_for_field( + process_network_rx.field, + process_network_rx.aggregate, + &[String::from("owner_a"), String::from("external_a")], + ) + .unwrap(), + "sum(client_network_mbps{node=~\"^(?:owner_a|external_a)$\",direction=\"rx\"})" + ); + } + + #[test] + fn memory_segment_and_gpu_memory_cards_keep_capacity_and_process_memory_series() { + let memory_spec = kv_metric_spec_by_key("node_memory_usage_bytes").unwrap(); + let memory_comparison = memory_spec.comparison.expect("memory total comparison"); + assert_eq!(memory_spec.additional.len(), 1); + let memory_process_rss = memory_spec.additional[0]; + assert_eq!(memory_process_rss.key, "process_rss"); + assert!( + memory_process_rss + .roles + .unwrap() + .contains(&MemberRole::ExternalClient) + ); + assert!( + memory_process_rss + .roles + .unwrap() + .contains(&MemberRole::SideTransferWorker) + ); + assert_eq!( + kv_metric_promql_for_member(memory_spec, "owner_a"), + "node_memory_usage_bytes{node=\"owner_a\"}" + ); + assert_eq!( + kv_metric_promql_for_field(memory_comparison.field, "owner_a"), + "node_memory_total_bytes{node=\"owner_a\"}" + ); + assert_eq!( + kv_metric_aggregate_promql_for_field( + memory_comparison.field, + memory_comparison.aggregate, + &[String::from("owner_a"), String::from("owner_b")], + ) + .unwrap(), + "sum(node_memory_total_bytes{node=~\"^(?:owner_a|owner_b)$\"})" + ); + assert_eq!( + kv_metric_promql_for_field(memory_process_rss.field, "owner_a"), + "process_resident_memory_bytes{node=\"owner_a\"}" + ); + assert_eq!( + kv_metric_aggregate_promql_for_field( + memory_process_rss.field, + memory_process_rss.aggregate, + &[String::from("owner_a"), String::from("ops_a")], + ) + .unwrap(), + "sum(process_resident_memory_bytes{node=~\"^(?:owner_a|ops_a)$\"})" + ); + + let network_spec = kv_metric_spec_by_key("node_network_tx_mbps").unwrap(); + let network_rx = network_spec.comparison.expect("network rx comparison"); + assert_eq!(network_spec.label, "Node Network"); + assert_eq!(network_spec.series_label, "TX"); + assert_eq!( + kv_metric_promql_for_member(network_spec, "owner_a"), + "sum(rate(node_network_transmit_bytes_total{node=\"owner_a\"}[2m])) * 8 / 1000000" + ); + assert_eq!( + kv_metric_promql_for_field(network_rx.field, "owner_a"), + "sum(rate(node_network_receive_bytes_total{node=\"owner_a\"}[2m])) * 8 / 1000000" + ); + assert_eq!( + kv_metric_aggregate_promql_for_field( + network_rx.field, + network_rx.aggregate, + &[String::from("owner_a"), String::from("owner_b")], + ) + .unwrap(), + "sum(rate(node_network_receive_bytes_total{node=~\"^(?:owner_a|owner_b)$\"}[2m])) * 8 / 1000000" + ); + + let segment_spec = kv_metric_spec_by_key("seg_used_bytes").unwrap(); + let segment_comparison = segment_spec + .comparison + .expect("segment capacity comparison"); + assert_eq!( + kv_metric_promql_for_member(segment_spec, "owner_a"), + "sum(kvcache_segment_used_bytes{node=\"owner_a\"})" + ); + assert_eq!( + kv_metric_promql_for_field(segment_comparison.field, "owner_a"), + "sum(kvcache_segment_capacity_bytes{node=\"owner_a\"})" + ); + + let gpu_memory_spec = kv_metric_spec_by_key("gpu_memory_used").unwrap(); + let gpu_memory_comparison = gpu_memory_spec + .comparison + .expect("gpu memory total comparison"); + assert_eq!( + kv_metric_promql_for_member(gpu_memory_spec, "owner_a"), + "sum(gpu_memory_used_bytes{node=\"owner_a\"})" + ); + assert_eq!( + kv_metric_promql_for_field(gpu_memory_comparison.field, "owner_a"), + "sum(gpu_memory_total_bytes{node=\"owner_a\"})" + ); + } +} diff --git a/fluxon_rs/fluxon_cli/src/web_renderer.rs b/fluxon_rs/fluxon_cli/src/web_renderer.rs index ec16ad7..76ec9ce 100644 --- a/fluxon_rs/fluxon_cli/src/web_renderer.rs +++ b/fluxon_rs/fluxon_cli/src/web_renderer.rs @@ -3086,6 +3086,7 @@ mod tests { node_memory_total_bytes: None, container_memory_usage_bytes: None, container_memory_limit_bytes: None, + gpus: Vec::new(), process_resident_memory_bytes: None, process_cpu_usage_percent: None, tokio_num_workers: None, diff --git a/fluxon_rs/fluxon_cli/templates/monitor_table.html b/fluxon_rs/fluxon_cli/templates/monitor_table.html index f804278..ac5c30b 100644 --- a/fluxon_rs/fluxon_cli/templates/monitor_table.html +++ b/fluxon_rs/fluxon_cli/templates/monitor_table.html @@ -42,17 +42,48 @@ .metric_card_label { font-weight: 600; color: #0f172a; } .metric_card_value { font-family: ui-monospace, SFMono-Regular, Menlo, monospace; font-size: 12px; color: #334155; } .metric_chart { width: 100%; height: 96px; display: block; } - .metric_chart_path { fill: none; stroke: #0f766e; stroke-width: 2; } + .metric_chart_path { fill: none; stroke: var(--metric-color, #0f766e); stroke-width: 2; } .metric_chart_area { fill: rgba(15,118,110,0.10); } + .metric_chart_path.metric_chart_path_secondary { stroke: var(--metric-color, #2563eb); stroke-width: 2; } + .metric_chart_area.metric_chart_area_secondary { fill: rgba(37,99,235,0.08); } + .metric_chart_wrap { position: relative; width: 100%; height: 96px; cursor: crosshair; } + .metric_chart_legend { display: flex; gap: 10px; flex-wrap: wrap; margin-top: 4px; } + .metric_chart_legend_item { display: inline-flex; align-items: center; gap: 6px; color: #475569; font-size: 11px; } + .metric_chart_legend_swatch { width: 10px; height: 10px; border-radius: 999px; } + .metric_chart_legend_swatch_primary { background: #0f766e; } + .metric_chart_legend_swatch_secondary { background: #2563eb; } + .metric_chart_hover_line { visibility: hidden; stroke: #475569; stroke-width: 1; stroke-dasharray: 3 3; } + .metric_chart_hover_point_ring { visibility: hidden; fill: rgba(255,255,255,0.92); stroke: var(--metric-color, #0f766e); stroke-width: 2; } + .metric_chart_hover_point { visibility: hidden; fill: var(--metric-color, #0f766e); stroke: #fff; stroke-width: 1.5; } + .metric_chart_hover_point_ring.metric_chart_hover_point_ring_secondary { stroke: #2563eb; } + .metric_chart_hover_point.metric_chart_hover_point_secondary { fill: #2563eb; } + .metric_chart_tooltip { position: fixed; z-index: 10000; pointer-events: none; max-width: 320px; padding: 8px 10px; border: 1px solid #cbd5e1; border-radius: 8px; background: rgba(255,255,255,0.98); color: #0f172a; box-shadow: 0 8px 24px rgba(15,23,42,0.16); font-size: 12px; line-height: 1.35; } + .metric_chart_tooltip .mono { display: block; color: #334155; } .metric_chart_empty { display: grid; place-items: center; height: 96px; color: #94a3b8; font-size: 12px; border: 1px dashed #cbd5e1; border-radius: 8px; } .metric_warn_box { margin-top: 8px; display: grid; gap: 4px; } .member_metric_section { margin-top: 12px; display: grid; gap: 8px; } + .member_metric_sections { margin-top: 12px; display: grid; gap: 12px; } + .member_metric_block { border: 1px solid #e5e7eb; border-radius: 10px; background: #fff; padding: 12px; display: grid; gap: 8px; } .member_metric_head { display: flex; align-items: center; justify-content: space-between; gap: 8px; flex-wrap: wrap; } .member_metric_grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(240px, 1fr)); gap: 12px; } .member_metric_card { border: 1px solid #e5e7eb; border-radius: 10px; background: #f8fafc; padding: 12px; display: grid; gap: 8px; } .member_metric_meta { display: grid; gap: 2px; } .member_metric_id { font-weight: 600; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; } .member_metric_sub { color: #64748b; font-size: 12px; font-family: ui-monospace, SFMono-Regular, Menlo, monospace; } + .member_metric_title_row { display: flex; align-items: center; gap: 8px; flex-wrap: wrap; } + .owner_metric_grid { display: grid; gap: 12px; } + .owner_metric_card { border: 1px solid #e5e7eb; border-radius: 10px; background: #f8fafc; padding: 10px 12px; } + .owner_metric_card[open] { background: #fff; border-color: #94a3b8; } + .owner_metric_summary { cursor: pointer; } + .owner_metric_summary_inner { display: grid; grid-template-columns: minmax(220px, 0.9fr) minmax(260px, 1.1fr); gap: 12px; align-items: center; margin-top: 6px; } + .owner_metric_meta { display: grid; gap: 4px; } + .owner_metric_member_grid { display: grid; gap: 8px; margin-top: 10px; padding-top: 10px; border-top: 1px solid #e5e7eb; } + .owner_metric_member_row { display: grid; grid-template-columns: minmax(220px, 0.85fr) minmax(260px, 1.15fr); gap: 12px; align-items: center; padding: 6px 0; } + .member_metric_close_btn { border: 1px solid #cbd5e1; border-radius: 8px; background: #fff; color: #334155; padding: 4px 8px; cursor: pointer; } + @media (max-width: 760px) { + .owner_metric_summary_inner, + .owner_metric_member_row { grid-template-columns: 1fr; } + } {% endblock %} @@ -135,21 +166,14 @@

Cluster {{ header.cluster_name }}

{% endif %} {% if header.member_kind_query == "kv" %} +
Owner RDMA Control ({{ owner_rdma_controls.len() }} owners) {% if owner_rdma_controls.is_empty() %} @@ -320,6 +344,10 @@

Cluster {{ header.cluster_name }} + {% endif %} +
@@ -337,6 +365,7 @@

Cluster {{ header.cluster_name }} p2p_listen_port {% if header.member_kind_query == "kv" %} + {% endif %} {% if header.member_kind_query != "fs" %} @@ -367,6 +396,8 @@

Cluster {{ header.cluster_name }} rss + + @@ -392,6 +423,8 @@

Cluster {{ header.cluster_name }} rss + + @@ -417,6 +450,8 @@

Cluster {{ header.cluster_name }} rss + + @@ -442,6 +477,8 @@

Cluster {{ header.cluster_name }} rss + + @@ -458,6 +495,9 @@

Cluster {{ header.cluster_name }}

+ {% if header.member_kind_query == "kv" %} +
+ {% endif %}
@@ -475,6 +515,7 @@

Cluster {{ header.cluster_name }} p2p_listen_port {% if header.member_kind_query == "kv" %}

+ {% endif %} @@ -518,6 +559,7 @@

Cluster {{ header.cluster_name }} {% if header.member_kind_query == "kv" %}

+ {% endif %} @@ -546,6 +588,7 @@

Cluster {{ header.cluster_name }} Cluster {{ header.cluster_name }} + data-sort-seg-cap="{{ row.seg_cap_sort }}" + data-sort-gpu-mem="{{ row.gpu_memory_used_sort }}" + data-sort-gpu-util="{{ row.gpu_utilization_sort }}">

@@ -594,6 +639,7 @@

Cluster {{ header.cluster_name }} {{ row.rdma_text }} +

{% endif %} @@ -673,6 +719,9 @@

Cluster {{ header.cluster_name }} + {% endif %} {% if header.member_kind_query == "kv" %} @@ -1016,7 +1065,7 @@

Cluster {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} N/A'; + function escapeHtml(s) { + return String(s || '') + .replaceAll('&', '&') + .replaceAll('<', '<') + .replaceAll('>', '>') + .replaceAll('"', '"') + .replaceAll("'", '''); + } + + function metricChartTooltip() { + let el = document.getElementById('metric_chart_tooltip'); + if (!el) { + el = document.createElement('div'); + el.id = 'metric_chart_tooltip'; + el.className = 'metric_chart_tooltip'; + el.style.display = 'none'; + document.body.appendChild(el); + } + return el; + } + + function setMetricChartHoverVisible(el, visible) { + if (!el) return; + el.style.visibility = visible ? 'visible' : 'hidden'; + } + + function hideMetricChartTooltip() { + const tip = document.getElementById('metric_chart_tooltip'); + if (tip) tip.style.display = 'none'; + document.querySelectorAll('.metric_chart_hover_line,.metric_chart_hover_point,.metric_chart_hover_point_ring').forEach((el) => { + setMetricChartHoverVisible(el, false); + }); + } + + function formatMetricTs(ts) { + const n = Number(ts); + if (!Number.isFinite(n)) return '-'; + const d = new Date(n * 1000); + if (Number.isNaN(d.getTime())) return String(ts); + return d.toLocaleString(); + } + + function positionMetricChartTooltip(tip, clientX, clientY) { + const margin = 12; + tip.style.display = ''; + const rect = tip.getBoundingClientRect(); + let left = clientX + margin; + let top = clientY + margin; + if (left + rect.width > window.innerWidth - margin) { + left = clientX - rect.width - margin; + } + if (top + rect.height > window.innerHeight - margin) { + top = clientY - rect.height - margin; + } + tip.style.left = Math.max(margin, left) + 'px'; + tip.style.top = Math.max(margin, top) + 'px'; + } + + function sanitizeMetricSeries(series) { + if (!Array.isArray(series)) return []; + return series + .map((p) => [Number(Array.isArray(p) ? p[0] : NaN), Number(Array.isArray(p) ? p[1] : NaN)]) + .filter((p) => Number.isFinite(p[0]) && Number.isFinite(p[1])); + } + + const metricChartColors = ['#0f766e', '#2563eb', '#dc2626', '#7c3aed', '#d97706', '#0891b2']; + + function buildMetricLineSeries(series, unit, label, comparisonSeries, comparisonUnit, comparisonLabel, additionalSeries) { + const lines = [{ + label: label || 'Primary', + unit: unit || '', + series: sanitizeMetricSeries(series), + color: metricChartColors[0], + primary: true, + }]; + const comparisonClean = sanitizeMetricSeries(comparisonSeries); + if (comparisonClean.length > 0) { + lines.push({ + label: comparisonLabel || 'Comparison', + unit: comparisonUnit || unit || '', + series: comparisonClean, + color: metricChartColors[1], + primary: false, + }); + } + const extras = Array.isArray(additionalSeries) ? additionalSeries : []; + extras.forEach((row) => { + const clean = sanitizeMetricSeries(row?.series || []); + if (clean.length === 0) return; + const metric = row?.metric || {}; + lines.push({ + label: metric.series_label || metric.label || metric.key || 'Series', + unit: metric.unit || unit || '', + series: clean, + color: metricChartColors[lines.length % metricChartColors.length], + primary: false, + }); + }); + return lines; + } + + function metricSeriesLatestText(primaryLatest, primaryUnit, comparisonMetric, comparisonLatest, additionalSeries) { + const parts = [fmtMetricValue(primaryLatest, primaryUnit || '')]; + if (comparisonMetric) { + parts.push(fmtMetricValue(comparisonLatest, comparisonMetric.unit || primaryUnit || '')); + } + const extras = Array.isArray(additionalSeries) ? additionalSeries : []; + extras.forEach((row) => { + parts.push(fmtMetricValue(row?.latest, row?.metric?.unit || primaryUnit || '')); + }); + return escapeHtml(parts.join(' / ')); + } + + function wireMetricChartHover(root) { + const scope = root || document; + scope.querySelectorAll('.metric_chart_wrap[data-lines]').forEach((wrap) => { + if (wrap.__fluxonMetricHoverWired) return; + wrap.__fluxonMetricHoverWired = true; + const svg = wrap.querySelector('svg.metric_chart'); + const hoverLine = wrap.querySelector('.metric_chart_hover_line'); + if (!svg || !hoverLine) return; + let lines = []; + try { + lines = JSON.parse(wrap.getAttribute('data-lines') || '[]') + .map((line) => ({ + label: String(line?.label || ''), + unit: String(line?.unit || ''), + color: String(line?.color || '#0f766e'), + series: sanitizeMetricSeries(line?.series || []), + })) + .filter((line) => line.series.length > 0); + } catch (_e) { + lines = []; + } + const primaryLine = lines[0] || null; + if (!primaryLine) return; + const hoverPoints = Array.from(wrap.querySelectorAll('.metric_chart_hover_points')).map((group) => ({ + group, + ring: group.querySelector('.metric_chart_hover_point_ring'), + point: group.querySelector('.metric_chart_hover_point'), + })); + + const width = Number(wrap.getAttribute('data-chart-width') || '220'); + const height = Number(wrap.getAttribute('data-chart-height') || '96'); + const min = Number(wrap.getAttribute('data-min')); + const max = Number(wrap.getAttribute('data-max')); + const padX = 4; + const padY = 6; + const xOf = (idx) => padX + (primaryLine.series.length <= 1 ? 0 : idx * (width - padX * 2) / (primaryLine.series.length - 1)); + const yOf = (v) => { + const lo = Number.isFinite(min) ? min : 0; + const hi = Number.isFinite(max) && max !== lo ? max : lo + 1; + const ratio = (v - lo) / (hi - lo); + return height - padY - ratio * (height - padY * 2); + }; + + wrap.addEventListener('mousemove', (ev) => { + const rect = svg.getBoundingClientRect(); + if (rect.width <= 0) return; + const rel = Math.min(Math.max(ev.clientX - rect.left, 0), rect.width); + const idx = primaryLine.series.length <= 1 ? 0 : Math.round(rel / rect.width * (primaryLine.series.length - 1)); + const point = primaryLine.series[Math.min(Math.max(idx, 0), primaryLine.series.length - 1)]; + if (!point) return; + const x = xOf(idx); + const y = yOf(point[1]); + hoverLine.setAttribute('x1', x.toFixed(2)); + hoverLine.setAttribute('x2', x.toFixed(2)); + hoverLine.setAttribute('y1', '0'); + hoverLine.setAttribute('y2', String(height)); + setMetricChartHoverVisible(hoverLine, true); + const valueHtml = []; + lines.forEach((line, lineIdx) => { + const lineIdxClamped = Math.min(Math.max(idx, 0), line.series.length - 1); + const linePoint = line.series[lineIdxClamped] || null; + const hover = hoverPoints[lineIdx] || null; + if (!linePoint || !hover?.ring || !hover?.point) { + if (hover?.ring) setMetricChartHoverVisible(hover.ring, false); + if (hover?.point) setMetricChartHoverVisible(hover.point, false); + return; + } + const lineY = yOf(linePoint[1]); + hover.ring.setAttribute('cx', x.toFixed(2)); + hover.ring.setAttribute('cy', lineY.toFixed(2)); + hover.point.setAttribute('cx', x.toFixed(2)); + hover.point.setAttribute('cy', lineY.toFixed(2)); + setMetricChartHoverVisible(hover.ring, true); + setMetricChartHoverVisible(hover.point, true); + valueHtml.push('' + escapeHtml((line.label || 'Series') + ': ' + fmtMetricValue(linePoint[1], line.unit)) + ''); + }); + const tip = metricChartTooltip(); + tip.innerHTML = + '' + escapeHtml(primaryLine.label || 'Metric') + '' + + '' + escapeHtml(formatMetricTs(point[0])) + '' + + valueHtml.join(''); + positionMetricChartTooltip(tip, ev.clientX, ev.clientY); + }); + wrap.addEventListener('mouseleave', () => { + hideMetricChartTooltip(); + }); + }); + } + + function buildSparklineSvg(series, unit, label, comparisonSeries, comparisonUnit, comparisonLabel, additionalSeries) { const width = 220; const height = 96; const padX = 4; const padY = 6; - const vals = series.map((p) => Number(Array.isArray(p) ? p[1] : NaN)).filter((v) => Number.isFinite(v)); - if (vals.length === 0) return '
N/A
'; - let min = Math.min(...vals); - let max = Math.max(...vals); - if (min === max) { - min -= 1; - max += 1; + const lines = buildMetricLineSeries(series, unit, label, comparisonSeries, comparisonUnit, comparisonLabel, additionalSeries); + const primaryLine = lines[0] || null; + if (!primaryLine || primaryLine.series.length === 0) return '
N/A
'; + const vals = lines.flatMap((line) => line.series.map((p) => p[1])); + const min = 0; + let max = Math.max(0, ...vals); + if (max === min) { + max = min + 1; } - const stepX = series.length <= 1 ? 0 : (width - padX * 2) / (series.length - 1); const yOf = (v) => { const ratio = (v - min) / (max - min); return height - padY - ratio * (height - padY * 2); }; - let path = ''; - let area = ''; - series.forEach((point, idx) => { - const value = Number(point[1]); - const x = padX + idx * stepX; - const y = yOf(value); - path += (idx === 0 ? 'M' : 'L') + x.toFixed(2) + ' ' + y.toFixed(2) + ' '; - area += (idx === 0 ? 'M' : 'L') + x.toFixed(2) + ' ' + y.toFixed(2) + ' '; - }); - const lastX = padX + (series.length - 1) * stepX; - area += 'L ' + lastX.toFixed(2) + ' ' + (height - padY).toFixed(2) + ' '; - area += 'L ' + padX.toFixed(2) + ' ' + (height - padY).toFixed(2) + ' Z'; + function buildPathData(inputSeries) { + let path = ''; + let area = ''; + const stepX = inputSeries.length <= 1 ? 0 : (width - padX * 2) / (inputSeries.length - 1); + inputSeries.forEach((point, idx) => { + const value = point[1]; + const x = padX + idx * stepX; + const y = yOf(value); + path += (idx === 0 ? 'M' : 'L') + x.toFixed(2) + ' ' + y.toFixed(2) + ' '; + area += (idx === 0 ? 'M' : 'L') + x.toFixed(2) + ' ' + y.toFixed(2) + ' '; + }); + const lastX = padX + (inputSeries.length - 1) * stepX; + area += 'L ' + lastX.toFixed(2) + ' ' + (height - padY).toFixed(2) + ' '; + area += 'L ' + padX.toFixed(2) + ' ' + (height - padY).toFixed(2) + ' Z'; + return { path, area }; + } + const linePaths = lines.map((line) => ({ line, pathData: buildPathData(line.series) })); + const legendHtml = lines.length > 1 + ? '
' + + lines.map((line) => + '' + escapeHtml(line.label || 'Series') + '' + ).join('') + + '
' + : ''; + const areaHtml = ''; + const pathsHtml = linePaths.slice().reverse().map(({ line, pathData }, revIdx) => { + const className = revIdx === linePaths.length - 1 ? 'metric_chart_path' : 'metric_chart_path metric_chart_path_secondary'; + return ''; + }).join(''); + const hoverHtml = lines.map((line, idx) => + '' + + '' + + '' + + '' + ).join(''); return ( + '
' + '' + - '' + - '' + - '' + areaHtml + + pathsHtml + + '' + + hoverHtml + + '' + + legendHtml + + '
' ); } let metricPanelState = { - selectedMetricKey: null, + selectedMetricKeys: [], window: '15m', - showAllMembers: false, + showAllMembersByMetric: {}, + expandedOwnersByMetric: {}, }; async function fetchJsonText(url) { @@ -1620,23 +1911,142 @@

Cluster {{ header.cluster_name }} ' + - '
' + (metric.label || metric.key || 'metric') + '
' + - '
' + fmtMetricValue(card.latest, metric.unit || '') + '
' + + '
' + escapeHtml(metric.label || metric.key || 'metric') + '
' + + '
' + latestText + '
' + '' + - buildSparklineSvg(card.aggregate_series || []) + buildSparklineSvg( + card.aggregate_series || [], + metric.unit || '', + metric.series_label || metric.label || metric.key || 'metric', + card.comparison_series || [], + comparisonMetric?.unit || metric.unit || '', + comparisonMetric?.series_label || comparisonMetric?.label || '', + card.additional_series || [] + ) ); } - function memberMetricCardHtml(row, unit) { + function selectedMetricKeySet() { + return new Set(Array.isArray(metricPanelState.selectedMetricKeys) ? metricPanelState.selectedMetricKeys : []); + } + + function isMetricSelected(metricKey) { + return metricKey !== '' && selectedMetricKeySet().has(metricKey); + } + + function toggleSelectedMetric(metricKey) { + const next = selectedMetricKeySet(); + if (next.has(metricKey)) { + next.delete(metricKey); + if (metricPanelState.showAllMembersByMetric && typeof metricPanelState.showAllMembersByMetric === 'object') { + delete metricPanelState.showAllMembersByMetric[metricKey]; + } + if (metricPanelState.expandedOwnersByMetric && typeof metricPanelState.expandedOwnersByMetric === 'object') { + delete metricPanelState.expandedOwnersByMetric[metricKey]; + } + } else { + next.add(metricKey); + } + metricPanelState.selectedMetricKeys = Array.from(next); + saveMetricStateToLocalStorage(metricPanelState); + } + + function memberMetricCardHtml(row, metric, comparisonMetric) { + const valueText = metricSeriesLatestText( + row.latest, + metric.unit || '', + comparisonMetric, + row.comparison_latest, + row.additional_series || [] + ); + return ( + '
' + + '
' + escapeHtml(row.member_id) + '
' + + '
' + escapeHtml(row.role) + ' | ' + escapeHtml(row.node_key) + '
' + + '
' + valueText + '
' + + '
' + + buildSparklineSvg( + row.series || [], + metric.unit || '', + metric.series_label || metric.label || row.member_id || '', + row.comparison_series || [], + comparisonMetric?.unit || metric.unit || '', + comparisonMetric?.series_label || comparisonMetric?.label || '', + row.additional_series || [] + ) + ); + } + + function ownerMetricMemberRowHtml(row, metric, comparisonMetric) { + const valueText = metricSeriesLatestText( + row.latest, + metric.unit || '', + comparisonMetric, + row.comparison_latest, + row.additional_series || [] + ); return ( + '
' + '
' + - '
' + row.member_id + '
' + - '
' + row.role + ' | ' + row.node_key + '
' + - '
' + fmtMetricValue(row.latest, unit || '') + '
' + + '
' + escapeHtml(row.member_id) + '
' + + '
' + escapeHtml(row.role) + ' | ' + escapeHtml(row.node_key) + '
' + + '
' + valueText + '
' + '
' + - buildSparklineSvg(row.series || []) + buildSparklineSvg( + row.series || [], + metric.unit || '', + metric.series_label || metric.label || row.member_id || '', + row.comparison_series || [], + comparisonMetric?.unit || metric.unit || '', + comparisonMetric?.series_label || comparisonMetric?.label || '', + row.additional_series || [] + ) + + '
' + ); + } + + function ownerMetricCardHtml(owner, metric, comparisonMetric) { + const members = Array.isArray(owner.members) ? owner.members : []; + const memberHtml = members.length === 0 + ? '
No member series
' + : members.map((row) => ownerMetricMemberRowHtml(row, metric, comparisonMetric)).join(''); + const valueText = metricSeriesLatestText( + owner.latest, + metric.unit || '', + comparisonMetric, + owner.comparison_latest, + owner.additional_series || [] + ); + return ( + '' + + '
' + + '
' + + '
' + escapeHtml(owner.owner_id || '') + '
' + + '
' + escapeHtml(owner.node_key || '') + ' | ' + members.length + ' members
' + + '
' + valueText + '
' + + '
' + + buildSparklineSvg( + owner.series || [], + metric.unit || '', + metric.series_label || metric.label || owner.owner_id || '', + owner.comparison_series || [], + comparisonMetric?.unit || metric.unit || '', + comparisonMetric?.series_label || comparisonMetric?.label || '', + owner.additional_series || [] + ) + + '
' + + '
' + + '
' + memberHtml + '
' ); } @@ -1661,70 +2071,180 @@

Cluster {{ header.cluster_name }} ' + + '
' + + '
' + + '
' + + 'Owner Metric' + + '' + + '
' + + '
' + + '
' + + '' + + '
' + + '
' + + '
' + + '
' + + '' + ); + } + + function bindMemberMetricBlockButtons(block, metricKey) { + const closeBtn = block.querySelector('[data-member-metric-close]'); + if (closeBtn && !closeBtn.__fluxonMetricCloseBound) { + closeBtn.__fluxonMetricCloseBound = true; + closeBtn.addEventListener('click', () => { + const next = selectedMetricKeySet(); + next.delete(metricKey); + metricPanelState.selectedMetricKeys = Array.from(next); + if (metricPanelState.showAllMembersByMetric && typeof metricPanelState.showAllMembersByMetric === 'object') { + delete metricPanelState.showAllMembersByMetric[metricKey]; + } + if (metricPanelState.expandedOwnersByMetric && typeof metricPanelState.expandedOwnersByMetric === 'object') { + delete metricPanelState.expandedOwnersByMetric[metricKey]; + } + saveMetricStateToLocalStorage(metricPanelState); + renderMetricCards(metricPanelState.lastPanelData || {}); + loadSelectedMemberMetrics(); + }); + } + const toggleBtn = block.querySelector('.member_metric_toggle_btn'); + if (toggleBtn && !toggleBtn.__fluxonMetricToggleBound) { + toggleBtn.__fluxonMetricToggleBound = true; + toggleBtn.addEventListener('click', () => { + setShowAllOwnersForMetric(metricKey, !showAllOwnersForMetric(metricKey)); + const cached = metricPanelState.memberMetricDataByKey && metricPanelState.memberMetricDataByKey[metricKey]; + if (cached) { + renderMemberMetric(metricKey, cached); + } else { + loadMemberMetric(metricKey); + } + }); + } + } + + function renderMemberMetric(metricKey, data) { const section = requireEl('member_metric_section'); - const title = requireEl('member_metric_title'); - const subtitle = requireEl('member_metric_subtitle'); - const status = requireEl('member_metric_status'); - const grid = requireEl('member_metric_grid'); - const toggleBtn = requireEl('member_metric_toggle_btn'); - if (!section || !title || !subtitle || !status || !grid || !toggleBtn) return; - const members = Array.isArray(data?.members) ? data.members : []; + const blocks = requireEl('member_metric_sections'); + if (!section || !blocks) return; + let block = blocks.querySelector('[data-patch-key="' + CSS.escape(metricKey) + '"]'); + if (!(block instanceof HTMLElement)) { + block = document.createElement('div'); + block.setAttribute('data-patch-key', metricKey); + blocks.appendChild(block); + } + if (block.__fluxonMemberBlockHtml !== metricDrilldownBlockHtml(metricKey)) { + const nextHtml = metricDrilldownBlockHtml(metricKey); + block.innerHTML = nextHtml; + block.__fluxonMemberBlockHtml = nextHtml; + } + bindMemberMetricBlockButtons(block, metricKey); + const title = block.querySelector('.member_metric_title'); + const subtitle = block.querySelector('.member_metric_subtitle'); + const status = block.querySelector('.member_metric_status'); + const warn = block.querySelector('.member_metric_warn'); + const grid = block.querySelector('.member_metric_grid'); + const toggleBtn = block.querySelector('.member_metric_toggle_btn'); + if (!title || !subtitle || !status || !warn || !grid || !toggleBtn) return; + const owners = Array.isArray(data?.owners) ? data.owners : []; const metric = data?.metric || {}; + const comparisonMetric = data?.comparison_metric || null; section.style.display = ''; - title.textContent = metric.label || metric.key || 'Member Metric'; - subtitle.textContent = (data?.range?.window || metricPanelState.window) + ' | ' + (members.length + ' members'); - renderMetricWarnings('member_metric_warn', data?.warnings || []); - if (members.length === 0) { - status.textContent = 'No members'; - grid.innerHTML = '
No member series
'; + title.textContent = metric.label || metric.key || 'Owner Metric'; + subtitle.textContent = (data?.range?.window || metricPanelState.window) + ' | ' + (owners.length + ' owners'); + const warnHtml = Array.isArray(data?.warnings) && data.warnings.length > 0 + ? data.warnings.slice(0, 6).map((w) => '
' + String(w).replaceAll('<', '<') + '
').join('') + : ''; + warn.innerHTML = warnHtml; + if (owners.length === 0) { + status.textContent = 'No owners'; + grid.innerHTML = '
No owner series
'; toggleBtn.style.display = 'none'; return; } status.textContent = ''; - const rows = visibleMemberRows(members); + const rows = visibleOwnerRows(metricKey, owners); + const expanded = expandedOwnerSet(metricKey); patchChildrenByKey( grid, rows, - (row) => [row.member_id || '', row.role || '', row.node_key || ''].join('|'), - () => document.createElement('div'), - (card, row, _idx, key) => { - card.className = 'member_metric_card'; - card.setAttribute('data-member-id', row.member_id || ''); - card.setAttribute('data-member-key', key); - const nextHtml = memberMetricCardHtml(row, metric.unit || ''); + (row) => row.owner_id || '', + () => document.createElement('details'), + (card, row) => { + const ownerId = row.owner_id || ''; + card.className = 'owner_metric_card'; + card.setAttribute('data-owner-id', ownerId); + const shouldOpen = expanded.has(ownerId); + card.open = shouldOpen; + const nextHtml = ownerMetricCardHtml(row, metric, comparisonMetric); if (card.__fluxonMemberMetricHtml !== nextHtml) { card.innerHTML = nextHtml; card.__fluxonMemberMetricHtml = nextHtml; } + card.ontoggle = () => { + setOwnerExpanded(metricKey, ownerId, card.open); + }; + wireMetricChartHover(card); } ); - if (members.length > 12) { + if (owners.length > 12) { toggleBtn.style.display = ''; - toggleBtn.textContent = metricPanelState.showAllMembers ? 'Show less' : 'Show all'; + toggleBtn.textContent = showAllOwnersForMetric(metricKey) ? 'Show less' : 'Show all owners'; } else { toggleBtn.style.display = 'none'; } @@ -1732,15 +2252,35 @@

Cluster {{ header.cluster_name }} Cluster {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }} {{ header.cluster_name }}

rdmagpucpu mem_used {{ row.node_key }} {{ row.member_id }} log{{ row.gpu_text }}{{ row.cpu_text }} {{ row.mem_used_text }}