Advertisement
Guest User

Untitled

a guest
Dec 8th, 2016
73
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.67 KB | None | 0 0
  1. /*
  2. * Flush stats to InfluxDB (http://influxdb.org/)
  3. *
  4. * To enable this backend, include 'statsd-influxdb-backend' in the backends
  5. * configuration array:
  6. *
  7. * backends: ['statsd-influxdb-backend']
  8. *
  9. * The backend will read the configuration options from the following
  10. * 'influxdb' hash defined in the main statsd config file:
  11. *
  12. * influxdb: {
  13. * host: '127.0.0.1', // InfluxDB host. (default 127.0.0.1)
  14. * port: 8086, // InfluxDB port. (default 8086)
  15. * ssl: false, // InfluxDB is hosted over SSL. (default false)
  16. * database: 'dbname', // InfluxDB database instance. (required)
  17. * username: 'user', // InfluxDB database username.
  18. * password: 'pass', // InfluxDB database password.
  19. * flush: {
  20. * enable: true // Enable regular flush strategy. (default true)
  21. * },
  22. * proxy: {
  23. * enable: false, // Enable the proxy strategy. (default false)
  24. * suffix: 'raw', // Metric name suffix. (default 'raw')
  25. * flushInterval: 1000 // Flush interval for the internal buffer.
  26. * // (default 1000)
  27. * },
  28. * includeStatsdMetrics: false, // Send internal statsd metrics to InfluxDB. (default false)
  29. * includeInfluxdbMetrics: false // Send internal backend metrics to InfluxDB. (default false)
  30. * // Requires includeStatsdMetrics to be enabled.
  31. * }
  32. *
  33. */
  34. var util = require('util'),
  35. querystring = require('querystring'),
  36. http = require('http'),
  37. https = require('https');
  38.  
  39. function InfluxdbBackend(startupTime, config, events) {
  40. var self = this;
  41.  
  42. self.debug = config.debug;
  43. self.registry = {};
  44. self.influxdbStats = {};
  45.  
  46. self.defaultHost = '127.0.0.1';
  47. self.defaultPort = 8086;
  48. self.defaultVersion = 0.8;
  49. self.defaultFlushEnable = true;
  50. self.defaultProxyEnable = false;
  51. self.defaultProxySuffix = 'raw';
  52. self.defaultProxyFlushInterval = 1000;
  53.  
  54. self.host = self.defaultHost;
  55. self.port = self.defaultPort;
  56. self.version = self.defaultVersion;
  57. self.protocol = http;
  58. self.flushEnable = self.defaultFlushEnable;
  59. self.proxyEnable = self.defaultProxyEnable;
  60. self.proxySuffix = self.defaultProxySuffix;
  61. self.proxyFlushInterval = self.defaultProxyFlushInterval;
  62. self.includeStatsdMetrics = false;
  63. self.includeInfluxdbMetrics = false;
  64.  
  65. /* XXX Hardcoding default prefix here because it is not accessible otherwise. */
  66. self.prefixStats = config.prefixStats !== undefined ? config.prefixStats : 'statsd';
  67.  
  68. if (config.influxdb) {
  69. self.host = config.influxdb.host || self.defaultHost;
  70. self.port = config.influxdb.port || self.defaultPort;
  71. self.version = config.influxdb.version || self.defaultVersion;
  72. self.user = config.influxdb.username;
  73. self.pass = config.influxdb.password;
  74. self.database = config.influxdb.database;
  75. self.includeStatsdMetrics = config.influxdb.includeStatsdMetrics;
  76. self.includeInfluxdbMetrics = config.influxdb.includeInfluxdbMetrics;
  77.  
  78. if (config.influxdb.ssl) {
  79. self.protocol = https;
  80. }
  81.  
  82. if (config.influxdb.flush) {
  83. self.flushEnable = config.influxdb.flush.enable;
  84. }
  85.  
  86. if (config.influxdb.proxy) {
  87. self.proxyEnable = config.influxdb.proxy.enable || self.defaultProxyEnable;
  88. self.proxySuffix = config.influxdb.proxy.suffix || self.defaultProxySuffix;
  89. self.proxyFlushInterval = config.influxdb.proxy.flushInterval || self.defaultProxyFlushInterval;
  90. }
  91. }
  92.  
  93. if (self.version >= 0.9) {
  94. self.assembleEvent = self.assembleEvent_v09;
  95. self.httpPOST = self.httpPOST_v09;
  96. } else {
  97. self.assembleEvent = self.assembleEvent_v08;
  98. self.httpPOST = self.httpPOST_v08;
  99. }
  100.  
  101. if (self.proxyEnable) {
  102. self.log('Starting the buffer flush interval. (every ' + self.proxyFlushInterval + 'ms)');
  103. setInterval(function () {
  104. self.flushQueue();
  105. }, self.proxyFlushInterval);
  106.  
  107. events.on('packet', function (packet, rinfo) {
  108. try {
  109. self.processPacket(packet, rinfo);
  110. } catch (e) {
  111. self.log(e);
  112. }
  113. });
  114. }
  115.  
  116. if (self.flushEnable) {
  117. events.on('flush', function (timestamp, metrics) {
  118. try {
  119. self.processFlush(timestamp, metrics);
  120. } catch (e) {
  121. self.log(e);
  122. }
  123. });
  124. }
  125.  
  126. events.on('status', function (writeCb) {
  127. for (var stat in self.influxdbStats) {
  128. writeCb(null, 'influxdb', stat, self.influxdbStats[stat]);
  129. }
  130. });
  131.  
  132. return true;
  133. }
  134.  
  135. function millisecondsSince(start) {
  136. diff = process.hrtime(start);
  137. return diff[0] * 1000 + diff[1] / 1000000;
  138. }
  139.  
  140. InfluxdbBackend.prototype.log = function (msg) {
  141. util.log('[influxdb] ' + msg);
  142. }
  143.  
  144. InfluxdbBackend.prototype.logDebug = function (msg) {
  145. if (this.debug) {
  146. var string;
  147.  
  148. if (msg instanceof Function) {
  149. string = msg();
  150. } else {
  151. string = msg;
  152. }
  153.  
  154. util.log('[influxdb] (DEBUG) ' + string);
  155. }
  156. }
  157.  
  158. /**
  159. * Flush strategy handler
  160. *
  161. * @param {Number} timestamp
  162. * @param {Object} stats metric
  163. */
  164. InfluxdbBackend.prototype.processFlush = function (timestamp, metrics) {
  165. var self = this,
  166. counters = metrics.counters,
  167. gauges = metrics.gauges,
  168. timerData = metrics.timer_data,
  169. statsdMetrics = metrics.statsd_metrics,
  170. points = [],
  171. sets = function (vals) {
  172. var ret = {};
  173. for (var val in vals) {
  174. ret[val] = vals[val].values();
  175. }
  176. return ret;
  177. }(metrics.sets),
  178. startTime = process.hrtime(),
  179. key, timerKey,
  180. statsPrefixRegexp = new RegExp('^' + self.prefixStats + '\\.');
  181.  
  182. /* Convert timestamp from seconds to milliseconds. */
  183. timestamp = (timestamp * 1000);
  184.  
  185. for (key in counters) {
  186. /* Do not include statsd counters. */
  187. if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; }
  188.  
  189. var value = counters[key],
  190. k = key + '.counter';
  191.  
  192. if (value) {
  193. points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
  194. }
  195. }
  196.  
  197. for (set in sets) {
  198. sets[set].map(function (v) {
  199. points.push(self.assembleEvent(set, [{value: v, time: timestamp}]));
  200. })
  201. points.push(self.assembleEvent(set + "_count", [{value: sets[set].length, time: timestamp}]));
  202. }
  203.  
  204. for (key in gauges) {
  205. /* Do not include statsd gauges. */
  206. if (!self.includeStatsdMetrics && key.match(statsPrefixRegexp)) { continue; }
  207.  
  208. var value = gauges[key],
  209. k = key + '.gauge';
  210.  
  211. if (!isNaN(parseFloat(value)) && isFinite(value)) {
  212. points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
  213. }
  214. }
  215.  
  216. for (key in timerData) {
  217. var timerMetrics = timerData[key];
  218.  
  219. // Try to add histogram data, if it is there:
  220. if (timerMetrics.histogram) {
  221. var histoMetrics = timerMetrics.histogram
  222. , histoKey;
  223.  
  224. for (histoKey in histoMetrics) {
  225. var value = histoMetrics[histoKey],
  226. k = key + '.timer.histogram.' + histoKey;
  227.  
  228. points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
  229. }
  230.  
  231. // Delete here so it isn't iterated over later:
  232. delete timerMetrics.histogram;
  233. }
  234.  
  235. // Iterate over normal metrics:
  236. for (timerKey in timerMetrics) {
  237. var value = timerMetrics[timerKey],
  238. k = key + '.timer' + '.' + timerKey;
  239.  
  240. points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
  241. }
  242. }
  243.  
  244. if (self.includeStatsdMetrics) {
  245. // Include backend metrics for the previous flush
  246. if (self.includeInfluxdbMetrics) {
  247. statsdMetrics['influxdbStats.flush_time'] = self.influxdbStats.flushTime;
  248. statsdMetrics['influxdbStats.http_response_time'] = self.influxdbStats.httpResponseTime;
  249. statsdMetrics['influxdbStats.payload_size'] = self.influxdbStats.payloadSize;
  250. statsdMetrics['influxdbStats.num_stats'] = self.influxdbStats.numStats;
  251. }
  252.  
  253. for (key in statsdMetrics) {
  254. var value = statsdMetrics[key],
  255. k = self.prefixStats + '.' + key;
  256.  
  257. if (!isNaN(parseFloat(value)) && isFinite(value)) {
  258. points.push(self.assembleEvent(k, [{value: value, time: timestamp}]));
  259. }
  260. }
  261. }
  262.  
  263. self.httpPOST(points);
  264. self.influxdbStats.flushTime = millisecondsSince(startTime);
  265. }
  266.  
  267. InfluxdbBackend.prototype.processPacket = function (packet, rinfo) {
  268. var self = this,
  269. ts = (new Date()).valueOf();
  270.  
  271. /* Stolen from statsd's stats.js. */
  272. var packet_data = packet.toString(),
  273. metrics;
  274.  
  275. if (packet_data.indexOf("\n") > -1) {
  276. metrics = packet_data.split("\n");
  277. } else {
  278. metrics = [packet_data];
  279. }
  280.  
  281. for (var midx in metrics) {
  282. if (metrics[midx].length === 0) {
  283. continue;
  284. }
  285. var bits = metrics[midx].toString().split(':');
  286. var key = bits.shift()
  287. .replace(/\s+/g, '_')
  288. .replace(/\//g, '-')
  289. .replace(/[^a-zA-Z_\-0-9\.]/g, '');
  290.  
  291. if (bits.length === 0) {
  292. bits.push("1");
  293. }
  294.  
  295. for (var i = 0; i < bits.length; i++) {
  296. var fields = bits[i].split("|");
  297.  
  298. if (fields[1] === undefined) {
  299. self.log('Bad line: ' + fields + ' in msg "' + metrics[midx] +'"');
  300. continue;
  301. }
  302.  
  303. var metric_type = fields[1].trim();
  304.  
  305. /* Timer */
  306. if (metric_type === "ms") {
  307. self.enqueue('timer', ts, key, Number(fields[0] || 0));
  308. /* Gauge */
  309. } else if (metric_type === "g") {
  310. if (fields[0].match(/^[-+]/)) {
  311. self.logDebug('Sending gauges with +/- is not supported yet.');
  312. } else {
  313. self.enqueue('gauge', ts, key, Number(fields[0] || 0));
  314. }
  315. /* Set */
  316. } else if (metric_type === "s") {
  317. self.logDebug('Sets not supported yet.');
  318. /* Counter */
  319. } else {
  320. /* XXX Handle sampling. */
  321. self.enqueue('counter', ts, key, Number(fields[0] || 1));
  322. }
  323. }
  324. }
  325. }
  326.  
  327. InfluxdbBackend.prototype.enqueue = function (type, ts, key, value) {
  328. var self = this;
  329.  
  330. key = key + '.' + type + '.' + self.proxySuffix;
  331.  
  332. if (!self.registry[key]) {
  333. self.registry[key] = [];
  334. }
  335.  
  336. self.registry[key].push({value: value, time: ts});
  337. }
  338.  
  339. InfluxdbBackend.prototype.flushQueue = function () {
  340. var self = this,
  341. registry = self.clearRegistry(),
  342. points = [];
  343.  
  344. for (var key in registry) {
  345. var payload = self.assembleEvent(key, registry[key]);
  346.  
  347. self.logDebug(function () {
  348. return 'Flush ' + registry[key].length + ' values for ' + key;
  349. });
  350.  
  351. points.push(payload);
  352. }
  353.  
  354. self.httpPOST(points);
  355.  
  356. self.logDebug('Queue flushed');
  357. }
  358.  
  359.  
  360. InfluxdbBackend.prototype.clearRegistry = function () {
  361. var self = this,
  362. registry = self.registry;
  363.  
  364. self.registry = {};
  365.  
  366. return registry;
  367. }
  368.  
  369. InfluxdbBackend.prototype.assembleEvent_v08 = function (name, events) {
  370. var self = this;
  371.  
  372. var payload = {
  373. name: name,
  374. columns: Object.keys(events[0]),
  375. points: []
  376. };
  377.  
  378. for (var idx in events) {
  379. var event = events[idx],
  380. points = [];
  381.  
  382. for (var cidx in payload.columns) {
  383. var column = payload.columns[cidx];
  384.  
  385. points.push(event[column]);
  386. }
  387.  
  388. payload.points.push(points);
  389. }
  390.  
  391. return payload;
  392. }
  393.  
  394. InfluxdbBackend.prototype.assembleEvent_v09 = function (name, events) {
  395. var self = this;
  396.  
  397. var payload = {
  398. measurement: name,
  399. fields: { value: events[0]['value'] }
  400. }
  401.  
  402. return payload;
  403. }
  404.  
  405. InfluxdbBackend.prototype.httpPOST_v08 = function (points) {
  406. /* Do not send if there are no points. */
  407. if (!points.length) { return; }
  408.  
  409. var self = this,
  410. query = {u: self.user, p: self.pass, time_precision: 'ms'},
  411. protocolName = self.protocol == http ? 'HTTP' : 'HTTPS',
  412. startTime;
  413.  
  414. self.logDebug(function () {
  415. return 'Sending ' + points.length + ' different points via ' + protocolName;
  416. });
  417.  
  418. self.influxdbStats.numStats = points.length;
  419.  
  420. var options = {
  421. hostname: self.host,
  422. port: self.port,
  423. path: '/write?db=' + self.database,
  424. method: 'POST',
  425. agent: false // Is it okay to use "undefined" here? (keep-alive)
  426. };
  427.  
  428. var req = self.protocol.request(options);
  429.  
  430. req.on('socket', function (res) {
  431. startTime = process.hrtime();
  432. });
  433.  
  434. req.on('response', function (res) {
  435. var status = res.statusCode;
  436.  
  437. self.influxdbStats.httpResponseTime = millisecondsSince(startTime);
  438.  
  439. if (status !== 200) {
  440. self.log(protocolName + ' Error: ' + status);
  441. }
  442. });
  443.  
  444. req.on('error', function (e, i) {
  445. self.log(e);
  446. });
  447.  
  448. var payload = JSON.stringify(points)
  449. self.influxdbStats.payloadSize = Buffer.byteLength(payload);
  450.  
  451. self.logDebug(function () {
  452. var size = (self.influxdbStats.payloadSize / 1024).toFixed(2);
  453. return 'Payload size ' + size + ' KB';
  454. });
  455.  
  456. req.write(payload);
  457. req.end();
  458. }
  459.  
  460. InfluxdbBackend.prototype.httpPOST_v09 = function (points) {
  461. /* Do not send if there are no points. */
  462. if (!points.length) { return; }
  463.  
  464. var self = this,
  465. query = {u: self.user, p: self.pass},
  466. protocolName = self.protocol == http ? 'HTTP' : 'HTTPS',
  467. startTime;
  468.  
  469. self.logDebug(function () {
  470. return 'Sending ' + points.length + ' different points via ' + protocolName;
  471. });
  472.  
  473. self.influxdbStats.numStats = points.length;
  474.  
  475. var options = {
  476. hostname: self.host,
  477. port: self.port,
  478. path: '/write?' + querystring.stringify(query),
  479. method: 'POST',
  480. agent: false // Is it okay to use "undefined" here? (keep-alive)
  481. };
  482.  
  483. var req = self.protocol.request(options);
  484.  
  485. req.on('socket', function (res) {
  486. startTime = process.hrtime();
  487. });
  488.  
  489. req.on('response', function (res) {
  490. var status = res.statusCode;
  491.  
  492. self.influxdbStats.httpResponseTime = millisecondsSince(startTime);
  493.  
  494. if (status >= 400) {
  495. self.log(protocolName + ' Error: ' + status);
  496. }
  497. });
  498.  
  499. req.on('error', function (e, i) {
  500. self.log(e);
  501. });
  502.  
  503. var payload = JSON.stringify({
  504. database: self.database,
  505. points: points
  506. });
  507.  
  508. self.influxdbStats.payloadSize = Buffer.byteLength(payload);
  509.  
  510. self.logDebug(function () {
  511. var size = (self.influxdbStats.payloadSize / 1024).toFixed(2);
  512. return 'Payload size ' + size + ' KB';
  513. });
  514.  
  515. req.write(payload);
  516. req.end();
  517. }
  518.  
  519. InfluxdbBackend.prototype.configCheck = function () {
  520. var self = this,
  521. success = true;
  522.  
  523. /* Make sure the database name is configured. */
  524. if (!self.database) {
  525. self.log('Missing config option: database');
  526. success = false;
  527. }
  528.  
  529. return success;
  530. }
  531.  
  532. exports.init = function (startupTime, config, events) {
  533. var influxdb = new InfluxdbBackend(startupTime, config, events);
  534.  
  535. return influxdb.configCheck();
  536. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement