From 1d3caf1016f892c1ca642535d240228775a2a113 Mon Sep 17 00:00:00 2001 From: Patrik Lindahl Date: Fri, 1 Oct 2021 15:37:07 +0200 Subject: [PATCH] Support for getting clustered metrics as object New functions: getClusterMetricsAsJSON() getClusterMetricsAsArray() --- index.d.ts | 20 ++++++- lib/cluster.js | 138 ++++++++++++++++++++++++++++++-------------- test/clusterTest.js | 18 ++++++ 3 files changed, 131 insertions(+), 45 deletions(-) diff --git a/index.d.ts b/index.d.ts index 7f380e64..b935a06e 100644 --- a/index.d.ts +++ b/index.d.ts @@ -92,6 +92,20 @@ export class AggregatorRegistry extends Registry { */ clusterMetrics(): Promise; + /** + * Gets aggregated metrics for all workers as objects + * @return {Promise} Promise that resolves with the aggregated + * metrics. + */ + getClusterMetricsAsJSON(): Promise; + + /** + * Gets aggregated metrics for all workers as objects + * @return {Promise} Promise that resolves with the aggregated + * metrics. + */ + getClusterMetricsAsArray(): Promise; + /** * Creates a new Registry instance from an array of metrics that were * created by `registry.getMetricsAsJSON()`. Metrics are aggregated using @@ -560,7 +574,7 @@ export class Pushgateway { */ pushAdd( params: Pushgateway.Parameters, - ): Promise<{ resp?: unknown, body?: unknown }>; + ): Promise<{ resp?: unknown; body?: unknown }>; /** * Overwrite all metric (using PUT to Pushgateway) @@ -568,7 +582,7 @@ export class Pushgateway { */ push( params: Pushgateway.Parameters, - ): Promise<{ resp?: unknown, body?: unknown }>; + ): Promise<{ resp?: unknown; body?: unknown }>; /** * Delete all metrics for jobName @@ -576,7 +590,7 @@ export class Pushgateway { */ delete( params: Pushgateway.Parameters, - ): Promise<{ resp?: unknown, body?: unknown }>; + ): Promise<{ resp?: unknown; body?: unknown }>; } export namespace Pushgateway { diff --git a/lib/cluster.js b/lib/cluster.js index cb564ede..4fdee055 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -40,48 +40,29 @@ class AggregatorRegistry extends Registry { * metrics. */ clusterMetrics() { - const requestId = requestCtr++; - - return new Promise((resolve, reject) => { - let settled = false; - function done(err, result) { - if (settled) return; - settled = true; - if (err) reject(err); - else resolve(result); - } + return clusterMetricsRequest('string'); + } - const request = { - responses: [], - pending: 0, - done, - errorTimeout: setTimeout(() => { - const err = new Error('Operation timed out.'); - request.done(err); - }, 5000), - }; - requests.set(requestId, request); - - const message = { - type: GET_METRICS_REQ, - requestId, - }; - - for (const id in cluster().workers) { - // If the worker exits abruptly, it may still be in the workers - // list but not able to communicate. - if (cluster().workers[id].isConnected()) { - cluster().workers[id].send(message); - request.pending++; - } - } + /** + * Gets aggregated metrics for all workers in JSON format. + * The optional callback and returned Promise resolve with the same value; + * either may be used. + * @return {Promise} Promise that resolves with the aggregated + * metrics in JSON format. + */ + getClusterMetricsAsJSON() { + return clusterMetricsRequest('json'); + } - if (request.pending === 0) { - // No workers were up - clearTimeout(request.errorTimeout); - process.nextTick(() => done(null, '')); - } - }); + /** + * Gets aggregated metrics for all workers as an array. + * The optional callback and returned Promise resolve with the same value; + * either may be used. + * @return {Promise} Promise that resolves with the aggregated + * metrics in JSON format. + */ + getClusterMetricsAsArray() { + return clusterMetricsRequest('array'); } /** @@ -174,8 +155,21 @@ function addListeners() { clearTimeout(request.errorTimeout); const registry = AggregatorRegistry.aggregate(request.responses); - const promString = registry.metrics(); - request.done(null, promString); + switch (request.format) { + case 'json': + request.done(null, registry.getMetricsAsJSON()); + break; + case 'array': + request.done(null, registry.getMetricsAsArray()); + break; + case 'string': + request.done(null, registry.metrics()); + break; + default: + request.done( + new Error(`unknown metrics format: ${request.format}`), + ); + } } } }); @@ -205,4 +199,64 @@ function addListeners() { } } +function clusterMetricsRequest(format) { + const requestId = requestCtr++; + + return new Promise((resolve, reject) => { + let settled = false; + function done(err, result) { + if (settled) return; + settled = true; + if (err) reject(err); + else resolve(result); + } + + const request = { + responses: [], + pending: 0, + format, + done, + errorTimeout: setTimeout(() => { + const err = new Error('Operation timed out.'); + request.done(err); + }, 5000), + }; + requests.set(requestId, request); + + const message = { + type: GET_METRICS_REQ, + requestId, + }; + + for (const id in cluster().workers) { + // If the worker exits abruptly, it may still be in the workers + // list but not able to communicate. + if (cluster().workers[id].isConnected()) { + cluster().workers[id].send(message); + request.pending++; + } + } + + if (request.pending === 0) { + // No workers were up + clearTimeout(request.errorTimeout); + switch (request.format) { + case 'json': + process.nextTick(() => done(null, [])); + break; + case 'array': + process.nextTick(() => done(null, [])); + break; + case 'string': + process.nextTick(() => done(null, '')); + break; + default: + process.nextTick(() => + done(new Error(`unknown metrics format: ${request.format}`)), + ); + } + } + }); +} + module.exports = AggregatorRegistry; diff --git a/test/clusterTest.js b/test/clusterTest.js index 8569ea99..b54b8b6c 100644 --- a/test/clusterTest.js +++ b/test/clusterTest.js @@ -41,6 +41,24 @@ describe('AggregatorRegistry', () => { }); }); + describe('aggregatorRegistry.getClusterMetricsAsJSON()', () => { + it('works properly if there are no cluster workers', async () => { + const AggregatorRegistry = require('../lib/cluster'); + const ar = new AggregatorRegistry(); + const metrics = await ar.getClusterMetricsAsJSON(); + expect(metrics).toEqual([]); + }); + }); + + describe('aggregatorRegistry.getClusterMetricsAsArray()', () => { + it('works properly if there are no cluster workers', async () => { + const AggregatorRegistry = require('../lib/cluster'); + const ar = new AggregatorRegistry(); + const metrics = await ar.getClusterMetricsAsArray(); + expect(metrics).toEqual([]); + }); + }); + describe('AggregatorRegistry.aggregate()', () => { const Registry = require('../lib/cluster'); // These mimic the output of `getMetricsAsJSON`.