diff --git a/charts/hami-webui/templates/configmap.yaml b/charts/hami-webui/templates/configmap.yaml index 31dceb93..3b6b3280 100644 --- a/charts/hami-webui/templates/configmap.yaml +++ b/charts/hami-webui/templates/configmap.yaml @@ -8,13 +8,16 @@ data: server: http: addr: 0.0.0.0:8000 - timeout: 1s + timeout: {{ .Values.backend.http.timeout | default "60s" }} grpc: addr: 0.0.0.0:9000 - timeout: 1s + timeout: {{ .Values.backend.grpc.timeout | default "60s" }} prometheus: address: {{ ternary .Values.externalPrometheus.address (printf "http://%s-kube-prometh-prometheus.%s.svc.cluster.local:9090" (include "hami-webui.fullname" .) (include "hami-webui.namespace" .)) .Values.externalPrometheus.enabled }} - timeout: 1m + timeout: {{ .Values.externalPrometheus.timeout | default "1m" }} + exporter: + interval: {{ .Values.metricsExporter.interval | default "30s" }} + timeout: {{ .Values.metricsExporter.timeout | default "60s" }} node_selectors: {{- range $key, $value := .Values.vendorNodeSelectors }} {{ $key }}: {{ $value }} diff --git a/charts/hami-webui/templates/deployment.yaml b/charts/hami-webui/templates/deployment.yaml index a0d77b2e..83219b90 100644 --- a/charts/hami-webui/templates/deployment.yaml +++ b/charts/hami-webui/templates/deployment.yaml @@ -59,6 +59,16 @@ spec: args: - "--conf" - "/apps/config/config.yaml" + {{- if .Values.backend.readinessProbe.enabled }} + readinessProbe: + httpGet: + path: /readyz + port: metrics + initialDelaySeconds: {{ .Values.backend.readinessProbe.initialDelaySeconds }} + periodSeconds: {{ .Values.backend.readinessProbe.periodSeconds }} + timeoutSeconds: {{ .Values.backend.readinessProbe.timeoutSeconds }} + failureThreshold: {{ .Values.backend.readinessProbe.failureThreshold }} + {{- end }} resources: {{- toYaml .Values.resources.backend | nindent 12 }} volumeMounts: diff --git a/charts/hami-webui/values.yaml b/charts/hami-webui/values.yaml index c18ffca9..e7c93b43 100644 --- a/charts/hami-webui/values.yaml +++ 
b/charts/hami-webui/values.yaml @@ -162,4 +162,44 @@ kube-prometheus-stack: externalPrometheus: enabled: false # If externalPrometheus.enabled is true, this address will be used - address: "http://prometheus-kube-prometheus-prometheus.prometheus.svc.cluster.local:9090" \ No newline at end of file + address: "http://prometheus-kube-prometheus-prometheus.prometheus.svc.cluster.local:9090" + # Single PromQL upstream timeout (sent to Prometheus / VictoriaMetrics as the &timeout= parameter). + timeout: "1m" + +# Kratos server timeouts. These bound the deadline placed on each incoming HTTP/gRPC +# request context. They no longer affect /metrics generation (which runs in the background), +# but they still gate the page-side APIs, some of which legitimately take a few seconds +# against a large Prometheus / VictoriaMetrics cluster. +backend: + http: + timeout: "60s" + grpc: + timeout: "60s" + # Readiness probe on the backend's /readyz. The backend only starts listening + # after its k8s informer caches have synced, so gating the pod on /readyz keeps + # it out of the Service until the backend can serve — this prevents the frontend + # BFF from hitting "connection refused" against the not-yet-listening backend + # during startup. failureThreshold * periodSeconds should comfortably exceed the + # worst-case informer sync time on a large cluster. + readinessProbe: + enabled: true + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 60 + +# Background /metrics collector. The collector runs independently of Prometheus scrape; +# scrapes only serialize the in-memory registry, so they issue no upstream queries and are immune to scrape +# timeouts. Tune interval if you need fresher data or want to reduce upstream load further. +metricsExporter: + # How often the collector refreshes hami_* metrics from the upstream Prometheus / VictoriaMetrics. + interval: "30s" + # Hard cap on a single refresh cycle. Should be >= externalPrometheus.timeout and <= interval * 2. 
+ timeout: "60s" + +# The frontend axios client timeout (default 60000ms) is compiled into the JS +# bundle at image build time from the VUE_APP_REQUEST_TIMEOUT env var, as Vite +# evaluates process.env at build, not at runtime. To change it for a deployed +# cluster you need to rebuild packages/web with a different VUE_APP_REQUEST_TIMEOUT +# (set in packages/web/.env.production or as a build-arg). It cannot be tuned +# through this values.yaml. \ No newline at end of file diff --git a/packages/web/.env.development b/packages/web/.env.development index e4017861..0db5fb6b 100644 --- a/packages/web/.env.development +++ b/packages/web/.env.development @@ -4,3 +4,6 @@ ENV = 'development' # base api VUE_APP_BASE_API = '/' +# axios global timeout in ms; read at build time, so override it here or via a build-arg (not via chart values) +VUE_APP_REQUEST_TIMEOUT = 60000 + diff --git a/packages/web/.env.production b/packages/web/.env.production index 30313bf1..d889738f 100644 --- a/packages/web/.env.production +++ b/packages/web/.env.production @@ -3,3 +3,6 @@ ENV = 'production' # base api VUE_APP_BASE_API = './' + +# axios global timeout in ms; read at build time, so override it here or via a build-arg (not via chart values) +VUE_APP_REQUEST_TIMEOUT = 60000 diff --git a/packages/web/src/utils/request.js b/packages/web/src/utils/request.js index f41db603..bd62594a 100644 --- a/packages/web/src/utils/request.js +++ b/packages/web/src/utils/request.js @@ -3,9 +3,17 @@ import axios from 'axios'; import { ElMessage, ElMessageBox, ElNotification } from 'element-plus'; import i18n from '@/locales'; +// Default request timeout in ms. Override at build time via VUE_APP_REQUEST_TIMEOUT +// (set in .env.* or passed as a build-arg; it is not a chart value). 60s is large +// enough for the slowest known page-side API (/v1/nodes can take a few seconds +// against a large VictoriaMetrics cluster) while still bounding hung requests. 
+const DEFAULT_REQUEST_TIMEOUT = 60000; +const requestTimeout = + Number.parseInt(process.env.VUE_APP_REQUEST_TIMEOUT, 10) || DEFAULT_REQUEST_TIMEOUT; + const service = axios.create({ baseURL: process.env.VUE_APP_BASE_API, // url = base url + request url - timeout: 5000, + timeout: requestTimeout, validateStatus: function (status) { return (status >= 200 && status < 300) || status > 520; }, diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go index 59feda46..2ff8fdc2 100644 --- a/server/cmd/server/main.go +++ b/server/cmd/server/main.go @@ -9,6 +9,7 @@ import ( "github.com/go-kratos/kratos/v2/transport/http" "os" "vgpu/internal/conf" + "vgpu/internal/exporter" _ "go.uber.org/automaxprocs" ) @@ -44,7 +45,9 @@ func main() { } } -func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App { +// newApp wires the background metrics collector in as a kratos transport.Server so +// its goroutine is started and stopped by the app lifecycle, alongside HTTP and gRPC. 
+func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Server, mc *exporter.MetricsGenerator) *kratos.App { return kratos.New( kratos.Context(ctx), kratos.ID(id), @@ -55,6 +58,7 @@ func newApp(ctx context.Context, logger log.Logger, gs *grpc.Server, hs *http.Se kratos.Server( gs, hs, + mc, ), ) } diff --git a/server/config/config.yaml b/server/config/config.yaml index 4f481e77..e46fe58a 100644 --- a/server/config/config.yaml +++ b/server/config/config.yaml @@ -1,16 +1,19 @@ server: http: addr: 0.0.0.0:8000 - timeout: 1s + timeout: 60s grpc: addr: 0.0.0.0:9000 - timeout: 1s + timeout: 60s prometheus: address: http://localhost:9090 timeout: 1m +exporter: + interval: 30s + timeout: 60s node_selectors: NVIDIA: gpu=on Ascend: ascend=on DCU: dcu=on MLU: mlu=on - Metax: metax-tech.com/gpu.installed=true \ No newline at end of file + Metax: metax-tech.com/gpu.installed=true diff --git a/server/internal/conf/conf.proto b/server/internal/conf/conf.proto index 7eb44e68..9ca428fe 100644 --- a/server/internal/conf/conf.proto +++ b/server/internal/conf/conf.proto @@ -9,6 +9,7 @@ message Bootstrap { Server server = 1; Prometheus prometheus = 2; map node_selectors = 3; + Exporter exporter = 4; } message Server { @@ -31,3 +32,14 @@ message Prometheus { string timeout = 2; string auth = 3; } + +// Exporter controls the background /metrics collector that periodically refreshes the +// in-memory Prometheus registry from upstream Prometheus / VictoriaMetrics. The /metrics +// HTTP handler only serializes that registry, so it is no longer tied to the per-request +// HTTP timeout and no longer fans out PromQL on every scrape. +message Exporter { + // Interval between two refresh cycles. Defaults to 30s if unset. + google.protobuf.Duration interval = 1; + // Hard cap on a single refresh cycle. Defaults to 60s if unset. 
+ google.protobuf.Duration timeout = 2; +} diff --git a/server/internal/data/node.go b/server/internal/data/node.go index 5a1eef27..4f59ab6e 100644 --- a/server/internal/data/node.go +++ b/server/internal/data/node.go @@ -98,7 +98,9 @@ func (r *nodeRepo) updateLocalNodes() { } } } + r.mutex.Lock() r.nodes = n + r.mutex.Unlock() } } @@ -156,6 +158,8 @@ func (r *nodeRepo) fetchNodeInfo(node *corev1.Node) *biz.Node { } func (r *nodeRepo) ListAll(context.Context) ([]*biz.Node, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var nodeList []*biz.Node for _, node := range r.nodes { nodeList = append(nodeList, node) @@ -164,13 +168,18 @@ func (r *nodeRepo) ListAll(context.Context) ([]*biz.Node, error) { } func (r *nodeRepo) GetNode(_ context.Context, uid string) (*biz.Node, error) { - if _, ok := r.nodes[k8stypes.UID(uid)]; !ok { + r.mutex.RLock() + defer r.mutex.RUnlock() + node, ok := r.nodes[k8stypes.UID(uid)] + if !ok { return nil, errors.New("node not found") } - return r.nodes[k8stypes.UID(uid)], nil + return node, nil } func (r *nodeRepo) ListAllDevices(context.Context) ([]*biz.DeviceInfo, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var deviceList []*biz.DeviceInfo for _, node := range r.nodes { deviceList = append(deviceList, node.Devices...) 
@@ -179,6 +188,8 @@ func (r *nodeRepo) ListAllDevices(context.Context) ([]*biz.DeviceInfo, error) { } func (r *nodeRepo) FindDeviceByAliasId(aliasId string) (*biz.DeviceInfo, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() for _, node := range r.nodes { for _, d := range node.Devices { if d.AliasId == aliasId { diff --git a/server/internal/data/pod.go b/server/internal/data/pod.go index 1cb2db78..492e26af 100644 --- a/server/internal/data/pod.go +++ b/server/internal/data/pod.go @@ -184,6 +184,8 @@ func (r *podRepo) GetStartTime(pod *corev1.Pod) time.Time { } func (r *podRepo) ListAll(context.Context) ([]*biz.Container, error) { + r.mutex.RLock() + defer r.mutex.RUnlock() var containerList []*biz.Container for _, pod := range r.pods { containerList = append(containerList, pod.Ctrs...) @@ -196,7 +198,13 @@ func (r *podRepo) FindOne(_ context.Context, podUID string, name string) (*biz.C return nil, fmt.Errorf("podUID or name is empty") } - for _, container := range r.pods[k8stypes.UID(podUID)].Ctrs { + r.mutex.RLock() + defer r.mutex.RUnlock() + pod, ok := r.pods[k8stypes.UID(podUID)] + if !ok { + return nil, fmt.Errorf("not found") + } + for _, container := range pod.Ctrs { if container.Name == name { return container, nil } diff --git a/server/internal/exporter/cells.go b/server/internal/exporter/cells.go new file mode 100644 index 00000000..07882b33 --- /dev/null +++ b/server/internal/exporter/cells.go @@ -0,0 +1,97 @@ +package exporter + +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +// Diff-based cell tracking for the background /metrics collector. +// +// Why this exists: the old reset()+populate cycle (called from a synchronous HTTP +// handler) was safe because scrape only ever observed the fully-populated state. 
+// Once the cycle moved to a background goroutine, every Prometheus scrape that +// landed between reset() and the end of populate saw partial / empty data, which +// surfaces in the UI as "vGPU 分配率有时有数据,有时没有数据" — series flickering +// in and out at scrape boundaries. +// +// The fix: instead of wiping the GaugeVec at the start of each cycle, every Set +// goes through MetricsGenerator.set, which both writes the value AND records the +// (gauge, label tuple) it touched. After a cycle completes successfully, we walk +// the previous cycle's recorded set and DeleteLabelValues for any tuple that +// disappeared this round. Existing series are atomically overwritten in place, +// brand-new series appear when their Set runs, vanished series disappear at the +// end-of-cycle prune. There is no window where a known device is missing. + +// cellKey identifies a single observation (gauge vector + concrete label tuple). +// The joined string is just a map-key encoding of the labels; the original +// []string is kept on the cell so we can pass it to DeleteLabelValues. +type cellKey struct { + gauge *prometheus.GaugeVec + joined string +} + +type cell struct { + gauge *prometheus.GaugeVec + labels []string +} + +// labelSep is a 0-byte separator that cannot appear in normal Prometheus label +// values, so strings.Join produces an unambiguous key. +const labelSep = "\x00" + +// set writes value into the gauge and records the (gauge, labels) tuple in the +// current-cycle map. Safe for concurrent use; the collector itself only calls +// it from one goroutine, but we lock anyway in case callers add a parallel pass. 
+func (s *MetricsGenerator) set(g *prometheus.GaugeVec, value float64, labels ...string) { + g.WithLabelValues(labels...).Set(value) + + k := cellKey{gauge: g, joined: strings.Join(labels, labelSep)} + s.cellMu.Lock() + defer s.cellMu.Unlock() + if s.current == nil { + s.current = make(map[cellKey]cell) + } + // Copy the labels slice — callers reuse the underlying array between iterations. + s.current[k] = cell{gauge: g, labels: append([]string(nil), labels...)} +} + +// commitCycle promotes the current cycle to "previous" and removes any label +// tuple that existed in the previous cycle but not this one. Call ONLY when the +// cycle completed without being cut short by ctx cancellation: pruning on a +// partial map would erroneously delete cells that just weren't re-Set this time. +func (s *MetricsGenerator) commitCycle() { + s.cellMu.Lock() + defer s.cellMu.Unlock() + if s.current == nil { + // Nothing was written this cycle; leave prev intact. + return + } + deleted := 0 + for k, c := range s.prev { + if _, ok := s.current[k]; ok { + continue + } + if c.gauge.DeleteLabelValues(c.labels...) { + deleted++ + } + } + if deleted > 0 { + s.log.Debugw("msg", "pruned stale metric cells", "count", deleted) + } + s.prev = s.current + s.current = nil +} + +// dropCurrentCycle discards the in-progress map without promoting it. Use this +// when a cycle ran into ctx cancellation or any other partial-completion path, +// so the next cycle's prune still references the last KNOWN-GOOD snapshot. +// +// Any partial Set() calls that did land remain in the GaugeVec as freshly +// overwritten cells — that is intentional and harmless, since they only update +// values on label tuples that already existed. 
+func (s *MetricsGenerator) dropCurrentCycle() { + s.cellMu.Lock() + s.current = nil + s.cellMu.Unlock() +} diff --git a/server/internal/exporter/exporter.go b/server/internal/exporter/exporter.go index 51361457..636831d4 100644 --- a/server/internal/exporter/exporter.go +++ b/server/internal/exporter/exporter.go @@ -6,14 +6,17 @@ import ( "fmt" "math" "strings" + "sync" "time" pb "vgpu/api/v1" "vgpu/internal/biz" + "vgpu/internal/conf" "vgpu/internal/data/prom" "vgpu/internal/provider/metax" "vgpu/internal/provider/mlu" "vgpu/internal/service" + "github.com/go-kratos/kratos/v2/log" "github.com/google/wire" ) @@ -22,12 +25,48 @@ var ProviderSet = wire.NewSet( NewMetricsGenerator, ) +const ( + defaultGenerateInterval = 30 * time.Second + defaultGenerateTimeout = 60 * time.Second +) + +// MetricsGenerator owns the lifecycle of the background /metrics collector. +// +// It satisfies kratos transport.Server: Start spins up a single refresh goroutine that +// periodically re-fans out PromQL into the in-memory Prometheus registry; Stop waits +// for the goroutine to drain. +// +// The HTTP /metrics handler does NOT call GenerateMetrics directly anymore: scrapes +// only serialize whatever the registry currently holds, which is what makes the +// endpoint scrape-timeout safe even at customer-scale fanout (~1800 PromQL/cycle). type MetricsGenerator struct { promClient *prom.Client nodeUsecase *biz.NodeUsecase podUsecase *biz.PodUseCase monitorService *service.MonitorService - cacheTime time.Time + + interval time.Duration + timeout time.Duration + + log *log.Helper + + startOnce sync.Once + stopOnce sync.Once + cancel context.CancelFunc + done chan struct{} + + runMu sync.Mutex + generating bool + + // Diff-based cell tracking. See cells.go for the full rationale. + // current holds tuples written during the in-progress cycle; prev holds the + // last successfully-committed cycle so we know which tuples to delete when a + // device or container disappears. 
+ cellMu sync.Mutex + current map[cellKey]cell + prev map[cellKey]cell + + cacheTime time.Time } // roundToTwoDecimal 将浮点数保留两位小数 @@ -41,38 +80,144 @@ func roundToOneDecimal(value float64) float64 { } func NewMetricsGenerator( + bc *conf.Bootstrap, promClient *prom.Client, nodeUsecase *biz.NodeUsecase, podUsecase *biz.PodUseCase, monitorService *service.MonitorService, + logger log.Logger, ) *MetricsGenerator { + interval, timeout := resolveCollectorIntervals(bc) return &MetricsGenerator{ promClient: promClient, nodeUsecase: nodeUsecase, podUsecase: podUsecase, monitorService: monitorService, + interval: interval, + timeout: timeout, + log: log.NewHelper(log.With(logger, "module", "exporter")), } } -func (s *MetricsGenerator) generatorCache() time.Time { - now := time.Now() - return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location()) + +func resolveCollectorIntervals(bc *conf.Bootstrap) (interval, timeout time.Duration) { + interval, timeout = defaultGenerateInterval, defaultGenerateTimeout + if bc == nil || bc.Exporter == nil { + return + } + if d := bc.Exporter.Interval.AsDuration(); d > 0 { + interval = d + } + if d := bc.Exporter.Timeout.AsDuration(); d > 0 { + timeout = d + } + return +} + +// Start launches the background collector loop. It is invoked once by the kratos +// app lifecycle. The loop deliberately runs against context.Background() so a +// graceful shutdown is the only thing that can cancel it; per-cycle ctx is bounded +// by s.timeout. +func (s *MetricsGenerator) Start(_ context.Context) error { + s.startOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel + s.done = make(chan struct{}) + s.log.Infow("msg", "starting metrics collector", "interval", s.interval, "timeout", s.timeout) + go s.loop(ctx) + }) + return nil +} + +// Stop signals the loop to exit and waits for it to drain or for the caller's +// ctx to expire. 
+func (s *MetricsGenerator) Stop(ctx context.Context) error { + s.stopOnce.Do(func() { + if s.cancel != nil { + s.cancel() + } + if s.done != nil { + select { + case <-s.done: + case <-ctx.Done(): + // Deadline expired before the loop drained; it may still be finishing a cycle. + s.log.Warnw("msg", "metrics collector did not stop before shutdown deadline") + } + } + }) + return nil +} + +func (s *MetricsGenerator) loop(ctx context.Context) { + defer close(s.done) + s.runOnce(ctx) + t := time.NewTicker(s.interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + s.runOnce(ctx) + } + } } -func (s *MetricsGenerator) cacheIsValidate() bool { - if s.cacheTime == s.generatorCache() { - return true +// runOnce executes a single refresh cycle with a bounded context. It refuses to +// overlap cycles: a cycle that runs longer than s.interval simply skips the next +// tick instead of piling up parallel PromQL fanouts onto vmselect. +func (s *MetricsGenerator) runOnce(parent context.Context) { + s.runMu.Lock() + if s.generating { + s.runMu.Unlock() + s.log.Warnw("msg", "skip metrics refresh: previous cycle still running") + return + } + s.generating = true + s.runMu.Unlock() + defer func() { + s.runMu.Lock() + s.generating = false + s.runMu.Unlock() + }() + + ctx, cancel := context.WithTimeout(parent, s.timeout) + defer cancel() + start := time.Now() + if err := s.GenerateMetrics(ctx); err != nil { + // Partial cycle (ctx canceled, upstream error, etc.): keep last known good + // snapshot intact so the registry doesn't briefly lose cells that simply + // could not be re-queried this round. + s.dropCurrentCycle() + s.log.Errorw("msg", "metrics refresh failed (partial cycle, last-known cells retained)", "err", err, "elapsed", time.Since(start)) + return } - return false + s.commitCycle() + s.log.Debugw("msg", "metrics refresh complete", "elapsed", time.Since(start)) } +// GenerateMetrics runs one full fanout cycle (device + container dimensions) into +// the in-memory Prometheus registry. 
It is exported so it can be unit-tested and +// triggered manually if needed, but on the request path it must never be called +// synchronously: the registry is the only thing /metrics serves. +// +// The function intentionally does NOT clear existing gauges. Each Set goes +// through s.set, which both writes the value and records the (gauge, labels) +// tuple. runOnce decides whether to commit the new snapshot (delete tuples that +// disappeared) or drop it (keep the previous snapshot intact). See cells.go. func (s *MetricsGenerator) GenerateMetrics(ctx context.Context) error { - //if s.cacheIsValidate() { - // return nil - //} - reset() // 重置所有指标缓存值 - s.GenerateDeviceMetrics(ctx) // 卡维度指标 - s.GenerateContainerMetrics(ctx) // 任务维度指标 - s.cacheTime = s.generatorCache() + s.cellMu.Lock() + s.current = make(map[cellKey]cell) + s.cellMu.Unlock() + + // Run both passes even if the first one fails so every reachable cell is still + // refreshed, then report the failure so runOnce drops (rather than commits) the + // incomplete map instead of pruning series that were merely not re-queried. + devErr := s.GenerateDeviceMetrics(ctx) + ctrErr := s.GenerateContainerMetrics(ctx) + if devErr != nil { + return fmt.Errorf("generating device metrics: %w", devErr) + } + if ctrErr != nil { + return fmt.Errorf("generating container metrics: %w", ctrErr) + } + + // Surface ctx cancellation so runOnce treats this as a partial cycle and + // preserves the prior snapshot. Without this guard, a mid-cycle timeout + // would silently commit an incomplete map and erase real series. 
+ if err := ctx.Err(); err != nil { + return err + } + s.cacheTime = time.Now() return nil } @@ -93,56 +238,56 @@ func (s *MetricsGenerator) GenerateDeviceMetrics(ctx context.Context) error { } // 分配率指标 - HamiVgpuCount.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Count)) - HamiVmemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem)) - HamiVcoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore)) + s.set(HamiVgpuCount, float64(device.Count), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiVmemorySize, float64(device.Devmem), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiVcoreSize, float64(device.Devcore), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) // 超配比指标 - HamiVCoreScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore) / 100) - HamiCoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(100) + s.set(HamiVCoreScaling, float64(device.Devcore)/100, device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreSize, 100, device.NodeName, provider, device.Type, device.Id, driver, deviceNo) deviceMemUsed, err := s.deviceMemUsed(ctx, provider, device.Id) if err == nil { - HamiMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(deviceMemUsed)) + s.set(HamiMemoryUsed, float64(deviceMemUsed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } - HamiMemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem)) - HamiMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, 
deviceNo).Set(roundToOneDecimal(100 * float64(deviceMemUsed/float32(device.Devmem)))) + s.set(HamiMemorySize, float64(device.Devmem), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiMemoryUtil, roundToOneDecimal(100*float64(deviceMemUsed/float32(device.Devmem))), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) deviceMemSize, err := s.deviceMemTotal(ctx, provider, device.Id) if err == nil && deviceMemSize > 0 { - HamiVMemoryScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(roundToOneDecimal(float64(float32(device.Devmem) / deviceMemSize))) + s.set(HamiVMemoryScaling, roundToOneDecimal(float64(float32(device.Devmem)/deviceMemSize)), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } actualCoreUtil, err := s.deviceCoreUtil(ctx, provider, device.Id) if err == nil { - HamiCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil)) - HamiCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil)) + s.set(HamiCoreUsed, float64(actualCoreUtil), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreUtil, float64(actualCoreUtil), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } actualCoreUtilAvg, err := s.deviceCoreUtil(ctx, provider, device.Id) if err == nil { - HamiCoreUsedAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg)) - HamiCoreUtilAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg)) + s.set(HamiCoreUsedAvg, float64(actualCoreUtilAvg), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) + s.set(HamiCoreUtilAvg, float64(actualCoreUtilAvg), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } 
gpuTemperature, err := s.gpuTemperature(ctx, provider, device.Id) if err == nil { - HamiDeviceTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuTemperature)) + s.set(HamiDeviceTemperature, float64(gpuTemperature), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } memoryTemperature, err := s.memoryTemperature(ctx, provider, device.Id) if err == nil { - HamiDeviceMemoryTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(memoryTemperature)) + s.set(HamiDeviceMemoryTemperature, float64(memoryTemperature), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } gpuPower, err := s.gpuPower(ctx, provider, device.Id) if err == nil { - HamiDevicePower.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuPower)) + s.set(HamiDevicePower, float64(gpuPower), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } fanSpeed, err := s.fanSpeed(ctx, provider, device.Id) if err == nil { switch provider { case biz.NvidiaGPUDevice: - HamiDeviceFanSpeedP.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed)) + s.set(HamiDeviceFanSpeedP, float64(fanSpeed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) case biz.CambriconGPUDevice: - HamiDeviceFanSpeedR.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed)) + s.set(HamiDeviceFanSpeedR, float64(fanSpeed), device.NodeName, provider, device.Type, device.Id, driver, deviceNo) } } gpuHardwareHealth, err := s.gpuHardwareHealth(ctx, provider, device.Id) if err == nil { - HamiDeviceHardwareHealth.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuHardwareHealth)) + s.set(HamiDeviceHardwareHealth, float64(gpuHardwareHealth), device.NodeName, provider, 
device.Type, device.Id, driver, deviceNo) } } return nil @@ -180,13 +325,14 @@ func (s *MetricsGenerator) generateMetricsForMetaxGPU(containers []*biz.Containe deviceType := res.Data[i].Metric["modelName"] nodeName := res.Data[i].Metric["Hostname"] - HamiContainerVgpuAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(1)) - HamiContainerVmemoryAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(memory[i])) - HamiContainerVcoreAllocated.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(core[i])) + podUIDLabel := fmt.Sprintf("%s:%s", c.Name, c.PodUID) + s.set(HamiContainerVgpuAllocated, float64(1), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVmemoryAllocated, float64(memory[i]), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVcoreAllocated, float64(core[i]), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace, podUIDLabel) taskMemoryUsed := float32(res.Data[i].Value) // KB - HamiContainerMemoryUsed.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(float64(taskMemoryUsed / 1024)) - HamiContainerMemoryUtil.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(roundToOneDecimal(100 * float64(taskMemoryUsed/1024) / float64(memory[i]))) + s.set(HamiContainerMemoryUsed, float64(taskMemoryUsed/1024), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerMemoryUtil, 
roundToOneDecimal(100*float64(taskMemoryUsed/1024)/float64(memory[i])), nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) taskCoreUsed, err := s.taskCoreUsed(context.TODO(), metax.MetaxGPUDevice, c.Namespace, c.PodName, c.Name, c.PodUID, deviceUUID, nodeName, -1) if err == nil { @@ -197,8 +343,8 @@ func (s *MetricsGenerator) generateMetricsForMetaxGPU(containers []*biz.Containe used = float64(cardCoreUtil) / 100 * float64(core[i]) util = float64(cardCoreUtil) } - HamiContainerCoreUsed.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(used) - HamiContainerCoreUtil.WithLabelValues(nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace).Set(util) + s.set(HamiContainerCoreUsed, used, nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerCoreUtil, util, nodeName, metax.MetaxGPUDevice, deviceType, deviceUUID, c.PodName, c.Name, c.Namespace) } } } @@ -234,9 +380,10 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { if provider == "" || provider == metax.MetaxGPUDevice { continue } - HamiContainerVgpuAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(vGPU)) - HamiContainerVmemoryAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(memory)) - HamiContainerVcoreAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(core)) + podUIDLabel := fmt.Sprintf("%s:%s", c.Name, c.PodUID) + s.set(HamiContainerVgpuAllocated, float64(vGPU), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) + 
s.set(HamiContainerVmemoryAllocated, float64(memory), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) + s.set(HamiContainerVcoreAllocated, float64(core), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, podUIDLabel) // 查询任务在当前设备下的算力利用率 taskCoreUsed, err := s.taskCoreUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id, device.NodeName, device.Index) if err == nil { @@ -262,8 +409,8 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { used = float64(cardCoreUtil) / 100 * float64(core) util = float64(cardCoreUtil) } - HamiContainerCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(used) - HamiContainerCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(util) + s.set(HamiContainerCoreUsed, used, device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) + s.set(HamiContainerCoreUtil, util, device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) } taskMemoryUsed, err := s.taskMemoryUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id, device.NodeName, device.Index) if err == nil { @@ -274,8 +421,8 @@ func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error { taskMemoryUsed = float32(taskMemoryUsed) * 1024 // KB->Byte default: } - HamiContainerMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(float64(taskMemoryUsed / 1024 / 1024)) - HamiContainerMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(roundToOneDecimal(100 * float64(taskMemoryUsed/1024/1024) / float64(memory))) + s.set(HamiContainerMemoryUsed, float64(taskMemoryUsed/1024/1024), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, 
c.Namespace) + s.set(HamiContainerMemoryUtil, roundToOneDecimal(100*float64(taskMemoryUsed/1024/1024)/float64(memory)), device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace) } } } diff --git a/server/internal/exporter/metrics.go b/server/internal/exporter/metrics.go index c00e1fda..26e460b9 100644 --- a/server/internal/exporter/metrics.go +++ b/server/internal/exporter/metrics.go @@ -43,38 +43,6 @@ func init() { prometheus.MustRegister(HamiSystemComponentHealth) // 系统组件健康状态 } -func reset() { - HamiVCoreScaling.Reset() - HamiVMemoryScaling.Reset() - HamiVgpuCount.Reset() - HamiVmemorySize.Reset() - HamiVcoreSize.Reset() - HamiMemoryUsed.Reset() - HamiMemorySize.Reset() - HamiMemoryUtil.Reset() - HamiCoreSize.Reset() - HamiCoreUsed.Reset() - HamiCoreUtil.Reset() - HamiCoreUsedAvg.Reset() - HamiCoreUtilAvg.Reset() - HamiDeviceTemperature.Reset() - HamiDeviceMemoryTemperature.Reset() - HamiDevicePower.Reset() - HamiDeviceFanSpeedP.Reset() - HamiDeviceFanSpeedR.Reset() - - HamiContainerVgpuAllocated.Reset() - HamiContainerVmemoryAllocated.Reset() - HamiContainerVcoreAllocated.Reset() - HamiContainerMemoryUsed.Reset() - HamiContainerMemoryUtil.Reset() - HamiContainerCoreUsed.Reset() - HamiContainerCoreUtil.Reset() - - HamiPoolVgpuCount.Reset() - HamiPoolVmemorySize.Reset() - HamiPoolVcoreSize.Reset() -} var ( HamiVCoreScaling = prometheus.NewGaugeVec(prometheus.GaugeOpts{ diff --git a/server/internal/server/http.go b/server/internal/server/http.go index 2625c146..73906944 100644 --- a/server/internal/server/http.go +++ b/server/internal/server/http.go @@ -1,9 +1,9 @@ package server import ( + nethttp "net/http" v1 "vgpu/api/v1" "vgpu/internal/conf" - "vgpu/internal/exporter" "vgpu/internal/service" "github.com/go-kratos/kratos/v2/log" @@ -15,12 +15,24 @@ import ( ) // NewHTTPServer new an HTTP server. +// +// The /metrics endpoint here only serializes the in-memory Prometheus registry. 
+// The registry is refreshed in the background by exporter.MetricsGenerator, which +// is registered as its own kratos transport.Server in main.go. This is what keeps +// scrape latency O(1) and decouples /metrics from server.http.timeout — at customer +// scale a single refresh can fan out ~1800 PromQL queries, which previously raced +// the 1s request deadline and starved vmselect. +// +// /readyz answers 200 once this HTTP server is accepting requests. The backend only +// starts listening after the k8s informer caches have synced (see data.NewNodeRepo / +// NewPodRepo), so a readiness probe on /readyz keeps the pod out of Service endpoints +// until the backend can actually serve — this is what prevents the frontend BFF from +// getting "connection refused" against the not-yet-listening backend at startup. func NewHTTPServer(c *conf.Bootstrap, node *service.NodeService, card *service.CardService, ctr *service.ContainerService, monitor *service.MonitorService, - exporter *exporter.MetricsGenerator, logger log.Logger) *http.Server { var opts = []http.ServerOption{ http.Middleware( @@ -43,10 +55,12 @@ func NewHTTPServer(c *conf.Bootstrap, v1.RegisterContainerHTTPServer(srv, ctr) v1.RegisterMonitorHTTPServer(srv, monitor) srv.HandlePrefix("/q/", openapiv2.NewHandler()) - srv.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { - exporter.GenerateMetrics(r.Context()) - //mock.MockMetrics(r.Context()) - promhttp.Handler().ServeHTTP(w, r) + srv.Handle("/metrics", promhttp.Handler()) + srv.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { + // Reaching this handler means the server is listening and the informer + // caches have already synced (the backend only starts serving after that). 
+ w.WriteHeader(nethttp.StatusOK) + _, _ = w.Write([]byte("ok")) }) return srv } diff --git a/server/internal/service/card.go b/server/internal/service/card.go index 155ba756..4d3ec2b4 100644 --- a/server/internal/service/card.go +++ b/server/internal/service/card.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "sort" "strings" pb "vgpu/api/v1" @@ -27,6 +26,22 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p if err != nil { return nil, err } + + // Fetch the container list once. StatisticsByDeviceId re-lists all containers + // on every call, so calling it per device turned this into an O(devices) + // re-scan; at large scale (100+ cards) that alone was seconds of work. + containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + + // Pull hami_core_size / hami_memory_size for ALL devices in two queries keyed + // by device_uuid, instead of two PromQL per device. The old per-device fanout + // meant ~2*N serial instant queries against Prometheus / VictoriaMetrics + // (300+ at large scale), which made the card list spin for several seconds. 
+ coreSizeByUUID := s.queryGaugeByLabel(ctx, "avg(hami_core_size) by (device_uuid)", "device_uuid") + memSizeByUUID := s.queryGaugeByLabel(ctx, "avg(hami_memory_size) by (device_uuid)", "device_uuid") + var res = &pb.GPUsReply{List: []*pb.GPUReply{}} for _, device := range deviceInfos { gpu := &pb.GPUReply{} @@ -52,19 +67,16 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p gpu.Health = device.Health gpu.Mode = device.Mode - vGPU, core, memory, err := s.pod.StatisticsByDeviceId(ctx, device.AliasId) - if err == nil { - gpu.VgpuUsed = vGPU - gpu.CoreUsed = core - gpu.MemoryUsed = memory - } - resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(hami_core_size{device_uuid=~\"%s\"})", device.Id)}) - if err == nil && len(resp.Data) > 0 { - gpu.CoreTotal = int32(resp.Data[0].Value) + vGPU, core, memory := biz.ContainersStatisticsInfo(containers, device.AliasId) + gpu.VgpuUsed = vGPU + gpu.CoreUsed = core + gpu.MemoryUsed = memory + + if v, ok := coreSizeByUUID[device.Id]; ok { + gpu.CoreTotal = v } - resp, err = s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(hami_memory_size{device_uuid=~\"%s\"})", device.Id)}) - if err == nil && len(resp.Data) > 0 { - gpu.MemoryTotal = int32(resp.Data[0].Value) + if v, ok := memSizeByUUID[device.Id]; ok { + gpu.MemoryTotal = v } res.List = append(res.List, gpu) } @@ -75,6 +87,25 @@ func (s *CardService) GetAllGPUs(ctx context.Context, req *pb.GetAllGpusReq) (*p return res, nil } +// queryGaugeByLabel runs a single instant query and returns the result values +// keyed by the given label, so callers can batch what used to be per-entity +// lookups into one round-trip to Prometheus / VictoriaMetrics. 
+func (s *CardService) queryGaugeByLabel(ctx context.Context, query, label string) map[string]int32 { + out := map[string]int32{} + resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: query}) + if err != nil { + return out + } + for _, sample := range resp.Data { + key := sample.Metric[label] + if key == "" { + continue + } + out[key] = int32(sample.Value) + } + return out +} + func (s *CardService) GetAllGPUTypes(ctx context.Context, req *pb.GetAllGpusReq) (*pb.GPUsReply, error) { deviceInfos, err := s.node.ListAllDevices(ctx) if err != nil { diff --git a/server/internal/service/node.go b/server/internal/service/node.go index 0dafb379..37b43d1c 100644 --- a/server/internal/service/node.go +++ b/server/internal/service/node.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "sort" "strconv" "vgpu/internal/biz" @@ -42,17 +41,26 @@ func (s *NodeService) GetAllNodes(ctx context.Context, req *pb.GetAllNodesReq) ( return nil, err } + // Fetch containers once (StatisticsByDeviceId used to re-list per device) and + // pull the per-node core/memory totals in two queries keyed by node, instead + // of two PromQL per node. This is what made /v1/nodes take 3-4s — and it is + // also called for the workload page's node filter dropdown. 
+ containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + coreByNode := s.queryNodeGauge(ctx, "avg(sum(hami_core_size) by (node, instance)) by (node)") + memByNode := s.queryNodeGauge(ctx, "avg(sum(hami_memory_size) by (node, instance)) by (node)") + var res = &pb.NodesReply{List: []*pb.NodeReply{}} for _, node := range nodes { - nodeReply, err := s.buildNodeReply(ctx, node) - if err != nil { - return nil, err - } + nodeReply := s.buildNodeReply(node, containers) - coreTotal, memoryTotal, err := s.queryNodeMetrics(ctx, node.Name) - if err == nil { - nodeReply.CoreTotal = coreTotal - nodeReply.MemoryTotal = memoryTotal + if v, ok := coreByNode[node.Name]; ok { + nodeReply.CoreTotal = v + } + if v, ok := memByNode[node.Name]; ok { + nodeReply.MemoryTotal = v } if filters.Ip != "" && filters.Ip != nodeReply.Ip { @@ -85,10 +93,18 @@ func (s *NodeService) GetNode(ctx context.Context, req *pb.GetNodeReq) (*pb.Node return nil, err } - return s.buildNodeReply(ctx, node) + containers, err := s.pod.ListAllContainers(ctx) + if err != nil { + return nil, err + } + + return s.buildNodeReply(node, containers), nil } -func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.NodeReply, error) { +// buildNodeReply assembles a node reply from in-memory state only. The caller +// passes a pre-fetched container list so per-device usage stats reuse one scan +// rather than re-listing all containers for each device. 
+func (s *NodeService) buildNodeReply(node *biz.Node, containers []*biz.Container) *pb.NodeReply { nodeReply := &pb.NodeReply{ Name: node.Name, Uid: node.Uid, @@ -110,32 +126,33 @@ func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.N nodeReply.VgpuTotal += device.Count nodeReply.CoreTotal += device.Devcore nodeReply.MemoryTotal += device.Devmem - vGPU, core, memory, err := s.pod.StatisticsByDeviceId(ctx, device.AliasId) - if err == nil { - nodeReply.VgpuUsed += vGPU - nodeReply.CoreUsed += core - nodeReply.MemoryUsed += memory - } + vGPU, core, memory := biz.ContainersStatisticsInfo(containers, device.AliasId) + nodeReply.VgpuUsed += vGPU + nodeReply.CoreUsed += core + nodeReply.MemoryUsed += memory } nodeReply.Type = arrutil.Unique(nodeReply.Type) nodeReply.CardCnt = int32(len(node.Devices)) - return nodeReply, nil + return nodeReply } -func (s *NodeService) queryNodeMetrics(ctx context.Context, nodeName string) (int32, int32, error) { - coreTotal, memoryTotal := int32(0), int32(0) - - resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(sum(hami_core_size{node=~\"%s\"}) by (instance))", nodeName)}) - if err == nil && len(resp.Data) > 0 { - coreTotal = int32(resp.Data[0].Value) +// queryNodeGauge runs a single instant query whose result is grouped by the +// "node" label and returns value-by-node, batching what used to be two PromQL +// per node into one round-trip. 
+func (s *NodeService) queryNodeGauge(ctx context.Context, query string) map[string]int32 { + out := map[string]int32{} + resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: query}) + if err != nil { + return out } - - resp, err = s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: fmt.Sprintf("avg(sum(hami_memory_size{node=~\"%s\"}) by (instance))", nodeName)}) - if err == nil && len(resp.Data) > 0 { - memoryTotal = int32(resp.Data[0].Value) + for _, sample := range resp.Data { + node := sample.Metric["node"] + if node == "" { + continue + } + out[node] = int32(sample.Value) } - - return coreTotal, memoryTotal, nil + return out }