From 09b586e5ff02d815acfdcbdc29468166b0c0792a Mon Sep 17 00:00:00 2001 From: Alexey Potapenko Date: Fri, 13 Mar 2026 15:34:57 +0300 Subject: [PATCH] graphite: multiple servers support This patch adds the ability to send metrics to multiple servers. Part of #TNTP-6584 --- CHANGELOG.md | 6 +++ metrics/plugins/graphite.lua | 90 ++++++++++++++++++++++++-------- test/plugins/graphite_test.lua | 95 +++++++++++++++++++++++++++------- 3 files changed, 149 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ac4368e1..db465acb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `graphite`: ability to send metrics to multiple servers. + Backward compatibility with the previous plugin version is preserved. + From now on, the `init` method assigns a unique name to the created fiber + using the incoming graphite server `opts` (if passed). Added a new `stop()` + method to stop all fibers started by the plugin. 
+ ### Changed ### Fixed diff --git a/metrics/plugins/graphite.lua b/metrics/plugins/graphite.lua index e75355c8..4d8f6caf 100644 --- a/metrics/plugins/graphite.lua +++ b/metrics/plugins/graphite.lua @@ -3,7 +3,6 @@ local fiber = require('fiber') local metrics = require('metrics') local checks = require('checks') local log = require('log') -local fun = require('fun') local graphite = {} @@ -16,6 +15,29 @@ local DEFAULT_SEND_INTERVAL = 2 -- Constants local LABELS_SEP = ';' +local GRAPHITE_FIBERS = {} + +local function create_fiber_table(opts) + local needLongName = (opts ~= nil) + opts = opts or {} + local graphite_fiber = {} + + graphite_fiber.sock = nil + + graphite_fiber.prefix = opts.prefix or DEFAULT_PREFIX + graphite_fiber.host = opts.host or DEFAULT_HOST + graphite_fiber.port = opts.port or DEFAULT_PORT + graphite_fiber.send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL + + graphite_fiber.name = "metrics_graphite_worker" + if needLongName then + graphite_fiber.name = graphite_fiber.name .. '_' .. + graphite_fiber.prefix .. '_' .. graphite_fiber.host .. '_' .. 
graphite_fiber.port + end + + return graphite_fiber +end + function graphite.format_observation(prefix, obs) local metric_path = #prefix > 0 and ('%s.%s'):format(prefix, obs.metric_name) or obs.metric_name @@ -36,26 +58,47 @@ function graphite.format_observation(prefix, obs) return graph end -local function graphite_worker(opts) - fiber.name('metrics_graphite_worker') +local function graphite_worker(args) + fiber.name(args.name) while true do metrics.invoke_callbacks() for _, c in pairs(metrics.collectors()) do for _, obs in ipairs(c:collect()) do - local data = graphite.format_observation(opts.prefix, obs) - local numbytes = opts.sock:sendto(opts.host, opts.port, data) + local data = graphite.format_observation(args.prefix, obs) + local numbytes = args.sock:sendto(args.host, args.port, data) if numbytes == nil then log.error('Error while sending to host %s port %s data %s', - opts.host, opts.port, data) + args.host, args.port, data) end end end - fiber.sleep(opts.send_interval) + fiber.sleep(args.send_interval) end end +local function start_fiber(input) + input.sock = socket('AF_INET', 'SOCK_DGRAM', 'udp') + assert(input.sock ~= nil, 'Socket creation failed') + + input.fiber = fiber.create(graphite_worker, { + name = input.name, + prefix = input.prefix, + sock = input.sock, + host = input.host, + port = input.port, + send_interval = input.send_interval, + }) + + return input +end + +local function stop_fiber(input) + pcall(input.sock.close, input.sock) + pcall(fiber.kill, input.fiber) +end + function graphite.init(opts) checks { prefix = '?string', @@ -64,25 +107,26 @@ function graphite.init(opts) send_interval = '?number' } - local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp') - assert(sock ~= nil, 'Socket creation failed') + local graphite_fiber = create_fiber_table(opts) - local prefix = opts.prefix or DEFAULT_PREFIX - local host = opts.host or DEFAULT_HOST - local port = opts.port or DEFAULT_PORT - local send_interval = opts.send_interval or 
DEFAULT_SEND_INTERVAL + -- require('config'):reload() triggers only validate() and apply() + -- role's methods without stop(). + -- so, we should kill previous fiber if exist. + if GRAPHITE_FIBERS[graphite_fiber.name] then + stop_fiber(GRAPHITE_FIBERS[graphite_fiber.name]) + GRAPHITE_FIBERS[graphite_fiber.name] = nil + fiber.yield() + end - fun.iter(fiber.info()): - filter(function(_, x) return x.name == 'metrics_graphite_worker' end): - each(function(x) fiber.kill(x) end) + GRAPHITE_FIBERS[graphite_fiber.name] = start_fiber(graphite_fiber) +end - fiber.create(graphite_worker, { - prefix = prefix, - sock = sock, - host = host, - port = port, - send_interval = send_interval, - }) +function graphite.stop() + for _, v in pairs(GRAPHITE_FIBERS) do + stop_fiber(v) + end + + GRAPHITE_FIBERS = {} end return graphite diff --git a/test/plugins/graphite_test.lua b/test/plugins/graphite_test.lua index 3f547fa5..ebc2eb19 100755 --- a/test/plugins/graphite_test.lua +++ b/test/plugins/graphite_test.lua @@ -36,15 +36,10 @@ end) g.after_each(function(cg) cg.server:exec(function() - local fiber = require('fiber') - local fun = require('fun') - local metrics = require('metrics') -- Delete all collectors and global labels - metrics.clear() - fun.iter(fiber.info()): - filter(function(_, x) return x.name == 'metrics_graphite_worker' end): - each(function(x) fiber.kill(x) end) - fiber.yield() -- let cancelled fibers disappear from fiber.info() + require('metrics').clear() + -- Stop all graphite fibers + require('metrics.plugins.graphite').stop() end) end) @@ -152,10 +147,12 @@ g.test_graphite_sends_data_to_socket = function(cg) local cnt = metrics.counter('test_cnt', 'test-cnt') cnt:inc(1) + graphite.init({port = port}) end, {port}) - require('fiber').sleep(0.5) + sock:readable(2) + local graphite_obs = sock:recvfrom(50) local obs_table = graphite_obs:split(' ') t.assert_equals(obs_table[1], 'tarantool.test_cnt') @@ -163,32 +160,92 @@ g.test_graphite_sends_data_to_socket = function(cg) 
sock:close() end -g.test_graphite_kills_previous_fibers_on_init = function(cg) +g.test_graphite_stop_default_fibers = function(cg) cg.server:exec(function() local fiber = require('fiber') local fun = require('fun') local graphite = require('metrics.plugins.graphite') - local function mock_graphite_worker() - fiber.create(function() - fiber.name('metrics_graphite_worker') - fiber.sleep(math.huge) - end) + local function count_workers() + return fun.iter(fiber.info()): + filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end): + length() end + t.assert_equals(count_workers(), 0) + + graphite.init({}) + t.assert_equals(count_workers(), 1) + + graphite.stop() + require('fiber').yield() + t.assert_equals(count_workers(), 0) + end) +end + +g.test_graphite_stop_custom_fiber = function(cg) + cg.server:exec(function() + local fiber = require('fiber') + local fun = require('fun') + local graphite = require('metrics.plugins.graphite') + local function count_workers() return fun.iter(fiber.info()): - filter(function(_, x) return x.name == 'metrics_graphite_worker' end): + filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end): length() end + local opts = { + prefix = "master", + host = "127.0.0.1", + port = 3333, + send_interval = 1, + } + + local opts2 = { + prefix = "tarantool", + host = "127.0.0.1", + port = 4444, + send_interval = 1, + } + t.assert_equals(count_workers(), 0) - mock_graphite_worker() - mock_graphite_worker() + + graphite.init(opts) + graphite.init(opts2) + t.assert_equals(count_workers(), 2) + graphite.stop() + require('fiber').yield() + t.assert_equals(count_workers(), 0) + end) +end + +g.test_graphite_double_start = function(cg) + cg.server:exec(function() + local fiber = require('fiber') + local fun = require('fun') + local graphite = require('metrics.plugins.graphite') + + local function count_workers() + return fun.iter(fiber.info()): + filter(function(_, x) return string.find(x.name, 
'metrics_graphite_worker') end): + length() + end + + t.assert_equals(count_workers(), 0) + + graphite.init({}) + + t.assert_equals(count_workers(), 1) + graphite.init({}) - fiber.yield() -- let cancelled fibers disappear from fiber.info() + t.assert_equals(count_workers(), 1) + + graphite.stop() + require('fiber').yield() + t.assert_equals(count_workers(), 0) end) end