From 9ab66ed92a875915ba0cce6f17446a6ac12f56ad Mon Sep 17 00:00:00 2001 From: luohua13 Date: Thu, 21 May 2026 13:39:49 +0000 Subject: [PATCH] fix: make /metrics scrape-safe and cut page-side query fanout When HAMi-WebUI is wired to a large Prometheus / VictoriaMetrics backend, two classes of problems showed up that this change addresses end to end. 1. /metrics caused vmselect OOM and request timeouts ------------------------------------------------------ The /metrics handler called GenerateMetrics synchronously on every scrape, and GenerateMetrics fanned out one PromQL per device/container metric. At ~150 GPUs that is ~1800 instant queries per scrape, all sharing the incoming request's context. With the chart default of server.http.timeout=1s, that context was cancelled after one second, so most queries were aborted mid-flight; the burst also pushed vmselect into OOM. Prometheus pages and the platform's own monitor dashboards do not behave this way because they issue a few aggregated queries, not a per-entity fanout. * server/internal/exporter: MetricsGenerator now implements the kratos transport.Server interface. A single background goroutine refreshes the in-memory Prometheus registry on a configurable interval (default 30s) using its own bounded context (default 60s), and refuses to overlap cycles so a slow refresh can never stack parallel fanouts onto the upstream. * server/internal/server/http.go: /metrics now only serializes the registry (promhttp.Handler). Scrape latency is O(1) and no longer tied to server.http.timeout or to upstream query latency. * server/cmd/server/main.go: the collector is registered as a kratos server so it starts and drains with the app lifecycle. 2. Metric series flickered in and out (charts intermittently empty) ------------------------------------------------------------------- GenerateMetrics began each cycle with reset(), which wipes every GaugeVec. That was safe only while generation ran inside the scrape handler (scrape saw the fully repopulated state). Once generation moved to the background, every scrape landing between reset() and the end of repopulation observed partial or empty data, so series flickered at scrape boundaries and UI charts went to 0. * server/internal/exporter/cells.go (new): each Set now goes through a helper that records the (gauge, label tuple) it wrote. After a cycle completes, tuples present in the previous cycle but not the current one are removed via DeleteLabelValues; a cycle cut short by ctx cancellation is dropped so the last known-good snapshot is preserved. Existing series are overwritten in place, so there is no window where a known device is missing. * server/internal/exporter/metrics.go: reset() removed. 3. GPU / node list endpoints issued an N+1 query fanout ------------------------------------------------------- GetAllGPUs and GetAllNodes queried Prometheus once or twice per entity inside their loops and called StatisticsByDeviceId (which re-lists every container on each call) once per device. At scale the GPU list issued 300+ serial instant queries; this is why the compute-management page spun for ~10s. * server/internal/service/card.go, node.go: fetch the container list once and reuse biz.ContainersStatisticsInfo; batch the per-entity hami_core_size / hami_memory_size lookups into one query each, grouped by device_uuid (cards) or node (nodes), then map by label in memory. Per-endpoint query count drops from O(N) to O(1) while preserving the original avg/sum-by-instance values. 4. Informer-backed caches were read without locking --------------------------------------------------- The node and pod caches are filled by informer event handlers / a background rebuild, but the list/get readers iterated the maps without holding the mutex. Now that the list endpoints route their hot path through these reads, that race could trigger Go's "concurrent map read and map write" fatal error. * server/internal/data/node.go, pod.go: take r.mutex.RLock in every reader and Lock around the node map swap. Also harden podRepo.FindOne, which previously dereferenced a missing map entry and panicked for an unknown podUID. 5. Timeouts are no longer hardcoded ----------------------------------- * charts/hami-webui: server.http/grpc.timeout default to 60s (the old 1s killed legitimate page-side APIs that take a few seconds), and the new metricsExporter.{interval,timeout} / externalPrometheus.timeout are exposed. * server/internal/conf/conf.proto, config/config.yaml: add the exporter config block with safe defaults (interval 30s, timeout 60s). * packages/web: the axios global timeout is configurable via VUE_APP_REQUEST_TIMEOUT (default 60000) instead of the hardcoded 5000, which was the direct cause of UI timeouts even after the backend was fixed. 6. Frontend got "connection refused" before the backend was listening ---------------------------------------------------------------------- The backend only starts listening after its informer caches finish syncing, which can take seconds on a large cluster. The frontend BFF comes up first, so early UI requests proxied to the not-yet-listening backend and surfaced a wave of connection-refused popups until the backend caught up. * server/internal/server/http.go: add a /readyz endpoint that returns 200 once the server is accepting requests (reaching the handler implies informer caches have synced). * charts/hami-webui: add a configurable readinessProbe on the backend container hitting /readyz, so the pod stays out of the Service until the backend can serve. Deliberately gated on "listening", not on the metrics collector's first cycle, to avoid coupling UI availability to Prometheus. Signed-off-by: luohua13 --- charts/hami-webui/templates/configmap.yaml | 9 +- charts/hami-webui/templates/deployment.yaml | 10 + charts/hami-webui/values.yaml | 42 +++- packages/web/.env.development | 3 + packages/web/.env.production | 3 + packages/web/src/utils/request.js | 10 +- server/cmd/server/main.go | 6 +- server/config/config.yaml | 9 +- server/internal/conf/conf.proto | 12 + server/internal/data/node.go | 15 +- server/internal/data/pod.go | 10 +- server/internal/exporter/cells.go | 97 ++++++++ server/internal/exporter/exporter.go | 243 ++++++++++++++++---- server/internal/exporter/metrics.go | 32 --- server/internal/server/http.go | 26 ++- server/internal/service/card.go | 57 +++-- server/internal/service/node.go | 77 ++++--- 17 files changed, 520 insertions(+), 141 deletions(-) create mode 100644 server/internal/exporter/cells.go diff --git a/charts/hami-webui/templates/configmap.yaml b/charts/hami-webui/templates/configmap.yaml index 31dceb93..3b6b3280 100644 --- a/charts/hami-webui/templates/configmap.yaml +++ b/charts/hami-webui/templates/configmap.yaml @@ -8,13 +8,16 @@ data: server: http: addr: 0.0.0.0:8000 - timeout: 1s + timeout: {{ .Values.backend.http.timeout | default "60s" }} grpc: addr: 0.0.0.0:9000 - timeout: 1s + timeout: {{ .Values.backend.grpc.timeout | default "60s" }} prometheus: address: {{ ternary .Values.externalPrometheus.address (printf "http://%s-kube-prometh-prometheus.%s.svc.cluster.local:9090" (include "hami-webui.fullname" .) (include "hami-webui.namespace" .)) .Values.externalPrometheus.enabled }} - timeout: 1m + timeout: {{ .Values.externalPrometheus.timeout | default "1m" }} + exporter: + interval: {{ .Values.metricsExporter.interval | default "30s" }} + timeout: {{ .Values.metricsExporter.timeout | default "60s" }} node_selectors: {{- range $key, $value := .Values.vendorNodeSelectors }} {{ $key }}: {{ $value }} diff --git a/charts/hami-webui/templates/deployment.yaml b/charts/hami-webui/templates/deployment.yaml index a0d77b2e..83219b90 100644 --- a/charts/hami-webui/templates/deployment.yaml +++ b/charts/hami-webui/templates/deployment.yaml @@ -59,6 +59,16 @@ spec: args: - "--conf" - "/apps/config/config.yaml" + {{- if .Values.backend.readinessProbe.enabled }} + readinessProbe: + httpGet: + path: /readyz + port: metrics + initialDelaySeconds: {{ .Values.backend.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.backend.readinessProbe.periodSeconds }} + timeoutSeconds: {{ .Values.backend.readinessProbe.timeoutSeconds }} + failureThreshold: {{ .Values.backend.readinessProbe.failureThreshold }} + {{- end }} resources: {{- toYaml .Values.resources.backend | nindent 12 }} volumeMounts: diff --git a/charts/hami-webui/values.yaml b/charts/hami-webui/values.yaml index c18ffca9..e7c93b43 100644 --- a/charts/hami-webui/values.yaml +++ b/charts/hami-webui/values.yaml @@ -162,4 +162,44 @@ kube-prometheus-stack: externalPrometheus: enabled: false # If externalPrometheus.enabled is true, this address will be used - address: "http://prometheus-kube-prometheus-prometheus.prometheus.svc.cluster.local:9090" \ No newline at end of file + address: "http://prometheus-kube-prometheus-prometheus.prometheus.svc.cluster.local:9090" + # Single PromQL upstream timeout (sent to Prometheus / VictoriaMetrics as the &timeout= parameter). + timeout: "1m" + +# Kratos server timeouts. These bound the deadline placed on each incoming HTTP/gRPC +# request context. They no longer affect /metrics generation (which runs in the background), +# but they still gate the page-side APIs, some of which legitimately take a few seconds +# against a large Prometheus / VictoriaMetrics cluster. +backend: + http: + timeout: "60s" + grpc: + timeout: "60s" + # Readiness probe on the backend's /readyz. The backend only starts listening + # after its k8s informer caches have synced, so gating the pod on /readyz keeps + # it out of the Service until the backend can serve — this prevents the frontend + # BFF from hitting "connection refused" against the not-yet-listening backend + # during startup. failureThreshold * periodSeconds should comfortably exceed the + # worst-case informer sync time on a large cluster. + readinessProbe: + enabled: true + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +# Background /metrics collector. The collector runs independently of Prometheus scrape; +# scrapes only serialize the in-memory registry, so they are O(1) and immune to scrape +# timeouts. Tune interval if you need fresher data or want to reduce upstream load further. +metricsExporter: + # How often the collector refreshes hami_* metrics from the upstream Prometheus / VictoriaMetrics. + interval: "30s" + # Hard cap on a single refresh cycle. Should be >= externalPrometheus.timeout and < interval * 2. + timeout: "60s" + +# The frontend axios client timeout (default 60000ms) is compiled into the JS +# bundle at image build time from the VUE_APP_REQUEST_TIMEOUT env var, as Vite +# evaluates process.env at build, not at runtime. To change it for a deployed +# cluster you need to rebuild packages/web with a different VUE_APP_REQUEST_TIMEOUT +# (set in packages/web/.env.production or as a build-arg). It cannot be tuned +# through this values.yaml. \ No newline at end of file diff --git a/packages/web/.env.development b/packages/web/.env.development index e4017861..0db5fb6b 100644 --- a/packages/web/.env.development +++ b/packages/web/.env.development @@ -4,3 +4,6 @@ ENV = 'development' # base api VUE_APP_BASE_API = '/' +# axios global timeout in ms; override via build env or chart values.frontend.requestTimeout +VUE_APP_REQUEST_TIMEOUT = 60000 + diff --git a/packages/web/.env.production b/packages/web/.env.production index 30313bf1..d889738f 100644 --- a/packages/web/.env.production +++ b/packages/web/.env.production @@ -3,3 +3,6 @@ ENV = 'production' # base api VUE_APP_BASE_API = './' + +# axios global timeout in ms; override via build env or chart values.frontend.requestTimeout +VUE_APP_REQUEST_TIMEOUT = 60000 diff --git a/packages/web/src/utils/request.js b/packages/web/src/utils/request.js index f41db603..bd62594a 100644 --- a/packages/web/src/utils/request.js +++ b/packages/web/src/utils/request.js @@ -3,9 +3,17 @@ import axios from 'axios'; import { ElMessage, ElMessageBox, ElNotification } from 'element-plus'; import i18n from '@/locales'; +// Default request timeout in ms. Override at build time via VUE_APP_REQUEST_TIMEOUT +// (injected through .env.* or chart values.frontend.requestTimeout). 60s is large +// enough for the slowest known page-side API (/v1/nodes can take a few seconds +// against a large VictoriaMetrics cluster) while still bounding hung requests. +const DEFAULT_REQUEST_TIMEOUT = 60000; +const requestTimeout = + Number.parseInt(process.env.VUE_APP_REQUEST_TIMEOUT, 10) || DEFAULT_REQUEST_TIMEOUT; + const service = axios.create({ baseURL: process.env.VUE_APP_BASE_API, // url = base url + request url - timeout: 5000, + timeout: requestTimeout, validateStatus: function (status) { return (status >= 200 && status < 300) || status > 520; }, diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 59feda46..2ff8fdc2 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -9,6 +9,7 @@ import ( "github.com/go-kratos/kratos/v2/transport/http" "os" "vgpu/internal/conf" + "vgpu/internal/exporter" _ "go.uber.org/automaxprocs" ) @@ -44,7 +45,9 @@ func main() { } } -func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App { +// newApp wires the background metrics collector in as a kratos transport.Server so +// its goroutine is started and stopped by the app lifecycle, alongside HTTP and gRPC. +func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Server, mc *exporter.MetricsGenerator) *kratos.App { return kratos.New( kratos.Context(ctx), kratos.ID(id), @@ -55,6 +58,7 @@ func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Se kratos.Server( gs, hs, + mc, ), ) } diff --git a/server/config/config.yaml b/server/config/config.yaml index 4f481e77..e46fe58a 100644 --- a/server/config/config.yaml +++ b/server/config/config.yaml @@ -1,16 +1,19 @@ server: http: addr: 0.0.0.0:8000 - timeout: 1s + timeout: 60s grpc: addr: 0.0.0.0:9000 - timeout: 1s + timeout: 60s prometheus: address: http://localhost:9090 timeout: 1m +exporter: + interval: 30s + timeout: 60s node_selectors: NVIDIA: gpu=on Ascend: ascend=on DCU: dcu=on MLU: mlu=on - Metax: metax-tech.com/gpu.installed=true \ No newline at end of file + Metax: metax-tech.com/gpu.installed=true diff --git a/server/internal/conf/conf.proto b/server/internal/conf/conf.proto index 7eb44e68..9ca428fe 100644 --- a/server/internal/conf/conf.proto +++ b/server/internal/conf/conf.proto @@ -9,6 +9,7 @@ message Bootstrap { Server server = 1; Prometheus prometheus = 2; map node_selectors = 3; + Exporter exporter = 4; } message Server { @@ -31,3 +32,14 @@ message Prometheus { string timeout = 2; string auth = 3; } + +// Exporter controls the background /metrics collector that periodically refreshes the +// in-memory Prometheus registry from upstream Prometheus / VictoriaMetrics. The /metrics +// HTTP handler only serializes that registry, so it is no longer tied to the per-request +// HTTP timeout and no longer fans out PromQL on every scrape. +message Exporter { + // Interval between two refresh cycles. Defaults to 30s if unset. + google.protobuf.Duration interval = 1; + // Hard cap on a single refresh cycle. Defaults to 60s if unset. + google.protobuf.Duration timeout = 2; +} diff --git a/server/internal/data/node.go b/server/internal/data/node.go index 5a1eef27..4f59ab6e 100644 --- a/server/internal/data/node.go +++ b/server/internal/data/node.go @@ -98,7 +98,9 @@ func (r *nodeRepo) updateLocalNodes() { } } } + r.mutex.Lock() r.nodes = n + r.mutex.Unlock() } } @@ -156,6 +158,8 @@ func (r *nodeRepo) fetchNodeInfo(node *corev1.Node) *biz.Node { } func (r *nodeRepo) ListAll(context.Context) ([]*biz.Node, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var nodeList []*biz.Node for _, node := range r.nodes { nodeList = append(nodeList, node) @@ -164,13 +168,18 @@ func (r *nodeRepo) ListAll(context.Context) ([]*biz.Node, error) { } func (r *nodeRepo) GetNode(_ context.Context, uid string) (*biz.Node, error) { - if _, ok := r.nodes[k8stypes.UID(uid)]; !ok { + r.mutex.RLock() + defer r.mutex.RUnlock() + node, ok := r.nodes[k8stypes.UID(uid)] + if !ok { return nil, errors.New("node not found") } - return r.nodes[k8stypes.UID(uid)], nil + return node, nil } func (r *nodeRepo) ListAllDevices(context.Context) ([]*biz.DeviceInfo, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var deviceList []*biz.DeviceInfo for _, node := range r.nodes { deviceList = append(deviceList, node.Devices...) @@ -179,6 +188,8 @@ func (r *nodeRepo) ListAllDevices(context.Context) ([]*biz.DeviceInfo, error) { } func (r *nodeRepo) FindDeviceByAliasId(aliasId string) (*biz.DeviceInfo, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() for _, node := range r.nodes { for _, d := range node.Devices { if d.AliasId == aliasId { diff --git a/server/internal/data/pod.go b/server/internal/data/pod.go index 1cb2db78..492e26af 100644 --- a/server/internal/data/pod.go +++ b/server/internal/data/pod.go @@ -184,6 +184,8 @@ func (r *podRepo) GetStartTime(pod *corev1.Pod) time.Time { } func (r *podRepo) ListAll(context.Context) ([]*biz.Container, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var containerList []*biz.Container for _, pod := range r.pods { containerList = append(containerList, pod.Ctrs...) @@ -196,7 +198,13 @@ func (r *podRepo) FindOne(_ context.Context, podUID string, name string) (*biz.C return nil, fmt.Errorf("podUID or name is empty") } - for _, container := range r.pods[k8stypes.UID(podUID)].Ctrs { + r.mutex.RLock() + defer r.mutex.RUnlock() + pod, ok := r.pods[k8stypes.UID(podUID)] + if !ok { + return nil, fmt.Errorf("not found") + } + for _, container := range pod.Ctrs { if container.Name == name { return container, nil } diff --git a/server/internal/exporter/cells.go b/server/internal/exporter/cells.go new file mode 100644 index 00000000..07882b33 --- /dev/null +++ b/server/internal/exporter/cells.go @@ -0,0 +1,97 @@ +package exporter + +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +// Diff-based cell tracking for the background /metrics collector. +// +// Why this exists: the old reset()+populate cycle (called from a synchronous HTTP +// handler) was safe because scrape only ever observed the fully-populated state. +// Once the cycle moved to a background goroutine, every Prometheus scrape that +// landed between reset() and the end of populate saw partial / empty data, which +// surfaces in the UI as "vGPU 分配率有时有数据,有时没有数据" — series flickering +// in and out at scrape boundaries. +// +// The fix: instead of wiping the GaugeVec at the start of each cycle, every Set +// goes through MetricsGenerator.set, which both writes the value AND records the +// (gauge, label tuple) it touched. After a cycle completes successfully, we walk +// the previous cycle's recorded set and DeleteLabelValues for any tuple that +// disappeared this round. Existing series are atomically overwritten in place, +// brand-new series appear when their Set runs, vanished series disappear at the +// end-of-cycle prune. There is no window where a known device is missing. + +// cellKey identifies a single observation (gauge vector + concrete label tuple). +// The joined string is just a map-key encoding of the labels; the original +// []string is kept on the cell so we can pass it to DeleteLabelValues. +type cellKey struct { + gauge *prometheus.GaugeVec + joined string +} + +type cell struct { + gauge *prometheus.GaugeVec + labels []string +} + +// labelSep is a 0-byte separator that cannot appear in normal Prometheus label +// values, so strings.Join produces an unambiguous key. +const labelSep = "\x00" + +// set writes value into the gauge and records the (gauge, labels) tuple in the +// current-cycle map. Safe for concurrent use; the collector itself only calls +// it from one goroutine, but we lock anyway in case callers add a parallel pass. +func (s *MetricsGenerator) set(g *prometheus.GaugeVec, value float64, labels ...string) { + g.WithLabelValues(labels...).Set(value) + + k := cellKey{gauge: g, joined: strings.Join(labels, labelSep)} + s.cellMu.Lock() + defer s.cellMu.Unlock() + if s.current == nil { + s.current = make(map[cellKey]cell) + } + // Copy the labels slice — callers reuse the underlying array between iterations. + s.current[k] = cell{gauge: g, labels: append([]string(nil), labels...)} +} + +// commitCycle promotes the current cycle to "previous" and removes any label +// tuple that existed in the previous cycle but not this one. Call ONLY when the +// cycle completed without being cut short by ctx cancellation: pruning on a +// partial map would erroneously delete cells that just weren't re-Set this time. +func (s *MetricsGenerator) commitCycle() { + s.cellMu.Lock() + defer s.cellMu.Unlock() + if s.current == nil { + // Nothing was written this cycle; leave prev intact. + return + } + deleted := 0 + for k, c := range s.prev { + if _, ok := s.current[k]; ok { + continue + } + if c.gauge.DeleteLabelValues(c.labels...) { + deleted++ + } + } + if deleted > 0 { + s.log.Debugw("msg", "pruned stale metric cells", "count", deleted) + } + s.prev = s.current + s.current = nil +} + +// dropCurrentCycle discards the in-progress map without promoting it. Use this +// when a cycle ran into ctx cancellation or any other partial-completion path, +// so the next cycle's prune still references the last KNOWN-GOOD snapshot. +// +// Any partial Set() calls that did land remain in the GaugeVec as freshly +// overwritten cells — that is intentional and harmless, since they only update +// values on label tuples that already existed. +func (s *MetricsGenerator) dropCurrentCycle() { + s.cellMu.Lock() + s.current = nil + s.cellMu.Unlock() +} diff --git a/server/internal/exporter/exporter.go b/server/internal/exporter/exporter.go index 51361457..636831d4 100644 --- a/server/internal/exporter/exporter.go +++ b/server/internal/exporter/exporter.go @@ -6,14 +6,17 @@ import ( "fmt" "math" "strings" + "sync" "time" pb "vgpu/api/v1" "vgpu/internal/biz" + "vgpu/internal/conf" "vgpu/internal/data/prom" "vgpu/internal/provider/metax" "vgpu/internal/provider/mlu" "vgpu/internal/service" + "github.com/go-kratos/kratos/v2/log" "github.com/google/wire" ) @@ -22,12 +25,48 @@ var ProviderSet = wire.NewSet( NewMetricsGenerator, ) +const ( + defaultGenerateInterval = 30 * time.Second + defaultGenerateTimeout = 60 * time.Second +) + +// MetricsGenerator owns the lifecycle of the background /metrics collector. +// +// It satisfies kratos transport.Server: Start spins up a single refresh goroutine that +// periodically re-fans out PromQL into the in-memory Prometheus registry; Stop waits +// for the goroutine to drain. +// +// The HTTP /metrics handler does NOT call GenerateMetrics directly anymore: scrapes +// only serialize whatever the registry currently holds, which is what makes the +// endpoint scrape-timeout safe even at customer-scale fanout (~1800 PromQL/cycle). type MetricsGenerator struct { promClient *prom.Client nodeUsecase *biz.NodeUsecase podUsecase *biz.PodUseCase monitorService *service.MonitorService - cacheTime time.Time + + interval time.Duration + timeout time.Duration + + log *log.Helper + + startOnce sync.Once + stopOnce sync.Once + cancel context.CancelFunc + done chan struct{} + + runMu sync.Mutex + generating bool + + // Diff-based cell tracking. See cells.go for the full rationale. + // current holds tuples written during the in-progress cycle; prev holds the + // last successfully-committed cycle so we know which tuples to delete when a + // device or container disappears. + cellMu sync.Mutex + current map[cellKey]cell + prev map[cellKey]cell + + cacheTime time.Time } // roundToTwoDecimal 将浮点数保留两位小数 @@ -41,38 +80,144 @@ func roundToOneDecimal(value float64) float64 { } func NewMetricsGenerator( + bc *conf.Bootstrap, promClient *prom.Client, nodeUsecase *biz.NodeUsecase, podUsecase *biz.PodUseCase, monitorService *service.MonitorService, + logger log.Logger, ) *MetricsGenerator { + interval, timeout := resolveCollectorIntervals(bc) return &MetricsGenerator{ promClient: promClient, nodeUsecase: nodeUsecase, podUsecase: podUsecase, monitorService: monitorService, + interval: interval, + timeout: timeout, + log: log.NewHelper(log.With(logger, "module", "exporter")), } } -func (s *MetricsGenerator) generatorCache() time.Time { - now := time.Now() - return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location()) + +func resolveCollectorIntervals(bc *conf.Bootstrap) (interval, timeout time.Duration) { + interval, timeout = defaultGenerateInterval, defaultGenerateTimeout + if bc == nil || bc.Exporter == nil { + return + } + if d := bc.Exporter.Interval.AsDuration(); d > 0 { + interval = d + } + if d := bc.Exporter.Timeout.AsDuration(); d > 0 { + timeout = d + } + return +} + +// Start launches the background collector loop. It is invoked once by the kratos +// app lifecycle. The loop deliberately runs against context.Background() so a +// graceful shutdown is the only thing that can cancel it; per-cycle ctx is bounded +// by s.timeout. +func (s *MetricsGenerator) Start(_ context.Context) error { + s.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.done = make(chan struct{}) + s.log.Infow("msg", "starting metrics collector", "interval", s.interval, "timeout", s.timeout) + go s.loop(ctx) + }) + return nil +} + +// Stop signals the loop to exit and waits for it to drain or for the caller's +// ctx to expire. +func (s *MetricsGenerator) Stop(ctx context.Context) error { + s.stopOnce.Do(func() { + if s.cancel != nil { + s.cancel() + } + if s.done != nil { + select { + case <-s.done: + case <-ctx.Done(): + s.log.Warn("msg", "metrics collector did not stop before shutdown deadline") + } + } + }) + return nil +} + +func (s *MetricsGenerator) loop(ctx context.Context) { + defer close(s.done) + s.runOnce(ctx) + t := time.NewTicker(s.interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + s.runOnce(ctx) + } + } } -func (s *MetricsGenerator) cacheIsValidate() bool { - if s.cacheTime == s.generatorCache() { - return true +// runOnce executes a single refresh cycle with a bounded context. It refuses to +// overlap cycles: a cycle that runs longer than s.interval simply skips the next +// tick instead of piling up parallel PromQL fanouts onto vmselect. +func (s *MetricsGenerator) runOnce(parent context.Context) { + s.runMu.Lock() + if s.generating { + s.runMu.Unlock() + s.log.Warnw("msg", "skip metrics refresh: previous cycle still running") + return + } + s.generating = true + s.runMu.Unlock() + defer func() { + s.runMu.Lock() + s.generating = false + s.runMu.Unlock() + }() + + ctx, cancel := context.WithTimeout(parent, s.timeout) + defer cancel() + start := time.Now() + if err := s.GenerateMetrics(ctx); err != nil { + // Partial cycle (ctx canceled, upstream error, etc.): keep last known good + // snapshot intact so the registry doesn't briefly lose cells that simply + // could not be re-queried this round. + s.dropCurrentCycle() + s.log.Errorw("msg", "metrics refresh failed (partial cycle, last-known cells retained)", "err", err, "elapsed", time.Since(start)) + return } - return false + s.commitCycle() + s.log.Debugw("msg", "metrics refresh complete", "elapsed", time.Since(start)) } +// GenerateMetrics runs one full fanout cycle (device + container dimensions) into +// the in-memory Prometheus registry. It is exported so it can be unit-tested and +// triggered manually if needed, but on the request path it must never be called +// synchronously: the registry is the only thing /metrics serves. +// +// The function intentionally does NOT clear existing gauges. Each Set goes +// through s.set, which both writes the value and records the (gauge, labels) +// tuple. runOnce decides whether to commit the new snapshot (delete tuples that +// disappeared) or drop it (keep the previous snapshot intact). See cells.go. func (s *MetricsGenerator) GenerateMetrics(ctx context.Context) error { - //if s.cacheIsValidate() { - // return nil - //} - reset() // 重置所有指标缓存值 - s.GenerateDeviceMetrics(ctx) // 卡维度指标 - s.GenerateContainerMetrics(ctx) // 任务维度指标 - s.cacheTime = s.generatorCache() + s.cellMu.Lock() + s.current = make(map[cellKey]cell) + s.cellMu.Unlock() + + s.GenerateDeviceMetrics(ctx) + s.GenerateContainerMetrics(ctx) + + // Surface ctx cancellation so runOnce treats this as a partial cycle and + // preserves the prior snapshot. Without this guard, a mid-cycle timeout + // would silently commit an incomplete map and erase real series. + if err := ctx.Err(); err != nil { + return err + } + s.cacheTime = time.Now() return nil } @@ -93,56 +238,56 @@ func (s *MetricsGenerator) GenerateDeviceMetrics(ctx context.Context) error { } // 分配率指标 - HamiVgpuCount.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Count)) - HamiVmemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem)) - HamiVcoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore)) + s.set(HamiVgpuCount, float64(device.Count), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiVmemorySize, float64(device.Devmem), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiVcoreSize, float64(device.Devcore), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) // 超配比指标 - HamiVCoreScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore) / 100) - HamiCoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(100) + s.set(HamiVCoreScaling, float64(device.Devcore)/100, device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreSize, 100, device.NodeName, provider, device.Type, device.Id, driver, deviceNo) deviceMemUsed, err := s.deviceMemUsed(ctx, provider, device.Id) if err == nil { - HamiMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(deviceMemUsed)) + s.set(HamiMemoryUsed, float64(deviceMemUsed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } - HamiMemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem)) - HamiMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(roundToOneDecimal(100 * float64(deviceMemUsed/float32(device.Devmem)))) + s.set(HamiMemorySize, float64(device.Devmem), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiMemoryUtil, roundToOneDecimal(100*float64(deviceMemUsed/float32(device.Devmem))), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) deviceMemSize, err := s.deviceMemTotal(ctx, provider, device.Id) if err == nil && deviceMemSize > 0 { - HamiVMemoryScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(roundToOneDecimal(float64(float32(device.Devmem) / deviceMemSize))) + s.set(HamiVMemoryScaling, roundToOneDecimal(float64(float32(device.Devmem)/deviceMemSize)), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } actualCoreUtil, err := s.deviceCoreUtil(ctx, provider, device.Id) if err == nil { - HamiCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil)) - HamiCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil)) + s.set(HamiCoreUsed, float64(actualCoreUtil), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreUtil, float64(actualCoreUtil), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } actualCoreUtilAvg, err := s.deviceCoreUtil(ctx, provider, device.Id) if err == nil { - HamiCoreUsedAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg)) - HamiCoreUtilAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg)) + s.set(HamiCoreUsedAvg, float64(actualCoreUtilAvg), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreUtilAvg, float64(actualCoreUtilAvg), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } gpuTemperature, err := s.gpuTemperature(ctx, provider, device.Id) if err == nil { - HamiDeviceTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuTemperature)) + s.set(HamiDeviceTemperature, float64(gpuTemperature), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } memoryTemperature, err := s.memoryTemperature(ctx, provider, device.Id) if err == nil { - HamiDeviceMemoryTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(memoryTemperature)) + s.set(HamiDeviceMemoryTemperature, float64(memoryTemperature), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } gpuPower, err := s.gpuPower(ctx, provider, device.Id) if err == nil { - HamiDevicePower.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuPower)) + s.set(HamiDevicePower, float64(gpuPower), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } fanSpeed, err := s.fanSpeed(ctx, provider, device.Id) if err == nil { switch provider { case biz.NvidiaGPUDevice: - HamiDeviceFanSpeedP.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed)) + s.set(HamiDeviceFanSpeedP, float64(fanSpeed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) case biz.CambriconGPUDevice: - HamiDeviceFanSpeedR.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed)) + s.set(HamiDeviceFanSpeedR, float64(fanSpeed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } } gpuHardwareHealth, err := s.gpuHardwareHealth(ctx, provider, device.Id) if err == nil { - HamiDeviceHardwareHealth.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuHardwareHealth)) + s.set(HamiDeviceHardwareHealth, float64(gpuHardwareHealth), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } } return nil @@ -180,13 +325,14 @@ func (s *MetricsGenerator) generateMetricsForMetaxGPU(containers []*biz.Containe deviceType := res.Data[i].Metric["modelName"] nodeName := res.Data[i].Metric["Hostname"] - HamiContainerVgpuAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(1)) - HamiContainerVmemoryAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(memory[i])) - HamiContainerVcoreAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(core[i])) + podUIDLabel := fmt.Sprintf("%s:%s", c.Name, c.PodUID) + s.set(HamiContainerVgpuAllocated, float64(1), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVmemoryAllocated, float64(memory[i]), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVcoreAllocated, float64(core[i]), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) taskMemoryUsed := float32(res.Data[i].Value) // KB - HamiContainerMemoryUsed.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(float64(taskMemoryUsed / 1024)) - HamiContainerMemoryUtil.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(roundToOneDecimal(100 * float64(taskMemoryUsed/1024) / float64(memory[i]))) + s.set(HamiContainerMemoryUsed, float64(taskMemoryUsed/1024), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerMemoryUtil, roundToOneDecimal(100*float64(taskMemoryUsed/1024)/float64(memory[i])), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) taskCoreUsed, err := s.taskCoreUsed(context.TODO(), metax.MetaxGPUDevice, c.Namespace, c.PodName, c.Name, c.PodUID, deviceUUID, nodeName, -1) if err == nil { @@ -197,8 +343,8 @@ func (s *MetricsGenerator) generateMetricsForMetaxGPU(containers []*biz.Containe used = float64(cardCoreUtil) / 100 * float64(core[i]) util = float64(cardCoreUtil) } - HamiContainerCoreUsed.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(used) - HamiContainerCoreUtil.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(util) + s.set(HamiContainerCoreUsed, used, nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerCoreUtil, util, nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) } } } @@ -234,9 +380,10 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { if provider == "" || provider == metax.MetaxGPUDevice { continue } - HamiContainerVgpuAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(vGPU)) - HamiContainerVmemoryAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(memory)) - HamiContainerVcoreAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(core)) + podUIDLabel := fmt.Sprintf("%s:%s", c.Name, c.PodUID) + s.set(HamiContainerVgpuAllocated, float64(vGPU), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVmemoryAllocated, float64(memory), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVcoreAllocated, float64(core), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) // 查询任务在当前设备下的算力利用率 taskCoreUsed, err := s.taskCoreUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id, device.NodeName, device.Index) if err == nil { @@ -262,8 +409,8 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { used = float64(cardCoreUtil) / 100 * float64(core) util = float64(cardCoreUtil) } - HamiContainerCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(used) - HamiContainerCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(util) + s.set(HamiContainerCoreUsed, used, device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerCoreUtil, util, device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) } taskMemoryUsed, err := s.taskMemoryUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id, device.NodeName, device.Index) if err == nil { @@ -274,8 +421,8 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { taskMemoryUsed = float32(taskMemoryUsed) * 1024 // KB->Byte default: } - HamiContainerMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(float64(taskMemoryUsed / 1024 / 1024)) - HamiContainerMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(roundToOneDecimal(100 * float64(taskMemoryUsed/1024/1024) / float64(memory))) + s.set(HamiContainerMemoryUsed, float64(taskMemoryUsed/1024/1024), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerMemoryUtil, roundToOneDecimal(100*float64(taskMemoryUsed/1024/1024)/float64(memory)), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) } } } diff --git a/server/internal/exporter/metrics.go b/server/internal/exporter/metrics.go index c00e1fda..26e460b9 100644 --- a/server/internal/exporter/metrics.go +++ b/server/internal/exporter/metrics.go @@ -43,38 +43,6 @@ func init() { prometheus.MustRegister(HamiSystemComponentHealth) // 系统组件健康状态 } -func reset() { - HamiVCoreScaling.Reset() - HamiVMemoryScaling.Reset() - HamiVgpuCount.Reset() - HamiVmemorySize.Reset() - HamiVcoreSize.Reset() - HamiMemoryUsed.Reset() - HamiMemorySize.Reset() - HamiMemoryUtil.Reset() - HamiCoreSize.Reset() - HamiCoreUsed.Reset() - HamiCoreUtil.Reset() - HamiCoreUsedAvg.Reset() - HamiCoreUtilAvg.Reset() - HamiDeviceTemperature.Reset() - HamiDeviceMemoryTemperature.Reset() - HamiDevicePower.Reset() - HamiDeviceFanSpeedP.Reset() - HamiDeviceFanSpeedR.Reset() - - HamiContainerVgpuAllocated.Reset() - HamiContainerVmemoryAllocated.Reset() - HamiContainerVcoreAllocated.Reset() - HamiContainerMemoryUsed.Reset() - HamiContainerMemoryUtil.Reset() - HamiContainerCoreUsed.Reset() - HamiContainerCoreUtil.Reset() - - HamiPoolVgpuCount.Reset() - HamiPoolVmemorySize.Reset() - HamiPoolVcoreSize.Reset() -} var ( HamiVCoreScaling = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/server/internal/server/http.go b/server/internal/server/http.go index 2625c146..73906944 100644 --- a/server/internal/server/http.go +++ b/server/internal/server/http.go @@ -1,9 +1,9 @@ package server import ( + nethttp "net/http" v1 "vgpu/api/v1" "vgpu/internal/conf" - "vgpu/internal/exporter" "vgpu/internal/service" "github.com/go-kratos/kratos/v2/log" @@ -15,12 +15,24 @@ import ( ) // NewHTTPServer new an HTTP server. +// +// The /metrics endpoint here only serializes the in-memory Prometheus registry. +// The registry is refreshed in the background by exporter.MetricsGenerator, which +// is registered as its own kratos transport.Server in main.go. This is what keeps +// scrape latency O(1) and decouples /metrics from server.http.timeout — at customer +// scale a single refresh can fan out ~1800 PromQL queries, which previously raced +// the 1s request deadline and starved vmselect. +// +// /readyz answers 200 once this HTTP server is accepting requests. The backend only +// starts listening after the k8s informer caches have synced (see data.NewNodeRepo / +// NewPodRepo), so a readiness probe on /readyz keeps the pod out of Service endpoints +// until the backend can actually serve — this is what prevents the frontend BFF from +// getting "connection refused" against the not-yet-listening backend at startup. func NewHTTPServer(c *conf.Bootstrap, node *service.NodeService, card *service.CardService, ctr *service.ContainerService, monitor *service.MonitorService, - exporter *exporter.MetricsGenerator, logger log.Logger) *http.Server { var opts = []http.ServerOption{ http.Middleware( @@ -43,10 +55,12 @@ func NewHTTPServer(c *conf.Bootstrap, v1.RegisterContainerHTTPServer(srv, ctr) v1.RegisterMonitorHTTPServer(srv, monitor) srv.HandlePrefix("/q/", openapiv2.NewHandler()) - srv.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - exporter.GenerateMetrics(r.Context()) - //mock.MockMetrics(r.Context()) - promhttp.Handler().ServeHTTP(w, r) + srv.Handle("/metrics", promhttp.Handler()) + srv.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { + // Reaching this handler means the server is listening and the informer + // caches have already synced (the backend only starts serving after that). + w.WriteHeader(nethttp.StatusOK) + _, _ = w.Write([]byte("ok")) }) return srv } diff --git a/server/internal/service/card.go b/server/internal/service/card.go index 155ba756..4d3ec2b4 100644 --- a/server/internal/service/card.go +++ b/server/internal/service/card.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "sort" "strings" pb "vgpu/api/v1" @@ -27,6 +26,22 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p if err != nil { return nil, err } + + // Fetch the container list once. StatisticsByDeviceId re-lists all containers + // on every call, so calling it per device turned this into an O(devices) + // re-scan; at large scale (100+ cards) that alone was seconds of work. + containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + + // Pull hami_core_size / hami_memory_size for ALL devices in two queries keyed + // by device_uuid, instead of two PromQL per device. The old per-device fanout + // meant ~2*N serial instant queries against Prometheus / VictoriaMetrics + // (300+ at large scale), which made the card list spin for several seconds. + coreSizeByUUID := s.queryGaugeByLabel(ctx, "avg(hami_core_size) by (device_uuid)", "device_uuid") + memSizeByUUID := s.queryGaugeByLabel(ctx, "avg(hami_memory_size) by (device_uuid)", "device_uuid") + var res = &pb.GPUsReply{List: []*pb.GPUReply{}} for _, device := range deviceInfos { gpu := &pb.GPUReply{} @@ -52,19 +67,16 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p gpu.Health = device.Health gpu.Mode = device.Mode - vGPU, core, memory, err := s.pod.StatisticsByDeviceId(ctx, device.AliasId) - if err == nil { - gpu.VgpuUsed = vGPU - gpu.CoreUsed = core - gpu.MemoryUsed = memory - } - resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(hami_core_size{device_uuid=~\"%s\"})", device.Id)}) - if err == nil && len(resp.Data) > 0 { - gpu.CoreTotal = int32(resp.Data[0].Value) + vGPU, core, memory := biz.ContainersStatisticsInfo(containers, device.AliasId) + gpu.VgpuUsed = vGPU + gpu.CoreUsed = core + gpu.MemoryUsed = memory + + if v, ok := coreSizeByUUID[device.Id]; ok { + gpu.CoreTotal = v } - resp, err = s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(hami_memory_size{device_uuid=~\"%s\"})", device.Id)}) - if err == nil && len(resp.Data) > 0 { - gpu.MemoryTotal = int32(resp.Data[0].Value) + if v, ok := memSizeByUUID[device.Id]; ok { + gpu.MemoryTotal = v } res.List = append(res.List, gpu) } @@ -75,6 +87,25 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p return res, nil } +// queryGaugeByLabel runs a single instant query and returns the result values +// keyed by the given label, so callers can batch what used to be per-entity +// lookups into one round-trip to Prometheus / VictoriaMetrics. +func (s *CardService) queryGaugeByLabel(ctx context.Context, query, label string) map[string]int32 { + out := map[string]int32{} + resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: query}) + if err != nil { + return out + } + for _, sample := range resp.Data { + key := sample.Metric[label] + if key == "" { + continue + } + out[key] = int32(sample.Value) + } + return out +} + func (s *CardService) GetAllGPUTypes(ctx context.Context, req *pb.GetAllGpusReq) (*pb.GPUsReply, error) { deviceInfos, err := s.node.ListAllDevices(ctx) if err != nil { diff --git a/server/internal/service/node.go b/server/internal/service/node.go index 0dafb379..37b43d1c 100644 --- a/server/internal/service/node.go +++ b/server/internal/service/node.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "sort" "strconv" "vgpu/internal/biz" @@ -42,17 +41,26 @@ func (s *NodeService) GetAllNodes(ctx context.Context, req *pb.GetAllNodesReq) ( return nil, err } + // Fetch containers once (StatisticsByDeviceId used to re-list per device) and + // pull the per-node core/memory totals in two queries keyed by node, instead + // of two PromQL per node. This is what made /v1/nodes take 3-4s — and it is + // also called for the workload page's node filter dropdown. + containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + coreByNode := s.queryNodeGauge(ctx, "avg(sum(hami_core_size) by (node, instance)) by (node)") + memByNode := s.queryNodeGauge(ctx, "avg(sum(hami_memory_size) by (node, instance)) by (node)") + var res = &pb.NodesReply{List: []*pb.NodeReply{}} for _, node := range nodes { - nodeReply, err := s.buildNodeReply(ctx, node) - if err != nil { - return nil, err - } + nodeReply := s.buildNodeReply(node, containers) - coreTotal, memoryTotal, err := s.queryNodeMetrics(ctx, node.Name) - if err == nil { - nodeReply.CoreTotal = coreTotal - nodeReply.MemoryTotal = memoryTotal + if v, ok := coreByNode[node.Name]; ok { + nodeReply.CoreTotal = v + } + if v, ok := memByNode[node.Name]; ok { + nodeReply.MemoryTotal = v } if filters.Ip != "" && filters.Ip != nodeReply.Ip { @@ -85,10 +93,18 @@ func (s *NodeService) GetNode(ctx context.Context, req *pb.GetNodeReq) (*pb.Node return nil, err } - return s.buildNodeReply(ctx, node) + containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + + return s.buildNodeReply(node, containers), nil } -func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.NodeReply, error) { +// buildNodeReply assembles a node reply from in-memory state only. The caller +// passes a pre-fetched container list so per-device usage stats reuse one scan +// rather than re-listing all containers for each device. +func (s *NodeService) buildNodeReply(node *biz.Node, containers []*biz.Container) *pb.NodeReply { nodeReply := &pb.NodeReply{ Name: node.Name, Uid: node.Uid, @@ -110,32 +126,33 @@ func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.N nodeReply.VgpuTotal += device.Count nodeReply.CoreTotal += device.Devcore nodeReply.MemoryTotal += device.Devmem - vGPU, core, memory, err := s.pod.StatisticsByDeviceId(ctx, device.AliasId) - if err == nil { - nodeReply.VgpuUsed += vGPU - nodeReply.CoreUsed += core - nodeReply.MemoryUsed += memory - } + vGPU, core, memory := biz.ContainersStatisticsInfo(containers, device.AliasId) + nodeReply.VgpuUsed += vGPU + nodeReply.CoreUsed += core + nodeReply.MemoryUsed += memory } nodeReply.Type = arrutil.Unique(nodeReply.Type) nodeReply.CardCnt = int32(len(node.Devices)) - return nodeReply, nil + return nodeReply } -func (s *NodeService) queryNodeMetrics(ctx context.Context, nodeName string) (int32, int32, error) { - coreTotal, memoryTotal := int32(0), int32(0) - - resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(sum(hami_core_size{node=~\"%s\"}) by (instance))", nodeName)}) - if err == nil && len(resp.Data) > 0 { - coreTotal = int32(resp.Data[0].Value) +// queryNodeGauge runs a single instant query whose result is grouped by the +// "node" label and returns value-by-node, batching what used to be two PromQL +// per node into one round-trip. +func (s *NodeService) queryNodeGauge(ctx context.Context, query string) map[string]int32 { + out := map[string]int32{} + resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: query}) + if err != nil { + return out } - - resp, err = s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(sum(hami_memory_size{node=~\"%s\"}) by (instance))", nodeName)}) - if err == nil && len(resp.Data) > 0 { - memoryTotal = int32(resp.Data[0].Value) + for _, sample := range resp.Data { + node := sample.Metric["node"] + if node == "" { + continue + } + out[node] = int32(sample.Value) } - - return coreTotal, memoryTotal, nil + return out }