Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * This version supports only compiled code and works with the streams API
- */
- const debug = require('debug')('NetFlowV9');
- const dgram = require('dgram');
- const clone = require('clone');
- const util = require('util');
- const eventEmitter = require('events');
- const Dequeue = require('dequeue');
- const nft = require('./js/nf9/nftypes');
- const nf1PktDecode = require('./js/nf1/nf1decode');
- const nf5PktDecode = require('./js/nf5/nf5decode');
- const nf7PktDecode = require('./js/nf7/nf7decode');
- const nf9PktDecode = require('./js/nf9/nf9decode');
/**
 * Parse a proxy destination written as "host" or "host:port".
 *
 * @param {string} spec - Destination string.
 * @returns {{host: string, port: number}|null} Parsed destination (port
 *   defaults to 5555 when omitted), or null when `spec` cannot be parsed.
 */
function parseProxyTarget(spec) {
  // The host part must not end in ':' so that "host:" is rejected rather than
  // being accepted as a host literally named "host:".
  const m = /^(.*?[^:])(?::(\d+))?$/.exec(spec);
  if (!m) return null;
  return { host: m[1], port: m[2] ? parseInt(m[2], 10) : 5555 };
}

/**
 * UDP NetFlow collector (versions 1, 5, 7 and 9).
 *
 * Incoming datagrams are queued and decoded outside of the socket 'message'
 * handler. Decoded packets that contain flows are handed to `cb` (or emitted
 * as 'data'); packets that contain only templates are handed to `templateCb`
 * (or emitted as 'template').
 */
class NetFlowV9 extends eventEmitter {
  /**
   * @param {Function|Object} [options] - Either the data callback itself, or
   *   an options object. Recognised keys: `cb`, `templateCb`, `ipv4num`,
   *   `nfTypes`, `nfScope`, `socketType`, `port`, `host`, `templates`, `fwd`
   *   and `proxy` (a "host[:port]" string, or an object/array of such
   *   strings). When `port` is given the collector starts listening at once.
   */
  constructor(options) {
    // Object options are forwarded to EventEmitter, as the original code did
    // through a second, redundant eventEmitter.call(this, options).
    const isObject = options !== null && typeof options === 'object';
    super(isObject ? options : undefined);

    this.templates = {};
    this.nfTypes = clone(nft.nfTypes);
    this.nfScope = clone(nft.nfScope);
    this.cb = null;
    this.templateCb = null;
    this.socketType = 'udp4';
    this.port = null;
    this.proxy = null;
    this.fifo = new Dequeue();
    // Queue state must be defined before the first datagram arrives:
    // `set` means "no drain is currently scheduled".
    this.closed = false;
    this.set = true;

    // Tolerate a missing options argument.
    const source = options == null ? {} : options;
    if (typeof options === 'function') {
      this.cb = options;
    } else if (typeof source.cb === 'function') {
      this.cb = source.cb;
    }
    if (typeof source.templateCb === 'function') this.templateCb = source.templateCb;

    if (isObject) {
      // NOTE(review): decIpv4Rule is not defined or imported anywhere in the
      // visible file; unless it is a global this line throws a ReferenceError
      // when `ipv4num` is set. Confirm where the rule table lives.
      if (options.ipv4num) decIpv4Rule[4] = "o['$name']=buf.readUInt32BE($pos);";
      if (options.nfTypes) this.nfTypes = Object.assign(this.nfTypes, options.nfTypes); // Inherit nfTypes
      if (options.nfScope) this.nfScope = Object.assign(this.nfScope, options.nfScope); // Inherit nfScope
      if (options.socketType) this.socketType = options.socketType;
      if (options.port) this.port = options.port;
      if (options.templates) this.templates = options.templates;
      if (options.fwd) this.fwd = options.fwd;

      if (typeof options.proxy === 'object' || typeof options.proxy === 'string') {
        const specs = typeof options.proxy === 'string'
          ? [options.proxy]
          : Object.keys(options.proxy || {}).map((k) => options.proxy[k]);
        this.proxy = [];
        specs.forEach((spec) => {
          if (typeof spec !== 'string') return;
          debug('Defining proxy destination %s', spec);
          const target = parseProxyTarget(spec);
          if (target) {
            this.proxy.push(target);
            debug('Proxy added %s:%s', target.host, target.port);
          }
        });
        if (this.proxy.length === 0) this.proxy = null;
      }
    }

    this.server = dgram.createSocket(this.socketType);
    this.server.on('message', (msg, rinfo) => {
      this.fifo.push([msg, rinfo]);
      if (!this.closed && this.set) {
        this.set = false;
        // Arrow wrapper keeps `this` bound; passing this.fetch directly
        // would invoke it with `this === undefined`.
        setImmediate(() => this.fetch());
      }
      if (this.proxy) { // Resend the traffic
        this.proxy.forEach((p) => {
          this.server.send(msg, 0, msg.length, p.port, p.host, () => {});
        });
      }
    });
    this.server.on('close', () => {
      this.closed = true;
    });

    if (this.port) {
      this.listen(options.port, options.host);
    }
  }

