From bc24c32b4823e168a4fac805e5e26500a4f4f576 Mon Sep 17 00:00:00 2001 From: Alexey Slynko Date: Wed, 17 Feb 2016 02:44:28 +0300 Subject: [PATCH] implement line protocol (https://docs.influxdata.com/influxdb/v0.10/write_protocols/write_syntax/) use obsoleted json protocol by default --- config/default.yaml | 1 + lib/influxdb.coffee | 50 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 10 deletions(-) diff --git a/config/default.yaml b/config/default.yaml index 3d250d8..3152e84 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -17,6 +17,7 @@ influxdb: username: 'root' password: 'root' use_udp: false + use_json: true retentionPolicy: 'default' # Acceptable version are: '0.8' and '0.9' version: '0.9' diff --git a/lib/influxdb.coffee b/lib/influxdb.coffee index 368bff5..a2e51cc 100644 --- a/lib/influxdb.coffee +++ b/lib/influxdb.coffee @@ -11,7 +11,7 @@ class Client @send = if useUDP then @sendUDP() else @sendHTTP() write: (metrics) -> - @send @metricsJson metrics + @send @metricsStringify metrics sendHTTP: -> version = @config.get('influxdb.version').get() ? '0.9' @@ -20,11 +20,14 @@ class Client database = @config.get('influxdb.database').get() ? 'bucky' username = @config.get('influxdb.username').get() ? 'root' password = @config.get('influxdb.password').get() ? 'root' + use_json = @config.get('influxdb.use_json').get() ? false logger = @logger if version == '0.8' endpoint = 'http://' + host + ':' + port + '/db/' + database + '/series' else endpoint = 'http://' + host + ':' + port + '/write' + if not use_json + endpoint += '?db=' + database + '&rp=' + (@config.get('influxdb.retentionPolicy').get() ? "default") client = request.defaults method: 'POST' url: endpoint @@ -46,43 +49,70 @@ class Client client.send message, 0, message.length, port, host - metricsJson: (metrics) -> + metricsStringify: (metrics) -> version = @config.get('influxdb.version').get() ? '0.9' + use_json = @config.get('influxdb.use_json').get() ? false if version == '0.8' data = [] - else + else if use_json data = database: @config.get('influxdb.database').get() ? 'bucky' retentionPolicy: @config.get('influxdb.retentionPolicy').get() ? "default" time: new Date().toISOString() points: [] + else + data = [] + hrTime = process.hrtime() + timestamp = new Date().getTime() * 1e6 for key, desc of metrics - [val, unit, sample] = @parseRow desc + [val, unit, sample] = @parseRow key,desc + + if not val? + continue if version == '0.8' data.push name: key, columns: ['value'], points: [[parseFloat val]] - else + else if use_json is true data.points.push measurement: key fields: value: parseFloat val unit: unit sample: sample - # @logger.log(JSON.stringify(data, null, 2)) - JSON.stringify data + else + str = (@escape key, false) + ' value=' + (parseFloat val) + if unit? + str += ',unit="' + unit + '"' + if sample? + str += ',sample="' + sample + '"' + str += ' ' + timestamp + data.push str + if version == '0.8' or use_json is true + #@logger.log(JSON.stringify(data, null, 2)) + JSON.stringify data + else + @logger.log(data.join("\n")) + data.join("\n") - parseRow: (row) -> + parseRow: (key, row) -> re = /([0-9\.]+)\|([a-z]+)(?:@([0-9\.]+))?/ groups = re.exec(row) unless groups - @logger.log "Unparsable row: #{ row }" - return + @logger.log "Unparsable row: #{ row } key: #{ key }" + return [] groups.slice(1, 4) + escape: (value, escapeEqualSign = true) -> + escaped = value.replace(/(\s+)/, '\\ ').replace(/,/, '\\,') + if escapeEqualSign + escaped = escaped.replace(/\=/, '\\=') + + escaped + module.exports = Client