Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `graphite`: ability to send metrics to the multiple servers.
Comment thread
oleg-jukovec marked this conversation as resolved.
Backward compatibility with previous plugin version is preserved.
From now on `init` method assigns an unique name to the created fiber
using incoming graphite server `opts` (if passed). Added new `stop()`
method to stop all fibers started by the plugin.

### Changed

### Fixed
Expand Down
90 changes: 67 additions & 23 deletions metrics/plugins/graphite.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ local fiber = require('fiber')
local metrics = require('metrics')
local checks = require('checks')
local log = require('log')
local fun = require('fun')

local graphite = {}

Expand All @@ -16,6 +15,29 @@ local DEFAULT_SEND_INTERVAL = 2
-- Constants
local LABELS_SEP = ';'

local GRAPHITE_FIBERS = {}

local function create_fiber_table(opts)
Comment thread
bigbes marked this conversation as resolved.
local needLongName = (opts ~= nil)
opts = opts or {}
local graphite_fiber = {}

graphite_fiber.sock = nil

graphite_fiber.prefix = opts.prefix or DEFAULT_PREFIX
graphite_fiber.host = opts.host or DEFAULT_HOST
graphite_fiber.port = opts.port or DEFAULT_PORT
graphite_fiber.send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL

graphite_fiber.name = "metrics_graphite_worker"
if needLongName then
graphite_fiber.name = graphite_fiber.name .. '_' ..
graphite_fiber.prefix .. '_' .. graphite_fiber.host .. '_' .. graphite_fiber.port
end

return graphite_fiber
end

function graphite.format_observation(prefix, obs)
local metric_path = #prefix > 0 and ('%s.%s'):format(prefix, obs.metric_name) or obs.metric_name

Expand All @@ -36,26 +58,47 @@ function graphite.format_observation(prefix, obs)
return graph
end

local function graphite_worker(opts)
fiber.name('metrics_graphite_worker')
local function graphite_worker(args)
fiber.name(args.name)

while true do
metrics.invoke_callbacks()
for _, c in pairs(metrics.collectors()) do
for _, obs in ipairs(c:collect()) do
local data = graphite.format_observation(opts.prefix, obs)
local numbytes = opts.sock:sendto(opts.host, opts.port, data)
local data = graphite.format_observation(args.prefix, obs)
local numbytes = args.sock:sendto(args.host, args.port, data)
if numbytes == nil then
log.error('Error while sending to host %s port %s data %s',
opts.host, opts.port, data)
args.host, args.port, data)
end
end
end

fiber.sleep(opts.send_interval)
fiber.sleep(args.send_interval)
end
end

local function start_fiber(input)
input.sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
assert(input.sock ~= nil, 'Socket creation failed')

input.fiber = fiber.create(graphite_worker, {
name = input.name,
prefix = input.prefix,
sock = input.sock,
host = input.host,
port = input.port,
send_interval = input.send_interval,
})

return input
end

local function stop_fiber(input)
pcall(input.sock.close, input.sock)
pcall(fiber.kill, input.fiber)
end

function graphite.init(opts)
checks {
prefix = '?string',
Expand All @@ -64,25 +107,26 @@ function graphite.init(opts)
send_interval = '?number'
}

local sock = socket('AF_INET', 'SOCK_DGRAM', 'udp')
assert(sock ~= nil, 'Socket creation failed')
local graphite_fiber = create_fiber_table(opts)

local prefix = opts.prefix or DEFAULT_PREFIX
local host = opts.host or DEFAULT_HOST
local port = opts.port or DEFAULT_PORT
local send_interval = opts.send_interval or DEFAULT_SEND_INTERVAL
-- require('config'):reload() triggers only validate() and apply()
-- role's methods without stop().
-- so, we should kill previous fiber if exist.
if GRAPHITE_FIBERS[graphite_fiber.name] then
stop_fiber(GRAPHITE_FIBERS[graphite_fiber.name])
GRAPHITE_FIBERS[graphite_fiber.name] = nil
fiber.yield()
end