  /**
   * Drain the receive queue: decode every queued datagram and dispatch the
   * result to the callbacks / events. Undersized or undecodable packets are
   * skipped without stopping the drain.
   */
  fetch() {
    try {
      while (this.fifo.length > 0 && !this.closed) {
        const entry = this.fifo.shift();
        let msg = entry[0];
        let rinfo = entry[1];
        const startTime = Date.now();

        if (this.fwd) {
          // Forwarded mode: the datagram is a JSON envelope carrying the
          // original buffer and rinfo. It comes from the network, so a
          // malformed envelope is dropped instead of crashing the process.
          try {
            const envelope = JSON.parse(msg.toString());
            msg = Buffer.from(envelope.buffer);
            rinfo = envelope.rinfo;
          } catch (err) {
            debug('Dropping malformed forwarded packet: %s', err.message);
            continue;
          }
        }

        // Skip (not return): returning here used to leave `set` false and
        // stall the queue forever.
        if (!rinfo || rinfo.size < 20) continue;

        const o = this.nfPktDecode(msg, rinfo);
        const timeMs = Date.now() - startTime;
        if (!o) {
          debug('Undecoded flows', o);
          continue;
        }

        const flowCount = o.flows ? o.flows.length : 0;
        debug('Flows length => ' + flowCount);

        if (flowCount > 0) {
          o.rinfo = rinfo;
          o.packet = msg;
          o.decodeMs = timeMs;
          if (this.cb) this.cb(o);
          else this.emit('data', o);
        } else if (o.templates) {
          // Packet carries only templates, no flows.
          o.rinfo = rinfo;
          o.packet = msg;
          o.decodeMs = timeMs;
          if (this.templateCb) this.templateCb(o);
          else this.emit('template', o);
        } else {
          debug('Undecoded flows', o);
        }
      }
    } finally {
      // Always re-arm scheduling, even if a decoder or callback threw.
      this.set = true;
    }
  }

  /**
   * Bind the UDP socket (after a 50 ms delay).
   *
   * @param {number} port - UDP port to bind.
   * @param {string|Function} [host] - Address to bind, or the bind callback.
   * @param {Function} [cb] - Callback passed to `socket.bind`.
   */
  listen(port, host, cb) {
    this.fetch();
    setTimeout(() => {
      if (typeof host === 'function') {
        this.server.bind(port, host);
      } else if (host && typeof host === 'string') {
        if (cb) this.server.bind(port, host, cb);
        else this.server.bind(port, host);
      } else if (!host && cb) {
        this.server.bind(port, cb);
      } else {
        this.server.bind(port);
      }
    }, 50);
  }

  /**
   * Register the first template found under the exporter key
   * "<address>:<port>" of `template` into `this.templates`.
   *
   * @param {Object} template - Map of exporter key to { templateId: template }.
   * @param {{address: string, port: number}} rinfo - Exporter address info.
   */
  addTemplate(template, rinfo) {
    const id = rinfo.address + ':' + rinfo.port;
    const perExporter = template && template[id];
    if (!perExporter) {
      debug('No template found for exporter %s', id);
      return;
    }
    const tId = Object.keys(perExporter)[0];
    if (tId === undefined) return;
    this.templates[tId] = perExporter[tId];
    debug('Templates updated', this.templates);
  }

  /**
   * Decode one NetFlow datagram according to the version in its header.
   *
   * @param {Buffer} msg - Raw datagram.
   * @param {Object} rinfo - Remote address info of the sender.
   * @returns {Object|undefined} Decoder output, or undefined when the packet
   *   is too short to hold a version field or the version is unsupported.
   */
  nfPktDecode(msg, rinfo) {
    if (!msg || msg.length < 2) {
      debug('packet too short to contain a version field');
      return undefined;
    }
    const version = msg.readUInt16BE(0);
    debug('entering into nfPktDecode with version =>' + version);
    switch (version) {
      case 1:
        return nf1PktDecode(msg, rinfo);
      case 5:
        return nf5PktDecode(msg, rinfo);
      case 7:
        return nf7PktDecode(msg, rinfo);
      case 9:
        return nf9PktDecode(msg, rinfo, this.templates);
      default:
        debug('bad header version %d', version);
        return undefined;
    }
  }
}
- module.exports = NetFlowV9;
Advertisement
Add Comment
Please, Sign In to add comment