From a175156de1f3be7bac3c4f5a106ae2df7157f614 Mon Sep 17 00:00:00 2001
From: ActivePeter <1020401660@qq.com>
Date: Tue, 30 Jun 2026 21:38:20 +0800
Subject: [PATCH 1/2] KV monitor: add GPU metrics and Metric Trends; harden deploy scripts
---
deployment/manual_dispatch_release.py | 12 +-
deployment/tests/test_gen_bare_deploy_bash.py | 137 ++
...nual_dispatch_release_test_rsc_contract.py | 27 +
.../test_start_test_bed_bootstrap_log.py | 148 +-
deployment/utils/proc_lifecycle_codegen.py | 48 +-
...2\346\235\277\344\270\216Metric_Trends.md" | 138 ++
fluxon_doc_cn/roadmap.md | 1 +
...7 - 3 - KV-RPC\346\216\245\345\217\243.md" | 23 +
fluxon_doc_en/roadmap.md | 1 +
.../User - 3 - KV and RPC Interface.md | 29 +
fluxon_rs/fluxon_cli/src/lib.rs | 36 +-
fluxon_rs/fluxon_cli/src/model.rs | 102 +
fluxon_rs/fluxon_cli/src/prom.rs | 157 +-
fluxon_rs/fluxon_cli/src/server.rs | 1782 +++++++++++++++--
fluxon_rs/fluxon_cli/src/web_renderer.rs | 1 +
.../fluxon_cli/templates/monitor_table.html | 1016 ++++++++--
.../fluxon_kv/src/client_seg_pool/mod.rs | 13 +-
fluxon_rs/fluxon_kv/src/config.rs | 18 +-
.../fluxon_kv/src/external_client_api/mod.rs | 3 +-
fluxon_rs/fluxon_kv/src/kv_test.rs | 18 +-
fluxon_rs/fluxon_kv/src/lib.rs | 4 +-
.../lease_manager_test.rs | 12 +-
.../fluxon_kv/src/metric_reporter/mod.rs | 25 +-
fluxon_rs/fluxon_kv/src/metrics.rs | 20 +-
fluxon_rs/fluxon_observability/src/keys.rs | 23 +
.../src/kv_metrics_actor.rs | 524 ++++-
fluxon_test_stack/start_test_bed.py | 182 +-
27 files changed, 4095 insertions(+), 405 deletions(-)
create mode 100644 "fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md"
diff --git a/deployment/manual_dispatch_release.py b/deployment/manual_dispatch_release.py
index 9f24380..e51b0d2 100644
--- a/deployment/manual_dispatch_release.py
+++ b/deployment/manual_dispatch_release.py
@@ -1063,7 +1063,17 @@ def _finalize_remote_staged_dir(
+ sh_quote(stage_dir_s)
+ " "
+ sh_quote(dst_dir_s)
- + " && rm -rf \"$backup\""
+ + " && "
+ + "if [ -n \"${backup:-}\" ] && [ -e \"$backup\" -o -L \"$backup\" ]; then "
+ + "rm -rf \"$backup\" || "
+ + "{ "
+ + "echo "
+ + sh_quote(
+ "[manual_dispatch_release] warning: failed to remove old staged backup; keep it for later cleanup: "
+ )
+ + " \"$backup\" 1>&2; "
+ + "}; "
+ + "fi"
)
),
)
diff --git a/deployment/tests/test_gen_bare_deploy_bash.py b/deployment/tests/test_gen_bare_deploy_bash.py
index 21f11a6..299ba1c 100644
--- a/deployment/tests/test_gen_bare_deploy_bash.py
+++ b/deployment/tests/test_gen_bare_deploy_bash.py
@@ -64,6 +64,7 @@ def _build_checks(selected_test_id: Optional[str]) -> List[Tuple[str, Callable[[
("bootstrap_start_reuses_already_present_selection", test_bootstrap_start_reuses_already_present_selection),
("bare_start_fails_when_child_exits_within_startup_window", test_bare_start_fails_when_child_exits_within_startup_window),
("pid_ready_check_requires_full_stable_window_after_first_child_observation", test_pid_ready_check_requires_full_stable_window_after_first_child_observation),
+ ("pid_ready_check_ignores_nested_selection_supervisor_children", test_pid_ready_check_ignores_nested_selection_supervisor_children),
("atomic_group_start_does_not_auto_stop_on_failure", test_atomic_group_start_does_not_auto_stop_on_failure),
("atomic_group_preserves_nested_heredoc_terminator", test_atomic_group_preserves_nested_heredoc_terminator),
("atomic_group_stop_script_is_shell_valid", test_atomic_group_stop_script_is_shell_valid),
@@ -812,6 +813,142 @@ def _shutdown(_signum, _frame):
print("PASS: test_pid_ready_check_requires_full_stable_window_after_first_child_observation")
+def test_pid_ready_check_ignores_nested_selection_supervisor_children() -> None:
+ proc_lifecycle = _load_python_module(
+ module_name="test_proc_lifecycle_codegen_nested_supervisor_runtime",
+ path=DEPLOYMENT_DIR / "utils" / "proc_lifecycle_codegen.py",
+ )
+ helpers = proc_lifecycle.render_bash_proc_lifecycle_funcs_pid_tree(
+ timeouts=proc_lifecycle.StopTimeouts(term_seconds=60, kill_seconds=10, supersede_seconds=30)
+ )
+ with tempfile.TemporaryDirectory(prefix="test_proc_lifecycle_nested_supervisor_") as td:
+ tmpdir = Path(td)
+ shell_script = tmpdir / "probe.sh"
+ root_script = tmpdir / "root_supervisor.py"
+ child_script = tmpdir / "real_child.py"
+ nested_supervisor_script = tmpdir / "selection_supervisor.py"
+
+ child_script.write_text(
+ textwrap.dedent(
+ """
+ #!/usr/bin/env python3
+ import signal
+ import time
+
+ def _shutdown(_signum, _frame):
+ raise SystemExit(0)
+
+ signal.signal(signal.SIGTERM, _shutdown)
+ signal.signal(signal.SIGINT, _shutdown)
+
+ while True:
+ time.sleep(0.2)
+ """
+ ).strip()
+ + "\n",
+ encoding="utf-8",
+ )
+ nested_supervisor_script.write_text(
+ textwrap.dedent(
+ """
+ #!/usr/bin/env python3
+ import signal
+ import time
+
+ def _shutdown(_signum, _frame):
+ raise SystemExit(0)
+
+ signal.signal(signal.SIGTERM, _shutdown)
+ signal.signal(signal.SIGINT, _shutdown)
+
+ while True:
+ time.sleep(0.2)
+ """
+ ).strip()
+ + "\n",
+ encoding="utf-8",
+ )
+ root_script.write_text(
+ textwrap.dedent(
+ f"""
+ #!/usr/bin/env python3
+ import signal
+ import subprocess
+ import sys
+ import time
+ from pathlib import Path
+
+ procs = []
+
+ def _shutdown(_signum, _frame):
+ for proc in procs:
+ if proc.poll() is None:
+ proc.terminate()
+ deadline = time.time() + 5
+ for proc in procs:
+ if proc.poll() is None:
+ try:
+ proc.wait(timeout=max(0.0, deadline - time.time()))
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ raise SystemExit(0)
+
+ signal.signal(signal.SIGTERM, _shutdown)
+ signal.signal(signal.SIGINT, _shutdown)
+
+ procs.append(subprocess.Popen([sys.executable, str(Path({str(child_script)!r}))]))
+ procs.append(subprocess.Popen([sys.executable, str(Path({str(nested_supervisor_script)!r}))]))
+ while True:
+ for proc in procs:
+ if proc.poll() is not None:
+ raise SystemExit(proc.returncode or 0)
+ time.sleep(0.2)
+ """
+ ).strip()
+ + "\n",
+ encoding="utf-8",
+ )
+
+ shell_script.write_text(
+ textwrap.dedent(
+ f"""\
+ #!/usr/bin/env bash
+ set -euo pipefail
+ {helpers}
+ python3 {shlex.quote(str(root_script))} &
+ root_pid="$!"
+ startup_deadline_seconds=6
+ wait_rc=0; wait_service_probably_ready_pid_tree "svc_plain" "$root_pid" 4 "$startup_deadline_seconds" "[test]" || wait_rc="$?"
+ if [ "$wait_rc" -ne 0 ]; then  # status is captured above: "$?" inside "if ! cmd; then" is always 0
+ kill "$root_pid" >/dev/null 2>&1 || true
+ wait "$root_pid" >/dev/null 2>&1 || true
+ exit "$wait_rc"
+ fi
+ kill "$root_pid" >/dev/null 2>&1 || true
+ wait "$root_pid" >/dev/null 2>&1 || true
+ exit 0
+ """
+ ),
+ encoding="utf-8",
+ )
+ shell_script.chmod(0o755)
+
+ result = subprocess.run(
+ ["bash", str(shell_script)],
+ check=False,
+ capture_output=True,
+ text=True,
+ cwd=str(DEPLOYMENT_DIR.parent),
+ timeout=20,
+ )
+ assert result.returncode == 0, (
+ f"expected startup gate success rc={result.returncode} stdout={result.stdout!r} stderr={result.stderr!r}"
+ )
+ assert "multiple direct child pids" not in result.stdout, result.stdout
+ assert "probable-ready: ok" in result.stdout, result.stdout
+ print("PASS: test_pid_ready_check_ignores_nested_selection_supervisor_children")
+
+
def test_atomic_group_preserves_nested_heredoc_terminator() -> None:
with tempfile.TemporaryDirectory(prefix="test_gen_bare_deploy_bash_atomic_heredoc_") as td:
tmpdir = Path(td)
diff --git a/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py b/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py
index d8148f4..1e2ca67 100644
--- a/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py
+++ b/deployment/tests/test_manual_dispatch_release_test_rsc_contract.py
@@ -30,6 +30,33 @@ def _load_module():
class TestManualDispatchReleaseTestRscContract(unittest.TestCase):
+ def test_finalize_remote_staged_dir_keeps_backup_when_cleanup_fails(self) -> None:
+ captured: list[tuple[str | None, str]] = []
+
+ def _fake_check_call_bash_with_optional_password(*, password: str | None, cmd: str) -> None:
+ captured.append((password, cmd))
+
+ with mock.patch.object(
+ _DISPATCH,
+ "_check_call_bash_with_optional_password",
+ side_effect=_fake_check_call_bash_with_optional_password,
+ ):
+ _DISPATCH._finalize_remote_staged_dir(
+ stage_dir_s="/remote/.fluxon_release.stage.abc123",
+ dst_dir_s="/remote/fluxon_release",
+ ssh_user="root",
+ ip="203.0.113.7",
+ ssh_port=30245,
+ ssh_password=None,
+ )
+
+ self.assertEqual(len(captured), 1)
+ password, cmd = captured[0]
+ self.assertIsNone(password)
+ self.assertIn('mv \'"\'"\'/remote/.fluxon_release.stage.abc123\'"\'"\' \'"\'"\'/remote/fluxon_release\'"\'"\'', cmd)
+ self.assertIn('rm -rf "$backup" || {', cmd)
+ self.assertIn("[manual_dispatch_release] warning: failed to remove old staged backup; keep it for later cleanup:", cmd)
+
def test_deploy_and_profiles_dispatches_test_rsc_tree(self) -> None:
with tempfile.TemporaryDirectory() as td:
release_dir = Path(td)
diff --git a/deployment/tests/test_start_test_bed_bootstrap_log.py b/deployment/tests/test_start_test_bed_bootstrap_log.py
index 2bd6b00..01fffc6 100644
--- a/deployment/tests/test_start_test_bed_bootstrap_log.py
+++ b/deployment/tests/test_start_test_bed_bootstrap_log.py
@@ -353,6 +353,41 @@ def test_parse_cluster_nodes_accepts_local_execution_mode() -> None:
print("PASS: test_parse_cluster_nodes_accepts_local_execution_mode")
+def test_resolve_local_node_cfg_accepts_remote_only_controller_host() -> None:
+ module = _load_start_test_bed_module()
+ cluster_nodes = {
+ "gpu-a": {
+ "hostname": "gpu-a",
+ "ip": "10.233.111.42",
+ "hostworkdir": "/srv/gpu-a",
+ "execution_mode": "ssh",
+ "ssh_host": "116.238.240.2",
+ "ssh_user": "root",
+ "ssh_port": 30245,
+ },
+ "gpu-b": {
+ "hostname": "gpu-b",
+ "ip": "10.233.114.86",
+ "hostworkdir": "/srv/gpu-b",
+ "execution_mode": "ssh",
+ "ssh_host": "116.238.240.2",
+ "ssh_user": "root",
+ "ssh_port": 31408,
+ },
+ }
+ original_check_output = module.subprocess.check_output
+ try:
+ module.subprocess.check_output = lambda *args, **kwargs: "infra44-ThinkStation-PX\n"
+ resolved = module._resolve_local_node_cfg(
+ cluster_nodes,
+ controller_url="http://10.233.111.42:53180/r/ops/fluxon_gpu_monitor_remote",
+ )
+ finally:
+ module.subprocess.check_output = original_check_output
+ assert resolved is cluster_nodes["gpu-a"], resolved
+ print("PASS: test_resolve_local_node_cfg_accepts_remote_only_controller_host")
+
+
def test_run_bare_waves_treats_local_execution_mode_node_as_local() -> None:
module = _load_start_test_bed_module()
cluster_nodes = {
@@ -444,6 +479,99 @@ def test_run_bare_waves_treats_local_execution_mode_node_as_local() -> None:
print("PASS: test_run_bare_waves_treats_local_execution_mode_node_as_local")
+def test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch() -> None:
+ module = _load_start_test_bed_module()
+ cluster_nodes = {
+ "logic-a": {
+ "hostname": "logic-a",
+ "ip": "127.0.0.1",
+ "hostworkdir": "/tmp/logic-a",
+ "execution_mode": "local",
+ "ssh_user": "tester",
+ "ssh_port": 22,
+ },
+ }
+ deployconf = {
+ "name_prefix": "fluxon-testbed",
+ "service": {
+ "master": {"node_bind": {"node": ["logic-a"]}},
+ "owner": {"node_bind": {"node": ["logic-a"]}},
+ "ops_controller": {"node_bind": {"node": ["logic-a"]}},
+ "ops_agent": {"node_bind": {"node": ["logic-a"]}},
+ },
+ "atomic_groups": {
+ "fluxon_core_controller": {
+ "phase": 1,
+ "nodes": ["logic-a"],
+ "services": ["master", "owner", "ops_controller", "ops_agent"],
+ }
+ },
+ }
+ stop_calls: list[str] = []
+ spawn_calls: list[str] = []
+ original_run_local_stop = module._run_local_stop
+ original_spawn_local = module._spawn_local_start
+ original_join = module._join_bare_launch
+ original_collect = module._collect_bare_runtime_statuses
+ original_bare_script_name = module._selection_bare_script_name
+ original_service_names = module._selection_service_names_for_target_node
+ original_log_path = module._bare_wave_bootstrap_log_path
+ try:
+ module._run_local_stop = lambda *, local_node_cfg, service_name: stop_calls.append(
+ f"{local_node_cfg['hostname']}:{service_name}"
+ )
+ module._spawn_local_start = lambda **kwargs: spawn_calls.append(kwargs["selection_name"]) or {
+ "mode": "local",
+ "node_name": kwargs["local_node_cfg"]["hostname"],
+ "selection_name": kwargs["selection_name"],
+ "bare_script_name": kwargs["bare_script_name"],
+ "bootstrap_log_path": kwargs["bootstrap_log_path"],
+ "expected_service_names": kwargs["expected_service_names"],
+ "launch_error": None,
+ "launcher_rc": 0,
+ "runtime_statuses": [],
+ }
+ module._join_bare_launch = lambda result: None
+ module._collect_bare_runtime_statuses = lambda **kwargs: []
+ module._selection_bare_script_name = lambda **kwargs: "fluxon_core_controller"
+ module._selection_service_names_for_target_node = (
+ lambda **kwargs: ["master", "owner", "ops_controller", "ops_agent"]
+ )
+ module._bare_wave_bootstrap_log_path = (
+ lambda **kwargs: Path("/tmp") / f"{kwargs['node_name']}_{kwargs['selection_name']}.log"
+ )
+ module._run_bare_waves(
+ workdir=Path("/tmp"),
+ deployconf=deployconf,
+ cluster_nodes=cluster_nodes,
+ local_node_cfg=cluster_nodes["logic-a"],
+ waves=[
+ {
+ "launches": [
+ {"node": "logic-a", "selection_name": "fluxon_core_controller"},
+ ]
+ }
+ ],
+ bootstrap_bare_services=set(),
+ )
+ finally:
+ module._run_local_stop = original_run_local_stop
+ module._spawn_local_start = original_spawn_local
+ module._join_bare_launch = original_join
+ module._collect_bare_runtime_statuses = original_collect
+ module._selection_bare_script_name = original_bare_script_name
+ module._selection_service_names_for_target_node = original_service_names
+ module._bare_wave_bootstrap_log_path = original_log_path
+ assert stop_calls == [
+ "logic-a:master",
+ "logic-a:owner",
+ "logic-a:ops_controller",
+ "logic-a:ops_agent",
+ ], stop_calls
+ assert spawn_calls == ["fluxon_core_controller"], spawn_calls
+ print("PASS: test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch")
+
+
def test_local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection() -> None:
module = _load_start_test_bed_module()
deployconf = {
@@ -1094,7 +1222,7 @@ def test_bare_then_apply_success_path_does_not_run_post_apply_stop() -> None:
encoding="utf-8",
)
- original_read_local_release_manifest_sha256 = module._read_local_release_manifest_sha256
+ original_read_release_manifest_sha256 = module._read_release_manifest_sha256
original_with_release_manifest_sha256_env = module._with_release_manifest_sha256_env
original_generate_daemonset_artifacts = module._generate_daemonset_artifacts
original_refresh_cluster_bare_deploy_scripts = module._refresh_cluster_bare_deploy_scripts
@@ -1120,7 +1248,7 @@ def test_bare_then_apply_success_path_does_not_run_post_apply_stop() -> None:
call_sequence: list[str] = []
try:
- module._read_local_release_manifest_sha256 = lambda **_: "sha256"
+ module._read_release_manifest_sha256 = lambda **_: "sha256"
module._with_release_manifest_sha256_env = lambda **kwargs: kwargs["deployconf"]
module._generate_daemonset_artifacts = lambda **_: None
module._refresh_cluster_bare_deploy_scripts = lambda **_: None
@@ -1194,7 +1322,7 @@ def _fail_remote_stop(*, node_name: str, service_name: str, **_: object) -> None
finally:
sys.argv = original_argv
finally:
- module._read_local_release_manifest_sha256 = original_read_local_release_manifest_sha256
+ module._read_release_manifest_sha256 = original_read_release_manifest_sha256
module._with_release_manifest_sha256_env = original_with_release_manifest_sha256_env
module._generate_daemonset_artifacts = original_generate_daemonset_artifacts
module._refresh_cluster_bare_deploy_scripts = original_refresh_cluster_bare_deploy_scripts
@@ -1321,7 +1449,7 @@ def test_bare_only_stops_after_controller_ready() -> None:
encoding="utf-8",
)
- original_read_local_release_manifest_sha256 = module._read_local_release_manifest_sha256
+ original_read_release_manifest_sha256 = module._read_release_manifest_sha256
original_with_release_manifest_sha256_env = module._with_release_manifest_sha256_env
original_generate_daemonset_artifacts = module._generate_daemonset_artifacts
original_refresh_cluster_bare_deploy_scripts = module._refresh_cluster_bare_deploy_scripts
@@ -1339,7 +1467,7 @@ def test_bare_only_stops_after_controller_ready() -> None:
run_calls: list[tuple[str, object]] = []
try:
- module._read_local_release_manifest_sha256 = lambda **_: "sha256"
+ module._read_release_manifest_sha256 = lambda **_: "sha256"
module._with_release_manifest_sha256_env = lambda **kwargs: kwargs["deployconf"]
module._generate_daemonset_artifacts = lambda **_: run_calls.append(("generate", None))
module._refresh_cluster_bare_deploy_scripts = lambda **_: run_calls.append(("refresh_bare", None))
@@ -1390,7 +1518,7 @@ def _fail_deploy_or_wait(**_: object) -> dict[str, str]:
finally:
sys.argv = original_argv
finally:
- module._read_local_release_manifest_sha256 = original_read_local_release_manifest_sha256
+ module._read_release_manifest_sha256 = original_read_release_manifest_sha256
module._with_release_manifest_sha256_env = original_with_release_manifest_sha256_env
module._generate_daemonset_artifacts = original_generate_daemonset_artifacts
module._refresh_cluster_bare_deploy_scripts = original_refresh_cluster_bare_deploy_scripts
@@ -1518,11 +1646,19 @@ def main() -> int:
"run_bare_waves_treats_local_execution_mode_node_as_local",
test_run_bare_waves_treats_local_execution_mode_node_as_local,
),
+ (
+ "run_bare_waves_stops_legacy_plain_services_before_atomic_launch",
+ test_run_bare_waves_stops_legacy_plain_services_before_atomic_launch,
+ ),
(
"local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection",
test_local_coverage_bootstrap_excludes_duplicate_local_control_plane_selection,
),
("parse_test_runner_ui_config_resolves_paths", test_parse_test_runner_ui_config_resolves_paths),
+ (
+ "resolve_local_node_cfg_accepts_remote_only_controller_host",
+ test_resolve_local_node_cfg_accepts_remote_only_controller_host,
+ ),
(
"normalize_bootstrap_deployconf_strips_legacy_master_p2p_listen_port",
test_normalize_bootstrap_deployconf_strips_legacy_master_p2p_listen_port,
diff --git a/deployment/utils/proc_lifecycle_codegen.py b/deployment/utils/proc_lifecycle_codegen.py
index 197829f..d93ece6 100644
--- a/deployment/utils/proc_lifecycle_codegen.py
+++ b/deployment/utils/proc_lifecycle_codegen.py
@@ -164,6 +164,52 @@ def render_bash_proc_lifecycle_funcs_pid_tree(*, timeouts: StopTimeouts) -> str:
'
}}
+_cmdline_contains_selection_supervisor_entry() {{
+ pid="$1"
+ if [[ ! "$pid" =~ ^[0-9]+$ ]]; then
+ return 1
+ fi
+ cmdline_path="/proc/$pid/cmdline"
+ if [ ! -r "$cmdline_path" ]; then
+ return 1
+ fi
+ tr '\\0' '\\n' < "$cmdline_path" 2>/dev/null | awk '
+ {{
+ if ($0 ~ /(^|\\/)selection_supervisor\\.py$/) {{
+ found=1;
+ }}
+ }}
+ END {{
+ exit(found ? 0 : 1);
+ }}
+ '
+}}
+
+_pid_tree_ready_candidate_child_pids() {{
+ root_pid="$1"
+ if [[ ! "$root_pid" =~ ^[0-9]+$ ]]; then
+ return 1
+ fi
+ if ! _pid_exists "$root_pid"; then
+ return 1
+ fi
+
+ direct_child_pids="$(_pid_tree_direct_child_pids "$root_pid" 2>/dev/null || true)"
+ if [ -z "$direct_child_pids" ]; then
+ echo ""
+ return 0
+ fi
+
+ ready_child_pids=""
+ for child_pid in $direct_child_pids; do
+ if _cmdline_contains_selection_supervisor_entry "$child_pid"; then
+ continue
+ fi
+ ready_child_pids="$ready_child_pids $child_pid"
+ done
+ echo "${{ready_child_pids# }}"
+}}
+
_now_monotonic_ms() {{
python3 - <<'__FLUXON_MONOTONIC_MS__'
import time
@@ -206,7 +252,7 @@ def render_bash_proc_lifecycle_funcs_pid_tree(*, timeouts: StopTimeouts) -> str:
return 1
fi
- current_child_pids="$(_pid_tree_direct_child_pids "$root_pid" 2>/dev/null || true)"
+ current_child_pids="$(_pid_tree_ready_candidate_child_pids "$root_pid" 2>/dev/null || true)"
current_child_pid=""
if [ -n "$current_child_pids" ]; then
set -- $current_child_pids
diff --git "a/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md" "b/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md"
new file mode 100644
index 0000000..9a6d545
--- /dev/null
+++ "b/fluxon_doc_cn/design/observ_0_KV\350\265\204\346\272\220\347\233\221\346\216\247\351\235\242\346\235\277\344\270\216Metric_Trends.md"
@@ -0,0 +1,138 @@
+# Observ 设计 0 - 监控面板 Metric Trends
+
+## 0. 总起
+
+本文只定义 KV 监控面板里的 `Metric Trends` 区域。对应页面模板在 `fluxon_rs/fluxon_cli/templates/monitor_table.html`,页面上显示为:
+
+```html
+Metric Trends (KV aggregate + member drill-down)
+```
+
+稳定结论:
+
+- `Metric Trends` 是 KV 面板里的趋势图区域,负责展示聚合指标卡片和 per owner 展开视图。
+- 指标卡片支持多线折线图。容量型指标必须把用量和总量放在同一张图里。
+- 用户可以多选指标卡片,同时展开多个 owner drilldown block。
+- 折线 hover 时必须显示 tooltip、垂直辅助线和折线上的对齐辅助点。
+- 周期刷新必须复用已有 DOM,避免页面跳动、展开状态丢失和 hover 中断。
+
+## 1. 区域结构
+
+`Metric Trends` 区域由三层组成:
+
+| 层级 | 页面元素 | 职责 |
+| --- | --- | --- |
+| 顶部控制 | window selector、role filters | 控制趋势窗口和可见成员角色 |
+| 聚合卡片 | `#metric_grid` 下的 `.metric_card` | 展示每个指标的最新值和聚合曲线 |
+| 展开视图 | `#member_metric_sections` 下的 owner blocks | 展示选中指标的 per owner 曲线和成员行 |
+
+用户进入 KV 面板时先看到聚合卡片。点击一个指标卡片后,该指标会进入选中集合,并在下方生成一个 owner drilldown block;再次点击同一卡片会关闭该指标的展开视图。
+
+## 2. 指标卡片
+
+每个 `.metric_card` 展示三类信息:
+
+- 指标名,例如 `Node CPU`、`Node Memory`、`GPU Memory`。
+- 最新值。多线指标用 `主线 / 对比线 / 附加线` 的顺序展示。
+- 一张 sparkline 折线图。
+
+当前卡片按以下语义渲染:
+
+| 指标 | 曲线要求 |
+| --- | --- |
+| `Node CPU` | `Used`、`Capacity`、`Process CPU` 三条线 |
+| `Node Memory` | `Used`、`Total`、`Process RSS` 三条线 |
+| `Segment Usage` | `Used`、`Capacity` 两条线 |
+| `GPU Memory` | `Used`、`Total` 两条线 |
+| `Process Network` | `TX`、`RX` 两条线 |
+| `Node Network` | `TX`、`RX` 两条线 |
+| `Cache Hit %` | 一条命中率曲线,选择后可看 per owner 命中率 |
+| 其他单值指标 | 一条主曲线 |
+
+CPU 指标按核堆叠展示,GPU 百分比指标按设备聚合展示;这类按资源实例求和的百分比聚合值都可以超过 `100%`。折线图的 Y 轴起点固定为 `0`,避免资源曲线因为局部波动被视觉放大。
+
+## 3. 多线折线图
+
+折线图由 `buildSparklineSvg(...)` 生成。输入统一归并为 `data-lines`:
+
+```text
+primary series
+comparison series
+additional series...
+```
+
+渲染规则:
+
+- 第一条线是主线。
+- comparison line 用于容量、总量或反方向指标。
+- additional line 用于同图补充进程资源,例如 `Process CPU`、`Process RSS`。
+- 多线图必须显示 legend,legend 文案使用 `series_label`。
+- 没有有效 series 时显示 `N/A`,不生成空白 SVG。
+
+这个规则保证 `Node Memory` 这类指标能在一张图里同时看节点用量、节点总量和 Fluxon 进程 RSS。
+
+## 4. Hover 交互
+
+鼠标悬浮在折线图上时,UI 必须显示:
+
+- 垂直 hover 辅助线:`.metric_chart_hover_line`
+- 每条曲线的对齐辅助点:`.metric_chart_hover_point_ring` 和 `.metric_chart_hover_point`
+- tooltip:时间戳和当前 x 位置上每条曲线的格式化数值
+
+辅助点和 tooltip 都从同一份 `data-lines` 取值。这样点位、颜色、legend 和 tooltip 数值保持一致。
+
+离开图表时,tooltip 和所有辅助点隐藏。
+
+## 5. 多选展开
+
+`Metric Trends` 支持同时展开多个指标。状态保存在:
+
+```text
+selectedMetricKeys: string[]
+```
+
+交互规则:
+
+- 点击未选中的 metric card:加入 `selectedMetricKeys`,创建对应 owner drilldown block。
+- 点击已选中的 metric card:从 `selectedMetricKeys` 删除,同时删除该指标的 owner 展开状态。
+- 多个选中指标按 `selectedMetricKeys` 顺序逐个渲染,不互相覆盖。
+
+这意味着用户可以同时查看 `Node CPU`、`Node Memory`、`Cache Hit %` 等多个指标的 per owner 视图。
+
+## 6. Owner Drilldown
+
+每个选中指标对应一个 owner drilldown block。block 里每个 owner 用一张可展开的 owner card 渲染。
+
+owner card 的内容:
+
+- owner id 和 node key。
+- owner 汇总最新值。
+- owner 汇总折线图。
+- 展开后的成员行。
+
+owner 展开状态按指标分别保存:
+
+```text
+expandedOwnersByMetric[metric_key] = [owner_id, ...]
+```
+
+因此,同一个 owner 在 `Node CPU` 里展开,不会强制影响 `Node Memory` 里的展开状态。
+
+## 7. 刷新稳定性
+
+`Metric Trends` 会随页面周期刷新。刷新时必须满足:
+
+- 不清空整个 `#metric_grid` 后重建。
+- 不清空整个 owner drilldown section 后重建。
+- 已存在卡片按 `data-patch-key` 复用 DOM。
+- 已展开的 owner 继续保持展开。
+- 初次加载后不反复写回 `Loading metric panel...`,避免高度跳动。
+
+当前实现用 `patchChildrenByKey(...)` 复用 metric card 和 owner card。刷新只更新必要 HTML,保留卡片节点本身。
+
+## 8. 关键结论
+
+- `Metric Trends` 的核心 contract 是“多线趋势 + 多选 owner drilldown + 稳定刷新”。
+- `Node CPU`、`Node Memory`、`Segment Usage`、`GPU Memory`、网络指标必须保持多线展示;CPU/GPU 这类资源实例聚合百分比允许超过 `100%`。
+- hover 辅助点是趋势图可读性的一部分,不能只显示 tooltip。
+- 多选展开状态和 owner 展开状态必须分别持久化,避免用户刷新或轮询后丢失上下文。
diff --git a/fluxon_doc_cn/roadmap.md b/fluxon_doc_cn/roadmap.md
index ddbd735..6af4d6e 100644
--- a/fluxon_doc_cn/roadmap.md
+++ b/fluxon_doc_cn/roadmap.md
@@ -11,6 +11,7 @@
### 0.2.1
- [PERF] 优化 `RPC`、`KV`、`FS` 性能
+- [TOOL] KV 监控面板支持 GPU 资源列和趋势曲线
- [MQ] 修复 `MQ` 控制面可扩展性问题
- [ETCD] 修复 `etcd` 前缀获取时 `gRPC` 限制大小问题
- [OSS] 完善开源相关工作
diff --git "a/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md" "b/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md"
index 9a8c8e1..fea34dc 100644
--- "a/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md"
+++ "b/fluxon_doc_cn/user_doc/\347\224\250\346\210\267 - 3 - KV-RPC\346\216\245\345\217\243.md"
@@ -200,6 +200,29 @@ master_ui:
http://:18080/view?cluster_name=demo-kv-cluster&member_kind=kv
```
+KV Web UI 会直接展示 KV 成员表和 `Metric Trends` 曲线面板。成员表的 `gpu` 列展示每个采样节点上 GPU 的显存、利用率、温度和 GPU 进程摘要;`Metric Trends` 里可以查看 `GPU Memory Used`、`GPU Memory Total`、`GPU Util %`、`GPU Temp`、`GPU Proc Count`、`GPU Proc SM %` 和 `GPU Proc Mem %` 的可视化曲线。
+
+GPU 监控使用 Fluxon 的标准监控链路:
+
+```text
+owner/master 系统指标采样
+ -> nvidia-smi
+ -> Prometheus remote-write
+ -> Greptime / Prometheus query API
+ -> KV Web UI
+```
+
+使用 GPU 曲线需要满足这些前置条件:
+
+| 项 | 要求 |
+| --- | --- |
+| GPU 可见性 | 采样进程所在机器能执行 `nvidia-smi`,并能看到目标 GPU。 |
+| 监控写入 | `monitoring.prom_remote_write_url` 指向可写的 Greptime/Prometheus remote-write 接口。 |
+| 监控查询 | `monitoring.prometheus_base_url` 指向可查询的 Prometheus API,例如 Greptime 的 `/v1/prometheus`。 |
+| 采样范围 | 当前 GPU 指标跟随系统指标采样角色,覆盖 master 和 owner/client 进程可见的节点资源;external client 不重复采样系统资源。 |
+
+如果机器没有 GPU、没有 `nvidia-smi`,或进程没有访问 GPU 的权限,KV 进程仍会继续运行;对应成员的 `gpu` 列显示 `N/A`,GPU 趋势卡片没有样本。
+
`owner` 把共享内存池和 `shared.json` 准备好之后,再运行下面的业务最小示例。
### 生命周期与调用流程(Call Flow)
diff --git a/fluxon_doc_en/roadmap.md b/fluxon_doc_en/roadmap.md
index d2f79c6..e23f233 100644
--- a/fluxon_doc_en/roadmap.md
+++ b/fluxon_doc_en/roadmap.md
@@ -11,6 +11,7 @@
### 0.2.1
- [PERF] Optimize `RPC`, `KV`, and `FS` performance
+- [TOOL] Add GPU resource columns and trend curves to the KV monitoring panel
- [MQ] Fix MQ control-plane scalability issues
- [ETCD] Fix the gRPC size limit issue when listing etcd prefixes
- [OSS] Improve open-source readiness and related workflows
diff --git a/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md b/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md
index b3f003d..65b6213 100644
--- a/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md
+++ b/fluxon_doc_en/user_doc/User - 3 - KV and RPC Interface.md
@@ -160,6 +160,35 @@ python3 examples/start_master_owner.py
python3 examples/start_master_owner.py --without-master
```
+The `master_ui` block starts the KV Web UI from the master process. With the example above, open:
+
+```text
+http://<master-host>:18080/view?cluster_name=demo-kv-cluster&member_kind=kv
+```
+
+The KV Web UI shows the KV member table and the `Metric Trends` chart panel. The table's `gpu` column summarizes GPU memory, utilization, temperature, and GPU process activity for each sampled node. `Metric Trends` can plot `GPU Memory Used`, `GPU Memory Total`, `GPU Util %`, `GPU Temp`, `GPU Proc Count`, `GPU Proc SM %`, and `GPU Proc Mem %`.
+
+GPU monitoring uses the standard Fluxon observability path:
+
+```text
+owner/master system-metric sampling
+ -> nvidia-smi
+ -> Prometheus remote-write
+ -> Greptime / Prometheus query API
+ -> KV Web UI
+```
+
+GPU curves require:
+
+| Item | Requirement |
+| --- | --- |
+| GPU visibility | The sampling process can run `nvidia-smi` and can see the target GPUs. |
+| Metric write path | `monitoring.prom_remote_write_url` points to a writable Greptime/Prometheus remote-write endpoint. |
+| Metric query path | `monitoring.prometheus_base_url` points to a queryable Prometheus API, for example Greptime's `/v1/prometheus`. |
+| Sampling scope | GPU metrics follow the system-metric sampling roles: master and owner/client processes report visible node resources; external clients do not duplicate system-resource sampling. |
+
+If a machine has no GPU, lacks `nvidia-smi`, or the process cannot access the GPU, the KV process keeps running. The member table shows `N/A` for `gpu`, and GPU trend cards have no samples.
+
### Lifecycle and Call Flow
```text
diff --git a/fluxon_rs/fluxon_cli/src/lib.rs b/fluxon_rs/fluxon_cli/src/lib.rs
index 11c9fcd..2adb85d 100644
--- a/fluxon_rs/fluxon_cli/src/lib.rs
+++ b/fluxon_rs/fluxon_cli/src/lib.rs
@@ -30,7 +30,7 @@ pub const OPS_PANEL_SERVICE_NAME: &str = "ops";
use crate::config::{AVAILABLE_MEMBER_KINDS, MemberKind, MonitorConfig};
use crate::model::{
- ClusterMember, ClusterSnapshot, ClustersResponse, MemberRdmaDeviceSnapshot,
+ ClusterMember, ClusterSnapshot, ClustersResponse, GpuSnapshot, MemberRdmaDeviceSnapshot,
MemberRdmaPortSnapshot, MemberSnapshot, NodeSnapshot, RdmaNetdevRateSnapshot,
TransferEngineEdge,
};
@@ -1359,6 +1359,7 @@ async fn build_fs_cluster_snapshot(
node_memory_total_bytes: None,
container_memory_usage_bytes: None,
container_memory_limit_bytes: None,
+ gpus: Vec::new(),
process_resident_memory_bytes: None,
process_cpu_usage_percent: None,
tokio_num_workers: None,
@@ -1864,6 +1865,38 @@ pub async fn build_cluster_snapshot_with_prom_query_time(
let seg_used_bytes_by_node =
sum_segment_bytes_by_node(&prom_maps.seg_used_bytes_by_node_device);
+ fn gpu_snapshots_for_member(
+ prom_maps: &crate::prom::PromSnapshotMaps,
+ member_id: &str,
+ ) -> Vec {
+ let mut out = Vec::new();
+ for ((node, index, name), gpu) in &prom_maps.gpu_by_node_index_name {
+ if node != member_id {
+ continue;
+ }
+ out.push(GpuSnapshot {
+ index: index.clone(),
+ name: name.clone(),
+ memory_used_bytes: gpu.memory_used_bytes,
+ memory_total_bytes: gpu.memory_total_bytes,
+ utilization_percent: gpu.utilization_percent,
+ temperature_celsius: gpu.temperature_celsius,
+ process_count: gpu.process_count,
+ process_sm_utilization_percent: gpu.process_sm_utilization_percent,
+ process_memory_utilization_percent: gpu.process_memory_utilization_percent,
+ });
+ }
+ out.sort_by(|a, b| {
+ let ai = a.index.parse::();
+ let bi = b.index.parse::();
+ match (ai, bi) {
+ (Ok(ai), Ok(bi)) => ai.cmp(&bi),
+ _ => a.index.cmp(&b.index),
+ }
+ });
+ out
+ }
+
async fn prom_scalar_best_effort(
warnings: &mut Vec,
prom: &PromClient,
@@ -2068,6 +2101,7 @@ pub async fn build_cluster_snapshot_with_prom_query_time(
.container_memory_limit_bytes
.get(&member_id)
.copied(),
+ gpus: gpu_snapshots_for_member(&prom_maps, &member_id),
process_resident_memory_bytes: prom_maps
.process_resident_memory_bytes
.get(&member_id)
diff --git a/fluxon_rs/fluxon_cli/src/model.rs b/fluxon_rs/fluxon_cli/src/model.rs
index 4b9e14a..934a09f 100644
--- a/fluxon_rs/fluxon_cli/src/model.rs
+++ b/fluxon_rs/fluxon_cli/src/model.rs
@@ -56,6 +56,26 @@ pub struct RdmaNetdevRateSnapshot {
pub rx_mbps: Option,
}
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct GpuSnapshot {
+ pub index: String,
+ pub name: String,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub memory_used_bytes: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub memory_total_bytes: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub utilization_percent: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub temperature_celsius: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub process_count: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub process_sm_utilization_percent: Option,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub process_memory_utilization_percent: Option,
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KvTopologyOwnerExternalMaxSnapshot {
pub owner_id: String,
@@ -379,6 +399,79 @@ fn fmt_bytes_per_sec_from_mbps(v_mbps: Option) -> (String, UiPillStatus) {
fmt_bytes_auto(Some(bytes_per_sec), true)
}
+/// Formats a byte count as GiB with one decimal place and a `G` suffix.
+///
+/// Returns `"-"` when no value is available.
+fn fmt_bytes_gib_short(v: Option<f64>) -> String {
+    match v {
+        Some(bytes) => format!("{:.1}G", bytes / 1024.0 / 1024.0 / 1024.0),
+        None => "-".to_string(),
+    }
+}
+
+/// Formats a percentage rounded to zero decimals with a `%` suffix.
+///
+/// Returns `"-"` when no value is available.
+fn fmt_percent_short(v: Option<f64>) -> String {
+    match v {
+        Some(v) => format!("{:.0}%", v),
+        None => "-".to_string(),
+    }
+}
+
+/// Formats a temperature rounded to zero decimals with a `C` suffix.
+///
+/// Returns `"-"` when no value is available.
+fn fmt_temp_short(v: Option<f64>) -> String {
+    match v {
+        Some(v) => format!("{:.0}C", v),
+        None => "-".to_string(),
+    }
+}
+
+/// Formats a count rounded to zero decimals, without any suffix.
+///
+/// Returns `"-"` when no value is available.
+fn fmt_count_short(v: Option<f64>) -> String {
+    match v {
+        Some(v) => format!("{:.0}", v),
+        None => "-".to_string(),
+    }
+}
+
+/// Renders a one-line, human-readable summary of all GPUs.
+///
+/// Returns `"N/A"` for an empty slice. Otherwise each GPU is rendered as
+/// `index: used/total util=.. temp=.. p=.. sm=.. pmem=..` and the entries are
+/// joined with `" | "`.
+///
+/// Ordering: indices that parse as integers come first in numeric order;
+/// indices that do not parse follow in lexicographic order. Keeping the two
+/// groups separate makes the comparator a total order, which `sort_by`
+/// requires.
+fn render_gpu_summary(gpus: &[GpuSnapshot]) -> String {
+    if gpus.is_empty() {
+        return "N/A".to_string();
+    }
+
+    let mut rows: Vec<&GpuSnapshot> = gpus.iter().collect();
+    rows.sort_by(|a, b| {
+        let ai = a.index.parse::<u64>();
+        let bi = b.index.parse::<u64>();
+        match (ai, bi) {
+            (Ok(ai), Ok(bi)) => ai.cmp(&bi),
+            // Numeric indices sort ahead of non-numeric ones.
+            (Ok(_), Err(_)) => std::cmp::Ordering::Less,
+            (Err(_), Ok(_)) => std::cmp::Ordering::Greater,
+            (Err(_), Err(_)) => a.index.cmp(&b.index),
+        }
+    });
+
+    rows.into_iter()
+        .map(|g| {
+            format!(
+                "{}: {}/{} util={} temp={} p={} sm={} pmem={}",
+                g.index,
+                fmt_bytes_gib_short(g.memory_used_bytes),
+                fmt_bytes_gib_short(g.memory_total_bytes),
+                fmt_percent_short(g.utilization_percent),
+                fmt_temp_short(g.temperature_celsius),
+                fmt_count_short(g.process_count),
+                fmt_percent_short(g.process_sm_utilization_percent),
+                fmt_percent_short(g.process_memory_utilization_percent),
+            )
+        })
+        .collect::<Vec<_>>()
+        .join(" | ")
+}
+
+/// Returns the largest `memory_used_bytes` across `gpus`.
+///
+/// GPUs without a value are ignored; the result is `None` when no GPU
+/// reports one (including when `gpus` is empty).
+fn max_gpu_memory_used(gpus: &[GpuSnapshot]) -> Option<f64> {
+    gpus.iter()
+        .filter_map(|g| g.memory_used_bytes)
+        .reduce(f64::max)
+}
+
+/// Returns the largest `utilization_percent` across `gpus`.
+///
+/// GPUs without a value are ignored; the result is `None` when no GPU
+/// reports one (including when `gpus` is empty).
+fn max_gpu_utilization(gpus: &[GpuSnapshot]) -> Option<f64> {
+    gpus.iter()
+        .filter_map(|g| g.utilization_percent)
+        .reduce(f64::max)
+}
+
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkConfigSnapshot {
pub subnet_whitelist: Vec,
@@ -528,6 +621,7 @@ pub struct MemberSnapshot {
pub node_memory_total_bytes: Option,
pub container_memory_usage_bytes: Option,
pub container_memory_limit_bytes: Option,
+ pub gpus: Vec,
pub process_resident_memory_bytes: Option,
pub process_cpu_usage_percent: Option,
pub tokio_num_workers: Option,
@@ -1531,6 +1625,9 @@ pub struct MemberTableRowView {
pub shared_mem_dir_text: String,
pub p2p_listen_port_text: String,
pub rdma_text: String,
+ pub gpu_text: String,
+ pub gpu_memory_used_sort: String,
+ pub gpu_utilization_sort: String,
pub search_text: String,
pub cpu_text: String,
pub cpu_sort: String,
@@ -1937,6 +2034,7 @@ pub fn build_member_table_rows(snapshot: &ClusterSnapshot) -> Vec Vec Vec,
}
+/// Metric values collected for one GPU; each field stays `None` until a
+/// value for that metric has been merged in.
+#[derive(Debug, Clone, Default)]
+pub struct GpuPromSnapshot {
+    pub memory_used_bytes: Option<f64>,
+    pub memory_total_bytes: Option<f64>,
+    pub utilization_percent: Option<f64>,
+    pub temperature_celsius: Option<f64>,
+    pub process_count: Option<f64>,
+    pub process_sm_utilization_percent: Option<f64>,
+    pub process_memory_utilization_percent: Option<f64>,
+}
+
pub struct PromSnapshotMaps {
pub node_cpu_usage_percent: HashMap,
pub node_cpu_logical_cores: HashMap,
@@ -241,6 +256,7 @@ pub struct PromSnapshotMaps {
pub node_memory_total_bytes: HashMap,
pub container_memory_usage_bytes: HashMap,
pub container_memory_limit_bytes: HashMap,
+ pub gpu_by_node_index_name: HashMap<(String, String, String), GpuPromSnapshot>,
pub process_resident_memory_bytes: HashMap,
pub process_cpu_usage_percent: HashMap,
pub tokio_num_workers: HashMap,
@@ -291,6 +307,7 @@ impl PromSnapshotMaps {
node_memory_total_bytes: HashMap::new(),
container_memory_usage_bytes: HashMap::new(),
container_memory_limit_bytes: HashMap::new(),
+ gpu_by_node_index_name: HashMap::new(),
process_resident_memory_bytes: HashMap::new(),
process_cpu_usage_percent: HashMap::new(),
tokio_num_workers: HashMap::new(),
@@ -459,6 +476,39 @@ fn take_node_device_metric(samples: &[PromSample]) -> HashMap<(String, String),
out
}
+/// Indexes samples by `(node, gpu index, gpu name)`.
+///
+/// A sample is dropped when any of the three labels is missing or when
+/// `value_f64()` yields nothing. If several samples share a key, the last
+/// one in `samples` wins.
+fn take_gpu_metric(samples: &[PromSample]) -> HashMap<(String, String, String), f64> {
+    samples
+        .iter()
+        .filter_map(|sample| {
+            let node = sample.metric.get(PROM_LABEL_NODE)?;
+            let gpu_index = sample.metric.get(PROM_LABEL_GPU_INDEX)?;
+            let gpu_name = sample.metric.get(PROM_LABEL_GPU_NAME)?;
+            let value = sample.value_f64()?;
+            Some(((node.clone(), gpu_index.clone(), gpu_name.clone()), value))
+        })
+        .collect()
+}
+
+/// Folds one metric's values into the per-GPU snapshot map.
+///
+/// For every `(key, value)` in `src`, the entry for `key` in `dst` is looked
+/// up (a default `GpuPromSnapshot` is inserted when absent) and handed to
+/// `set` together with `value`; `set` decides which field to write.
+fn merge_gpu_metric<F>(
+    dst: &mut HashMap<(String, String, String), GpuPromSnapshot>,
+    src: HashMap<(String, String, String), f64>,
+    mut set: F,
+) where
+    F: FnMut(&mut GpuPromSnapshot, f64),
+{
+    for (key, value) in src {
+        let gpu = dst.entry(key).or_default();
+        set(gpu, value);
+    }
+}
+
pub fn role_from_member_metadata(meta: &BTreeMap) -> MemberRole {
if meta.get("master").map(|v| v == "true").unwrap_or(false) {
return MemberRole::Master;
@@ -1090,6 +1140,97 @@ pub async fn collect_prom_snapshot(
)
.await,
);
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_MEMORY_USED_BYTES,
+ PROM_METRIC_GPU_MEMORY_USED_BYTES,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.memory_used_bytes = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_MEMORY_TOTAL_BYTES,
+ PROM_METRIC_GPU_MEMORY_TOTAL_BYTES,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.memory_total_bytes = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_UTILIZATION_PERCENT,
+ PROM_METRIC_GPU_UTILIZATION_PERCENT,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.utilization_percent = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_TEMPERATURE_CELSIUS,
+ PROM_METRIC_GPU_TEMPERATURE_CELSIUS,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.temperature_celsius = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_PROCESS_COUNT,
+ PROM_METRIC_GPU_PROCESS_COUNT,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.process_count = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_PROCESS_SM_UTILIZATION_PERCENT,
+ PROM_METRIC_GPU_PROCESS_SM_UTILIZATION_PERCENT,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.process_sm_utilization_percent = Some(v),
+ );
+ merge_gpu_metric(
+ &mut out.gpu_by_node_index_name,
+ take_gpu_metric(
+ &q(
+ prom,
+ warnings,
+ PROM_METRIC_GPU_PROCESS_MEMORY_UTILIZATION_PERCENT,
+ PROM_METRIC_GPU_PROCESS_MEMORY_UTILIZATION_PERCENT,
+ )
+ .await,
+ ),
+ |gpu, v| gpu.process_memory_utilization_percent = Some(v),
+ );
out.process_resident_memory_bytes = take_node_metric(
&q(
prom,
@@ -1168,7 +1309,7 @@ pub async fn collect_prom_snapshot(
prom,
warnings,
"process_network_tx_mbps",
- "sum by (node) (rate(client_network_bytes_total{direction=\"tx\"}[2m])) * 8 / 1000000",
+ &format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{direction=\"tx\"}}"),
)
.await,
);
@@ -1177,7 +1318,7 @@ pub async fn collect_prom_snapshot(
prom,
warnings,
"process_network_rx_mbps",
- "sum by (node) (rate(client_network_bytes_total{direction=\"rx\"}[2m])) * 8 / 1000000",
+ &format!("{PROM_METRIC_CLIENT_NETWORK_MBPS}{{direction=\"rx\"}}"),
)
.await,
);
diff --git a/fluxon_rs/fluxon_cli/src/server.rs b/fluxon_rs/fluxon_cli/src/server.rs
index ed6baec..e1dee90 100644
--- a/fluxon_rs/fluxon_cli/src/server.rs
+++ b/fluxon_rs/fluxon_cli/src/server.rs
@@ -19,11 +19,13 @@ use hyper::Uri;
use hyper::client::HttpConnector;
use hyper_rustls::HttpsConnectorBuilder;
use serde::Serialize;
+use std::collections::{BTreeMap, BTreeSet};
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::{RwLock, watch};
+use fluxon_observability::keys::PROM_METRIC_CLIENT_NETWORK_MBPS;
use fluxon_util::{
FluxonCliProxyDescriptorV2, FluxonCliProxyTransportV2, fluxon_cli_proxy_desc_etcd_key_v2,
fluxon_cli_proxy_desc_etcd_service_prefix_v2,
@@ -245,6 +247,8 @@ const FLUXON_CLI_AUTO_REFRESH_TOOL_JS: &str = r#"
if (inFlightRef.inFlight) return;
inFlightRef.inFlight = true;
const state = hooks.captureState();
+ const scrollX = window.scrollX;
+ const scrollY = window.scrollY;
try {
const resp = await fetch(cfg.url, { cache: 'no-store' });
if (!resp.ok) {
@@ -262,6 +266,7 @@ const FLUXON_CLI_AUTO_REFRESH_TOOL_JS: &str = r#"
curApp.innerHTML = nextApp.innerHTML;
hooks.restoreState(state);
hooks.afterReplace();
+ window.scrollTo(scrollX, scrollY);
} catch (e) {
console.warn('auto_refresh: refresh failed:', e);
} finally {
@@ -479,16 +484,30 @@ struct KvMetricPanelResponse {
#[serde(rename_all = "snake_case")]
struct KvMetricMembersResponse {
metric: KvMetricMetaWire,
+ comparison_metric: Option,
+ additional_metrics: Vec,
range: KvMetricRangeWire,
members: Vec,
warnings: Vec,
}
+/// Serializable response holding a metric description, an optional
+/// comparison metric, additional metrics, the range, a list of owners, and
+/// warnings.
+// NOTE(review): the type arguments of `Option` / `Vec` below appear to be missing — confirm against the original file.
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "snake_case")]
+struct KvMetricOwnersResponse {
+ metric: KvMetricMetaWire,
+ comparison_metric: Option,
+ additional_metrics: Vec,
+ range: KvMetricRangeWire,
+ owners: Vec,
+ warnings: Vec,
+}
+
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
struct KvMetricMetaWire {
key: String,
label: String,
+ series_label: String,
unit: String,
aggregate: String,
}
@@ -504,8 +523,20 @@ struct KvMetricRangeWire {
#[serde(rename_all = "snake_case")]
struct KvAggregateMetricCardWire {
metric: KvMetricMetaWire,
+ comparison_metric: Option,
latest: Option,
aggregate_series: Vec<(f64, f64)>,
+ comparison_latest: Option,
+ comparison_series: Vec<(f64, f64)>,
+ additional_series: Vec,
+}
+
+/// Serializable additional series: its metric description, an optional
+/// latest value, and the series itself as `(f64, f64)` pairs.
+// NOTE(review): the type argument of `latest: Option` appears to be missing — confirm against the original file.
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "snake_case")]
+struct KvMetricAdditionalSeriesWire {
+ metric: KvMetricMetaWire,
+ latest: Option,
+ series: Vec<(f64, f64)>,
+}
#[derive(Debug, Clone, Serialize)]
@@ -516,12 +547,29 @@ struct KvMemberSeriesWire {
node_key: String,
latest: Option,
series: Vec<(f64, f64)>,
+ comparison_latest: Option,
+ comparison_series: Vec<(f64, f64)>,
+ additional_series: Vec,
+}
+
+/// Serializable per-owner entry: owner and node identifiers, the primary
+/// series with its latest value, the comparison series with its latest
+/// value, additional series, and a list of members.
+// NOTE(review): the type arguments of the bare `Option` / `Vec` fields below appear to be missing — confirm against the original file.
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "snake_case")]
+struct KvOwnerSeriesWire {
+ owner_id: String,
+ node_key: String,
+ latest: Option,
+ series: Vec<(f64, f64)>,
+ comparison_latest: Option,
+ comparison_series: Vec<(f64, f64)>,
+ additional_series: Vec,
+ members: Vec,
+}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum KvMetricAggregate {
Sum,
Max,
+ Mean,
}
impl KvMetricAggregate {
@@ -529,6 +577,7 @@ impl KvMetricAggregate {
match self {
Self::Sum => "sum",
Self::Max => "max",
+ Self::Mean => "mean",
}
}
}
@@ -537,93 +586,427 @@ impl KvMetricAggregate {
enum KvMetricValueField {
PutRps,
GetRps,
+ GetCacheHitRatePercent,
PutBps,
GetBps,
+ ProcessCpuUsagePercent,
+ NodeCpuUsagePercent,
+ NodeCpuCapacityPercent,
+ ProcessNetworkTxMbps,
+ ProcessNetworkRxMbps,
+ NodeMemoryUsageBytes,
+ NodeMemoryTotalBytes,
+ NodeNetworkTxMbps,
+ NodeNetworkRxMbps,
ProcessRss,
+ GpuMemoryUsed,
+ GpuMemoryTotal,
+ GpuUtilizationPercent,
+ GpuTemperatureCelsius,
+ GpuProcessCount,
+ GpuProcessSmUtilizationPercent,
+ GpuProcessMemoryUtilizationPercent,
SegUsedBytes,
+ SegCapacityBytes,
TokioGlobalQueueDepth,
TokioBusyPercent,
}
+/// Static description of one metric series: key, display labels, unit,
+/// aggregation mode, value-field selector, and an optional role list.
+#[derive(Debug, Clone, Copy)]
+struct KvMetricSeriesSpec {
+    key: &'static str,
+    label: &'static str,
+    series_label: &'static str,
+    unit: &'static str,
+    aggregate: KvMetricAggregate,
+    field: KvMetricValueField,
+    roles: Option<&'static [MemberRole]>,
+}
+
+impl KvMetricSeriesSpec {
+    /// Creates a series spec whose `roles` is left unset (`None`).
+    const fn new(
+        key: &'static str,
+        label: &'static str,
+        series_label: &'static str,
+        unit: &'static str,
+        aggregate: KvMetricAggregate,
+        field: KvMetricValueField,
+    ) -> Self {
+        Self::from_parts(key, label, series_label, unit, aggregate, field, None)
+    }
+
+    /// Creates a series spec carrying an explicit role list (`Some(roles)`).
+    const fn new_with_roles(
+        key: &'static str,
+        label: &'static str,
+        series_label: &'static str,
+        unit: &'static str,
+        aggregate: KvMetricAggregate,
+        field: KvMetricValueField,
+        roles: &'static [MemberRole],
+    ) -> Self {
+        Self::from_parts(key, label, series_label, unit, aggregate, field, Some(roles))
+    }
+
+    /// Shared constructor behind `new` and `new_with_roles`.
+    const fn from_parts(
+        key: &'static str,
+        label: &'static str,
+        series_label: &'static str,
+        unit: &'static str,
+        aggregate: KvMetricAggregate,
+        field: KvMetricValueField,
+        roles: Option<&'static [MemberRole]>,
+    ) -> Self {
+        Self {
+            key,
+            label,
+            series_label,
+            unit,
+            aggregate,
+            field,
+            roles,
+        }
+    }
+}
+
#[derive(Debug, Clone, Copy)]
struct KvMetricSpec {
key: &'static str,
label: &'static str,
+ series_label: &'static str,
unit: &'static str,
aggregate: KvMetricAggregate,
field: KvMetricValueField,
roles: &'static [MemberRole],
+ comparison: Option,
+ additional: &'static [KvMetricSeriesSpec],
+}
+
+impl KvMetricSpec {
+    /// Spec with a single series whose series label equals its `label`.
+    const fn single(
+        key: &'static str,
+        label: &'static str,
+        unit: &'static str,
+        aggregate: KvMetricAggregate,
+        field: KvMetricValueField,
+        roles: &'static [MemberRole],
+    ) -> Self {
+        Self::single_with_series_label(key, label, label, unit, aggregate, field, roles)
+    }
+
+    /// Spec with a single series and an explicit series label; no comparison
+    /// and no additional series.
+    const fn single_with_series_label(
+        key: &'static str,
+        label: &'static str,
+        series_label: &'static str,
+        unit: &'static str,
+        aggregate: KvMetricAggregate,
+        field: KvMetricValueField,
+        roles: &'static [MemberRole],
+    ) -> Self {
+        Self {
+            comparison: None,
+            additional: &[],
+            key,
+            label,
+            series_label,
+            unit,
+            aggregate,
+            field,
+            roles,
+        }
+    }
+
+    /// Spec built from a primary series plus a comparison series, with no
+    /// additional series.
+    const fn paired(
+        primary: KvMetricSeriesSpec,
+        comparison: KvMetricSeriesSpec,
+        roles: &'static [MemberRole],
+    ) -> Self {
+        Self::paired_with_additional(primary, comparison, &[], roles)
+    }
+
+    /// Spec built from a primary series, a comparison series, and a list of
+    /// additional series. The primary's own `roles` is not used; the spec
+    /// takes the `roles` argument instead.
+    const fn paired_with_additional(
+        primary: KvMetricSeriesSpec,
+        comparison: KvMetricSeriesSpec,
+        additional: &'static [KvMetricSeriesSpec],
+        roles: &'static [MemberRole],
+    ) -> Self {
+        let KvMetricSeriesSpec {
+            key,
+            label,
+            series_label,
+            unit,
+            aggregate,
+            field,
+            ..
+        } = primary;
+        Self {
+            key,
+            label,
+            series_label,
+            unit,
+            aggregate,
+            field,
+            roles,
+            comparison: Some(comparison),
+            additional,
+        }
+    }
}
const KV_METRIC_OWNER_AND_EXTERNAL_ROLES: &[MemberRole] =
&[MemberRole::OwnerClient, MemberRole::ExternalClient];
+/// Role list: owner clients, external clients, and side-transfer workers.
+const KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES: &[MemberRole] = &[
+ MemberRole::OwnerClient,
+ MemberRole::ExternalClient,
+ MemberRole::SideTransferWorker,
+];
+/// Role list: master and owner clients.
+const KV_METRIC_SYSTEM_ROLES: &[MemberRole] = &[MemberRole::Master, MemberRole::OwnerClient];
const KV_METRIC_OWNER_ONLY_ROLES: &[MemberRole] = &[MemberRole::OwnerClient];
+/// Additional series for the node-memory metric: process RSS, summed, with
+/// its own role list (owner, external, and side-transfer workers).
+const KV_NODE_MEMORY_ADDITIONAL_SPECS: &[KvMetricSeriesSpec] =
+ &[KvMetricSeriesSpec::new_with_roles(
+ "process_rss",
+ "Process RSS",
+ "Process RSS",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessRss,
+ KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES,
+ )];
+/// Additional series for the node-CPU metric: process CPU usage, summed,
+/// with its own role list (owner, external, and side-transfer workers).
+const KV_NODE_CPU_ADDITIONAL_SPECS: &[KvMetricSeriesSpec] = &[KvMetricSeriesSpec::new_with_roles(
+ "process_cpu_usage_percent",
+ "Process CPU",
+ "Process CPU",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessCpuUsagePercent,
+ KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES,
+)];
const KV_METRIC_SPECS: &[KvMetricSpec] = &[
- KvMetricSpec {
- key: "put_rps",
- label: "Put RPS",
- unit: "rps",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::PutRps,
- roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
- },
- KvMetricSpec {
- key: "get_rps",
- label: "Get RPS",
- unit: "rps",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::GetRps,
- roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
- },
- KvMetricSpec {
- key: "put_bps",
- label: "Put B/s",
- unit: "B/s",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::PutBps,
- roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
- },
- KvMetricSpec {
- key: "get_bps",
- label: "Get B/s",
- unit: "B/s",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::GetBps,
- roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
- },
- KvMetricSpec {
- key: "process_rss",
- label: "Process RSS",
- unit: "bytes",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::ProcessRss,
- roles: KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
- },
- KvMetricSpec {
- key: "seg_used_bytes",
- label: "Segment Used",
- unit: "bytes",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::SegUsedBytes,
- roles: KV_METRIC_OWNER_ONLY_ROLES,
- },
- KvMetricSpec {
- key: "tokio_global_queue_depth",
- label: "Tokio Queue Depth",
- unit: "count",
- aggregate: KvMetricAggregate::Sum,
- field: KvMetricValueField::TokioGlobalQueueDepth,
- roles: KV_METRIC_OWNER_ONLY_ROLES,
- },
- KvMetricSpec {
- key: "tokio_busy_percent",
- label: "Tokio Busy %",
- unit: "percent",
- aggregate: KvMetricAggregate::Max,
- field: KvMetricValueField::TokioBusyPercent,
- roles: KV_METRIC_OWNER_ONLY_ROLES,
- },
+ KvMetricSpec::single(
+ "put_rps",
+ "Put RPS",
+ "rps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::PutRps,
+ KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
+ ),
+ KvMetricSpec::single(
+ "get_rps",
+ "Get RPS",
+ "rps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GetRps,
+ KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
+ ),
+ KvMetricSpec::single(
+ "get_cache_hit_rate_percent",
+ "Cache Hit %",
+ "percent",
+ KvMetricAggregate::Mean,
+ KvMetricValueField::GetCacheHitRatePercent,
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::single(
+ "put_bps",
+ "Put B/s",
+ "B/s",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::PutBps,
+ KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
+ ),
+ KvMetricSpec::single(
+ "get_bps",
+ "Get B/s",
+ "B/s",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GetBps,
+ KV_METRIC_OWNER_AND_EXTERNAL_ROLES,
+ ),
+ KvMetricSpec::single(
+ "process_cpu_usage_percent",
+ "CPU Util %",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessCpuUsagePercent,
+ KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES,
+ ),
+ KvMetricSpec::paired_with_additional(
+ KvMetricSeriesSpec::new(
+ "node_cpu_usage_percent",
+ "Node CPU",
+ "Used",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeCpuUsagePercent,
+ ),
+ KvMetricSeriesSpec::new(
+ "node_cpu_capacity_percent",
+ "Capacity",
+ "Capacity",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeCpuCapacityPercent,
+ ),
+ KV_NODE_CPU_ADDITIONAL_SPECS,
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::paired(
+ KvMetricSeriesSpec::new(
+ "process_network_tx_mbps",
+ "Process Network",
+ "TX",
+ "mbps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessNetworkTxMbps,
+ ),
+ KvMetricSeriesSpec::new(
+ "process_network_rx_mbps",
+ "RX",
+ "RX",
+ "mbps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessNetworkRxMbps,
+ ),
+ KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES,
+ ),
+ KvMetricSpec::paired_with_additional(
+ KvMetricSeriesSpec::new(
+ "node_memory_usage_bytes",
+ "Node Memory",
+ "Used",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeMemoryUsageBytes,
+ ),
+ KvMetricSeriesSpec::new(
+ "node_memory_total_bytes",
+ "Total",
+ "Total",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeMemoryTotalBytes,
+ ),
+ KV_NODE_MEMORY_ADDITIONAL_SPECS,
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::paired(
+ KvMetricSeriesSpec::new(
+ "node_network_tx_mbps",
+ "Node Network",
+ "TX",
+ "mbps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeNetworkTxMbps,
+ ),
+ KvMetricSeriesSpec::new(
+ "node_network_rx_mbps",
+ "RX",
+ "RX",
+ "mbps",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::NodeNetworkRxMbps,
+ ),
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::single(
+ "process_rss",
+ "Process RSS",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::ProcessRss,
+ KV_METRIC_OWNER_AND_EXTERNAL_LIKE_ROLES,
+ ),
+ KvMetricSpec::paired(
+ KvMetricSeriesSpec::new(
+ "seg_used_bytes",
+ "Segment Usage",
+ "Used",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::SegUsedBytes,
+ ),
+ KvMetricSeriesSpec::new(
+ "seg_capacity_bytes",
+ "Capacity",
+ "Capacity",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::SegCapacityBytes,
+ ),
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::paired(
+ KvMetricSeriesSpec::new(
+ "gpu_memory_used",
+ "GPU Memory",
+ "Used",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuMemoryUsed,
+ ),
+ KvMetricSeriesSpec::new(
+ "gpu_memory_total",
+ "Total",
+ "Total",
+ "bytes",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuMemoryTotal,
+ ),
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "gpu_utilization_percent",
+ "GPU Util %",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuUtilizationPercent,
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "gpu_temperature_celsius",
+ "GPU Temp",
+ "celsius",
+ KvMetricAggregate::Max,
+ KvMetricValueField::GpuTemperatureCelsius,
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "gpu_process_count",
+ "GPU Proc Count",
+ "count",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuProcessCount,
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "gpu_process_sm_utilization_percent",
+ "GPU Proc SM %",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuProcessSmUtilizationPercent,
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "gpu_process_memory_utilization_percent",
+ "GPU Proc Mem %",
+ "percent",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::GpuProcessMemoryUtilizationPercent,
+ KV_METRIC_SYSTEM_ROLES,
+ ),
+ KvMetricSpec::single(
+ "tokio_global_queue_depth",
+ "Tokio Queue Depth",
+ "count",
+ KvMetricAggregate::Sum,
+ KvMetricValueField::TokioGlobalQueueDepth,
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
+ KvMetricSpec::single(
+ "tokio_busy_percent",
+ "Tokio Busy %",
+ "percent",
+ KvMetricAggregate::Max,
+ KvMetricValueField::TokioBusyPercent,
+ KV_METRIC_OWNER_ONLY_ROLES,
+ ),
];
fn kv_metric_spec_by_key(key: &str) -> Option {
@@ -634,11 +1017,34 @@ fn kv_metric_meta(spec: KvMetricSpec) -> KvMetricMetaWire {
KvMetricMetaWire {
key: spec.key.to_string(),
label: spec.label.to_string(),
+ series_label: spec.series_label.to_string(),
+ unit: spec.unit.to_string(),
+ aggregate: spec.aggregate.as_str().to_string(),
+ }
+}
+
+fn kv_metric_meta_from_series(spec: KvMetricSeriesSpec) -> KvMetricMetaWire {
+ KvMetricMetaWire {
+ key: spec.key.to_string(),
+ label: spec.label.to_string(),
+ series_label: spec.series_label.to_string(),
unit: spec.unit.to_string(),
aggregate: spec.aggregate.as_str().to_string(),
}
}
+/// Returns the wire metadata of the spec's comparison series, or `None`
+/// when the spec has no comparison series.
+fn kv_metric_comparison_meta(spec: KvMetricSpec) -> Option<KvMetricMetaWire> {
+    spec.comparison.map(kv_metric_meta_from_series)
+}
+
+/// Returns the wire metadata of every additional series of the spec, in
+/// declaration order (empty when the spec has none).
+fn kv_metric_additional_meta(spec: KvMetricSpec) -> Vec<KvMetricMetaWire> {
+    spec.additional
+        .iter()
+        .copied()
+        .map(kv_metric_meta_from_series)
+        .collect()
+}
+
fn parse_kv_metric_window(raw: Option<&str>) -> Result<(String, f64, u64), String> {
match raw.unwrap_or("15m") {
"5m" => Ok(("5m".to_string(), 5.0 * 60.0, 5)),
@@ -683,26 +1089,86 @@ fn parse_member_roles_list(raw: Option<&Vec>) -> Result
{% endif %}
{% if header.member_kind_query == "kv" %}
+
Owner RDMA Control ({{ owner_rdma_controls.len() }} owners)
{% if owner_rdma_controls.is_empty() %}
@@ -320,6 +344,10 @@ Cluster {{ header.cluster_name }}