fun.iter(fiber.info()):
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
each(function(x) fiber.kill(x) end)
GRAPHITE_FIBERS[graphite_fiber.name] = start_fiber(graphite_fiber)
end

fiber.create(graphite_worker, {
prefix = prefix,
sock = sock,
host = host,
port = port,
send_interval = send_interval,
})
function graphite.stop()
Comment thread
bigbes marked this conversation as resolved.
for _, v in pairs(GRAPHITE_FIBERS) do
stop_fiber(v)
end
Comment thread
olegrok marked this conversation as resolved.

GRAPHITE_FIBERS = {}
end

return graphite
95 changes: 76 additions & 19 deletions test/plugins/graphite_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,10 @@ end)

g.after_each(function(cg)
cg.server:exec(function()
local fiber = require('fiber')
local fun = require('fun')
local metrics = require('metrics')
-- Delete all collectors and global labels
metrics.clear()
fun.iter(fiber.info()):
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
each(function(x) fiber.kill(x) end)
fiber.yield() -- let cancelled fibers disappear from fiber.info()
require('metrics').clear()
-- Stop all graphite fibers
require('metrics.plugins.graphite').stop()
end)
end)

Expand Down Expand Up @@ -152,43 +147,105 @@ g.test_graphite_sends_data_to_socket = function(cg)

local cnt = metrics.counter('test_cnt', 'test-cnt')
cnt:inc(1)

graphite.init({port = port})
end, {port})

require('fiber').sleep(0.5)
sock:readable(2)

local graphite_obs = sock:recvfrom(50)
local obs_table = graphite_obs:split(' ')
t.assert_equals(obs_table[1], 'tarantool.test_cnt')
t.assert_equals(obs_table[2], '1')
sock:close()
end

g.test_graphite_kills_previous_fibers_on_init = function(cg)
g.test_graphite_stop_default_fibers = function(cg)
cg.server:exec(function()
local fiber = require('fiber')
local fun = require('fun')
local graphite = require('metrics.plugins.graphite')

local function mock_graphite_worker()
fiber.create(function()
fiber.name('metrics_graphite_worker')
fiber.sleep(math.huge)
end)
local function count_workers()
return fun.iter(fiber.info()):
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
length()
end

t.assert_equals(count_workers(), 0)

graphite.init({})
t.assert_equals(count_workers(), 1)

graphite.stop()
require('fiber').yield()
t.assert_equals(count_workers(), 0)
end)
end

g.test_graphite_stop_custom_fiber = function(cg)
cg.server:exec(function()
local fiber = require('fiber')
local fun = require('fun')
local graphite = require('metrics.plugins.graphite')

local function count_workers()
return fun.iter(fiber.info()):
filter(function(_, x) return x.name == 'metrics_graphite_worker' end):
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
length()
end

local opts = {
prefix = "master",
host = "127.0.0.1",
port = 3333,
send_interval = 1,
}

local opts2 = {
prefix = "tarantool",
host = "127.0.0.1",
port = 4444,
send_interval = 1,
}

t.assert_equals(count_workers(), 0)
mock_graphite_worker()
mock_graphite_worker()

graphite.init(opts)
graphite.init(opts2)

t.assert_equals(count_workers(), 2)

graphite.stop()
require('fiber').yield()
t.assert_equals(count_workers(), 0)
end)
end

g.test_graphite_double_start = function(cg)
cg.server:exec(function()
local fiber = require('fiber')
local fun = require('fun')
local graphite = require('metrics.plugins.graphite')

local function count_workers()
return fun.iter(fiber.info()):
filter(function(_, x) return string.find(x.name, 'metrics_graphite_worker') end):
length()
end

t.assert_equals(count_workers(), 0)

graphite.init({})

t.assert_equals(count_workers(), 1)

graphite.init({})
fiber.yield() -- let cancelled fibers disappear from fiber.info()

t.assert_equals(count_workers(), 1)

graphite.stop()
require('fiber').yield()
t.assert_equals(count_workers(), 0)
end)
end
Loading