Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/perl
- use strict;
- use IO::Socket::INET;
- use DBI;
- use POSIX;
- use feature qw { switch };
- use threads;
- use threads::shared;
- use Thread;
- use Thread::Queue;
- use Thread::State;
- use Thread::Running;
- use Data::Dumper;
- use Net::Subnet;
# Verbosity threshold read by flog(): a message is printed only when the
# level passed to flog() is >= this value.
my $log_level = 12;
my ($result,$request,$dbi,@v5thread,@v5queue,@v9thread,@v9queue,@thread,@template,@data_queue,$ases);
my %deviceid;            # cache filled by getdevid(): exporter address => device_id
my @j:shared;
my ($interval) = 300;    # time interval (seconds) used by ip4process() to bucket flows

#--------- Connect to DB --------------
my ($dbname) = 'flow';
my ($dbhost) = '127.0.0.1';
#my ($dbhost) = '0';
my ($dbusr)  = 'netflow';
my ($dbpass) = 'netflow';

# The string '0' is the sentinel for "connect through the local socket".
# It must be compared with eq: a numeric == numifies the host first, so any
# non-numeric hostname (e.g. 'db.example.com') would evaluate to 0 and
# silently pick the socket DSN.
if ($dbhost eq '0') {
    flog(10, 0, "Connect to DB via socket...\n");
    $dbi = "DBI:mysql:$dbname";
}
else {
    flog(10, 0, "Connect to DB via network...\n");
    $dbi = "DBI:mysql:$dbname;host=$dbhost";
}

# Scalar assignment on purpose: with "my ($db) = ... or die" the list
# assignment would count its right-hand elements and never trigger the die.
my $db = DBI->connect($dbi, $dbusr, $dbpass)
    or die "Could not connect to database: $DBI::errstr";
#init thread-writer to sql
# $ip4queue carries ready-made SQL value tuples from the per-device threads
# to the single writer thread running datatosql().
my $ip4queue :shared = Thread::Queue->new;
my $ip4thread = threads->new(\&datatosql, $dbname, $dbhost, $dbusr, $dbpass, $dbi);

#--------------- Read AS data from DB
# $ases becomes { id => { <as_names columns>, nets => [...], matcher => CODE } }.
$request = $db->prepare('select * from as_names');
$request->execute;
$ases = $request->fetchall_hashref('id');
$request->finish;

my $asnet_request = $db->prepare('select as_net from as_data where as_id = ?');
for my $as_id (keys %$ases) {
    $asnet_request->execute($as_id);
    my @nets;
    # Fetch in list context: a scalar-context fetchrow_array used as the loop
    # condition would end the loop at the first false value, not at the last row.
    while (my ($as_net) = $asnet_request->fetchrow_array) {
        next unless defined $as_net && length $as_net;    # skip NULL/empty nets
        push @nets, $as_net;
    }
    $ases->{$as_id}->{nets}    = \@nets;
    $ases->{$as_id}->{matcher} = subnet_matcher(@nets);
}
$asnet_request->finish;
#--------- Init Socket ----------------
my ($socket,$received_data,$peer_address,$peer_port, $version);
$socket = IO::Socket::INET->new(
    LocalPort => '9801',
    Proto     => 'udp',
) or die "ERROR in Socket Creation : $!\n";

#--------- Reading Socket -------------
# Receive datagrams forever and dispatch them on the 16-bit version field
# that starts every NetFlow packet.
while (1) {
    my $datagram;

    # recv() returns undef on failure; retry instead of using a stale buffer.
    defined $socket->recv($datagram, 0xFFFF) or next;

    # The version field needs two bytes; anything shorter cannot be NetFlow.
    next unless defined $datagram && length($datagram) >= 2;

    my $packed_peer = $socket->peeraddr();
    next unless defined $packed_peer;
    $peer_address = unpack("N", $packed_peer);

    $version = unpack("n", substr($datagram, 0, 2));
    if ($version == 5) {
        fv5($datagram, $peer_address);
    }
    elsif ($version == 9) {
        fv9($datagram, $peer_address);
    }
    # Other versions are ignored.
}

# NOTE: the loop above has no exit condition, so the cleanup below is not
# reached as the code stands; it is kept for when a stop condition is added.
#Joining threads
foreach my $thr (threads->list) {
    $thr->join;
}
#Closing Socket
$socket->close();
$db->disconnect();
# Resolve an IP address to an AS id.
# Every matcher stored in %$ases is tried, in string-sorted id order, and the
# last id whose matcher accepts the address is returned; 0 means no match.
sub ip2as {
    my ($addr) = @_;
    my $winner = 0;
    foreach my $id (sort keys %$ases) {
        next unless $ases->{$id}->{matcher}->($addr);
        $winner = $id;
    }
    return $winner;
}
# Hand one v5 datagram to the worker thread of the device that sent it.
#   arg 1 - raw datagram
#   arg 2 - sender address as an integer (as produced by unpack "N")
# When no running worker exists for the device, a fresh queue is created,
# the datagram is queued, and a new v5thread is started.
sub fv5 {
    my ($packet, $exporter) = @_;
    my $from = dec2ip($exporter);
    my $size = length($packet);
    flog(10, 0, "got v5 from $from - $size bytes\n");

    my $id = getdevid($exporter);
    my $worker_alive = exists($v5thread[$id]) && $v5thread[$id]->is_running;

    if (!$worker_alive) {
        flog(10, 0, "Thread #$id not exists. Starting up and sending data...\n");
        $v5queue[$id] = Thread::Queue->new;
        $v5queue[$id]->enqueue("$packet");
        $v5thread[$id] = threads->new(\&v5thread, $exporter, $id);
    }
    else {
        flog(10, 0, "Thread #$id exists; Sending data...\n");
        $v5queue[$id]->enqueue("$packet");
    }
}
# Hand one v9 datagram to the worker thread of the device that sent it.
#   arg 1 - raw datagram
#   arg 2 - sender address as an integer (as produced by unpack "N")
# When no running worker exists for the device, a fresh queue is created,
# the datagram is queued, and a new v9thread is started.
sub fv9 {
    my ($packet, $exporter) = @_;
    my $from = dec2ip($exporter);
    my $size = length($packet);
    flog(10, 0, "\ngot v9 from $from - $size bytes\n");

    my $id = getdevid($exporter);
    my $worker_alive = exists($v9thread[$id]) && $v9thread[$id]->is_running;

    if (!$worker_alive) {
        print "Thread #$id not exists. Starting up and sending data...\n";
        $v9queue[$id] = Thread::Queue->new;
        $v9queue[$id]->enqueue("$packet");
        $v9thread[$id] = threads->new(\&v9thread, $exporter, $id);
    }
    else {
        print "Thread #$id exists; Sending data...\n";
        $v9queue[$id]->enqueue("$packet");
    }
}
- #--------- IP converters --------------
- #---- (thanks to Patrick H. Piper) ----
# Convert a 32-bit integer into its dotted-quad string, most significant
# byte first (e.g. 3232235777 -> "192.168.1.1").
sub dec2ip ($) {
    my ($number) = @_;
    my @octets = unpack('C4', pack('N', $number));
    return join('.', @octets);
}
# Convert a dotted-quad string into the corresponding 32-bit integer
# (e.g. "10.0.0.1" -> 167772161).
sub ip2dec ($) {
    my ($dotted) = @_;
    my @octets = split(/\./, $dotted);
    return unpack('N', pack('CCCC', @octets));
}
# Conditional logger: flog(LEVEL, CODE, TEXT).
# Nothing is printed unless LEVEL >= $log_level. When CODE is non-zero the
# canned message stored under CODE is printed followed by a newline;
# when CODE is 0, TEXT is printed verbatim.
sub flog {
    my ($level, $code, $text) = @_;
    my %canned = (
        '1' => '',
        '2' => '',
        '3' => '',
        '4' => '',
        '5' => 'Init threads:',
        '6' => 'done',
    );
    if ($level >= $log_level) {
        if ($code != 0) {
            print $canned{$code}, "\n";
        }
        else {
            print $text;
        }
    }
}
# Return the device_id for an exporter address, registering the device in the
# `devices` table when it is not there yet.
#   $src - exporter address as an integer; used as the `device_header` value
# Returns the device_id, or undef when the database lookup/insert failed.
# Successful results are cached in %deviceid; failures are not cached, so the
# next packet from the same exporter retries the lookup.
sub getdevid {
    my ($src) = @_;

    if (exists $deviceid{"$src"}) {
        flog(10, 0, "There are device in memory. ");
        return $deviceid{"$src"};
    }

    print "There are no device with IP " . dec2ip($src) . " in memory. Searching in database... ";

    # Placeholders instead of string concatenation: the value is quoted by the
    # driver and the same prepared statement serves both lookups below.
    my $lookup = $db->prepare('SELECT `device_id` FROM `devices` WHERE `device_header` = ?');
    if (!$lookup) {
        warn "getdevid: cannot prepare device lookup: " . $db->errstr . "\n";
        return;
    }
    my $fetch_id = sub {
        $lookup->execute($src) or return;
        my ($id) = $lookup->fetchrow_array;
        $lookup->finish;    # only the first row is needed; release the cursor
        return $id;
    };

    my $device_id = $fetch_id->();
    if (defined $device_id) {
        print "- device found in DB. ";
    }
    else {
        print "- device not found. Adding new device... New ";
        $db->do('INSERT INTO devices (`device_header`) VALUES (?)', undef, $src);
        $device_id = $fetch_id->();
    }

    if (!defined $device_id) {
        warn "getdevid: no device_id available for " . dec2ip($src) . "\n";
        return;
    }

    $deviceid{"$src"} = $device_id;
    return $device_id;
}
# Worker thread for one NetFlow v5 exporter.
#   $src   - exporter address as an integer (only used for the start message)
#   $devid - device id; also the index of this worker's queue in @v5queue
# Each datagram taken from the queue is split into its 48-byte flow records,
# every record is turned into a comma-separated field list, and the list is
# passed to ip4process(); the resulting SQL tuples go to $ip4queue.
sub v5thread {
    my ($src, $devid) = @_;
    my $header_size = 24;
    my $record_size = 48;

    print "Started thread v5 for device ".dec2ip($src)." with ID$devid\n";

    my $queue = $v5queue[$devid];

    # dequeue() blocks until a datagram arrives, so no polling/sleep is
    # needed; it returns undef only after the queue has been ended.
    while (defined(my $data = $queue->dequeue)) {
        flog(10, 0, "Thread #$devid got a data...\n");

        # Too short to even hold the v5 header: nothing to decode.
        next if length($data) < $header_size;

        # version, record count, SysUptime (ms), unix_secs; the remaining
        # header fields are not used here.
        my ($version, $count, $sysuptime, $unix_secs) = unpack("n2N2", $data);

        # Decode whole records only; a truncated trailing record would
        # otherwise be unpacked into a short, misaligned row.
        my $whole_records = int((length($data) - $header_size) / $record_size);

        my @records;
        for my $n (0 .. $whole_records - 1) {
            my @fields = unpack("N3n2N4n2C4n2C2n",
                                substr($data, $header_size + $record_size * $n, $record_size));
            push @records, join(',', @fields);
        }

        flog(10, 0, "Sending data for processing...\n");
        my @values = ip4process($devid, $unix_secs, $sysuptime, @records);
        $ip4queue->enqueue(@values);
    }
    print "thread stopped\n";
}
# Worker thread for one NetFlow v9 exporter.
#   $src   - exporter address as an integer (only used for the start message)
#   $devid - device id; also the index of this worker's queue in @v9queue
# Every datagram is walked FlowSet by FlowSet:
#   * FlowSet id 0 carries templates; each template is remembered as an
#     unpack format, a comma-separated list of field type numbers and a
#     record length in bytes.
#   * Any other FlowSet id is looked up as a template id; when a template is
#     known, its records are decoded by fv9_records() and handed to discover().
# Templates are kept for the lifetime of the thread.
sub v9thread {
    my ($src, $devid) = @_;
    my (@template_format, @template_header, @template_length);

    print "\nStarted thread v9 for device ".dec2ip($src)." with ID$devid\n";

    my $queue = $v9queue[$devid];

    # dequeue() blocks until a datagram arrives; undef means the queue ended.
    while (defined(my $data = $queue->dequeue)) {
        my $packet_length = length($data);
        flog(10, 0, "\nThread #$devid got a data - ".$packet_length." bytes\n");
        next if $packet_length < 20;    # shorter than the v9 packet header

        my ($version, $count, $system_uptime, $unix_seconds, $package_sequence, $source_id)
            = unpack("n2N4", substr($data, 0, 20));

        my $marker   = 20;    # offset of the FlowSet being examined
        my $flowsets = 0;

        # Walk by packet length rather than by the header's record count:
        # the count is in records, not FlowSets, so it cannot bound this loop.
        FLOWSET:
        while ($marker + 4 <= $packet_length) {
            my ($flowset_id, $length) = unpack("n2", substr($data, $marker, 4));

            # The length covers the 4-byte FlowSet header itself; a smaller
            # value or one running past the packet means corrupt data.
            last FLOWSET if $length < 4 || $marker + $length > $packet_length;
            my $end = $marker + $length;
            $flowsets++;

            if ($flowset_id == 0) {
                # Template processing
                my $pos = $marker + 4;
                while ($pos + 4 <= $end) {
                    my ($template_id, $field_count) = unpack("n2", substr($data, $pos, 4));
                    $pos += 4;
                    # Zero fields is treated as padding; a field list that
                    # would run past the FlowSet is truncated.
                    last if $field_count == 0 || $pos + $field_count * 4 > $end;

                    my (@types, @hex_digits);
                    my $record_bytes = 0;
                    for my $f (0 .. $field_count - 1) {
                        my ($type, $bytes) = unpack("n2", substr($data, $pos + 4 * $f, 4));
                        push @types, $type;
                        push @hex_digits, $bytes * 2;    # "H" counts hex digits
                        $record_bytes += $bytes;
                    }
                    $template_format[$template_id] = "H".join("H", @hex_digits);
                    $template_header[$template_id] = join(",", @types);
                    $template_length[$template_id] = $record_bytes;
                    print "[$template_id] $template_header[$template_id]\n$template_format[$template_id] ($template_length[$template_id])\n";
                    $pos += $field_count * 4;
                }
            }
            elsif (($template_length[$flowset_id] // 0) > 0) {
                # Data FlowSet for a template we have already seen.
                my (undef, @records) = fv9_records(
                    substr($data, $marker, $length), $length,
                    $template_format[$flowset_id], $template_length[$flowset_id]);
                print join ("\n", @records),"\n";
                discover($template_header[$flowset_id], $devid, $unix_seconds, $system_uptime, @records);
            }
            # FlowSets with an unknown template id are skipped.

            # Always advance by the declared FlowSet length: it includes any
            # padding after the last record, and it also moves past FlowSets
            # that were skipped above.
            $marker = $end;
        }
        flog(10, 0, "$marker bytes in $flowsets FlowSets processed\n");
    }
}
# Re-order decoded v9 records into the fixed column layout that ip4process()
# reads, then queue the resulting SQL tuples on $ip4queue.
#   $template  - comma-separated field type numbers, in record order
#   $devid     - device id, passed through to ip4process()
#   $datetime  - passed through to ip4process()
#   $sysuptime - passed through to ip4process()
#   @records   - comma-separated hex strings, one string per flow record
sub discover {
    my ($template, $devid, $datetime, $sysuptime, @records) = @_;

    print "\nTrying to discover template ($template)\n";

    # Field type number => column name. Kept for reference only; the code
    # below does not consult this table.
    my %fieldtype = (
        '22' => 'start',
        '21' => 'end',
        '8'  => 'srcaddr',
        '12' => 'dstaddr',
        '15' => 'nexthop',
        '10' => 'input',
        '14' => 'output',
        '2'  => 'dpkts',
        '1'  => 'doctets',
        '7'  => 'srcport',
        '11' => 'dstport',
        '6'  => 'tcp_flags',
        '4'  => 'prot',
        '5'  => 'tos',
        '16' => 'src_as',
        '17' => 'dst_as',
        '9'  => 'src_mask',
        '13' => 'dst_mask'
    );

    # Field type number => position in the output row.
    my %ipv4 = (
        '8'  => 0,
        '12' => 1,
        '15' => 2,
        '10' => 3,
        '14' => 4,
        '2'  => 5,
        '1'  => 6,
        '22' => 7,
        '21' => 8,
        '7'  => 9,
        '11' => 10,
        # 'pad' => 11,
        '6'  => 12,
        '4'  => 13,
        '5'  => 14,
        '16' => 15,
        '17' => 16,
        '9'  => 17,
        '13' => 18,
        # 'pad2' => 19
        '61' => 20
    );

    # One target position per template field; -1 marks a field to drop.
    my @val = map { exists $ipv4{"$_"} ? $ipv4{"$_"} : -1 } split(",", $template);
    print "formed up values:" . join (",", @val) . "\n";

    my @result;
    foreach my $record (@records) {
        my @str  = split(",", $record);
        my @str1 = (0) x 20;
        foreach my $i (0 .. $#str) {
            next unless "$val[$i]" >= 0;
            $str1[$val[$i]] = hex($str[$i]);
        }
        push @result, join(",", @str1);
    }
    print join ("\n", @result)."\n";

    my @values = ip4process($devid, $datetime, $sysuptime, @result);
    $ip4queue->enqueue(@values);
}
# Decode the fixed-size records of one v9 data FlowSet.
#   $data            - the FlowSet, starting at its 4-byte header
#   $length          - declared FlowSet length in bytes (header included)
#   $template_format - unpack() template describing one record
#   $template_length - size of one record in bytes
# Returns (COUNT, @records): COUNT is the number of decoded records (0 when
# none fit) and each record is the comma-joined unpack() result.
# Bytes left after the last whole record (padding) are ignored.
sub fv9_records {
    my ($data, $length, $template_format, $template_length) = @_;
    my @records;

    # A non-positive record size would never advance the cursor below.
    return (0) unless defined $data && defined $template_length && $template_length > 0;

    # Never read past the bytes actually present, whatever the header claims.
    my $limit = length($data);
    $limit = $length if defined $length && $length < $limit;

    # Start at 4 to skip the FlowSet header (id + length).
    for (my $marker = 4; $marker + $template_length <= $limit; $marker += $template_length) {
        my @fields = unpack($template_format, substr($data, $marker, $template_length));
        push @records, join(",", @fields);
    }
    return scalar(@records), @records;
}
# Turn decoded flow rows into SQL value tuples, spreading each flow over the
# $interval-second buckets it spans.
#   $devid     - device id written into every tuple
#   $datetime  - export time in seconds
#   $sysuptime - device uptime in milliseconds at export time
#   @data      - rows as comma-separated fields; columns used here:
#                0..4 addresses/interfaces, 5 packets, 6 octets,
#                7 flow start, 8 flow end (both in uptime milliseconds),
#                9..10 and 12..18 copied through (15/16 are overwritten
#                with the AS ids looked up from columns 0 and 1)
# Returns a list of strings "(0,'devid','bucket', ...)", one per bucket.
# A flow inside a single bucket yields one tuple. A flow spanning several
# buckets has its packets distributed proportionally to time, with octets
# estimated as packets * floor(octets per packet); the last bucket receives
# whatever remains so the totals add up.
sub ip4process {
    my ($devid, $datetime, $sysuptime, @data) = @_;
    my @values;

    foreach my $row (@data) {
        my @result = split(",", $row);

        my $start  = $datetime - ceil(($sysuptime - $result[7]) / 1000);
        my $end    = $datetime - ceil(($sysuptime - $result[8]) / 1000);
        my $istart = floor($start / $interval) * $interval;
        my $iend   = floor($end / $interval) * $interval;
        my $timerange = $end - $start;

        # Fill src_as, dst_as from IP addresses:
        $result[15] = ip2as($result[0]);
        $result[16] = ip2as($result[1]);

        my @head = @result[0..4];
        my @tail = @result[9..10, 12..18];
        # Build one tuple for the given bucket and counters.
        my $tuple = sub {
            my ($bucket, $pkts, $octets) = @_;
            return "(0,'" . join("','", $devid, $bucket, @head, $pkts, $octets, @tail) . "')";
        };

        if ($istart == $iend) {
            push @values, $tuple->($iend, $result[5], $result[6]);
        }
        # $result[5] > 0 protects the octets-per-packet division below;
        # $istart < $iend implies $timerange > 0.
        elsif ($istart < $iend and $result[5] > 0 and floor($result[6] / $timerange) > 0) {
            my $deltapkts      = $result[5] / $timerange;            # packets per second
            my $octets_per_pkt = floor($result[6] / $result[5]);

            # First (partial) bucket.
            my $pkts = floor(($istart + $interval - $start) * $deltapkts);
            push @values, $tuple->($istart, $pkts, $pkts * $octets_per_pkt) if $pkts > 0;

            # Full buckets in between. The fractional-packet carry starts at
            # zero for every row so one flow cannot leak packets into the next.
            my $intervals = floor(($iend - $istart) / $interval);
            my $ipkts = 0;
            for my $i (1 .. $intervals - 1) {
                $ipkts += $deltapkts * $interval;
                my $whole = floor($ipkts);
                next unless $whole > 0;
                push @values, $tuple->($istart + $interval * $i, $whole, $octets_per_pkt * $whole);
                $pkts  += $whole;
                $ipkts -= $whole;
            }

            # Last bucket gets the remainder.
            my $last_octets = $result[6] - $octets_per_pkt * $pkts;
            my $last_pkts   = $result[5] - $pkts;
            push @values, $tuple->($iend, $last_pkts, $last_octets) if $last_pkts > 0;
        }
        else {
            print "bad record\n";
        }
    }
    return (@values);
}
# Writer thread: drains $ip4queue and stores the tuples in `ip4temp`.
#   $dbname, $dbhost - not used here (the DSN already contains them)
#   $dbusr, $dbpass  - database credentials
#   $dbi             - DBI data source name
# Tuples are collected until 512 are available and then written with a single
# multi-row INSERT. Before each write, if `ip4temp` holds 20000 rows or more,
# its contents are aggregated into `ip4graph` and the table is emptied.
# The thread opens its own database connection.
sub datatosql {
    my ($dbname, $dbhost, $dbusr, $dbpass, $dbi) = @_;
    my $db = DBI->connect($dbi, $dbusr, $dbpass)
        or die "Could not connect to database: $DBI::errstr";
    my @data;

    # dequeue() blocks until a tuple is available; undef means the queue ended.
    while (defined(my $row = $ip4queue->dequeue)) {
        push @data, $row;
        next if scalar(@data) < 512;

        flog(10, 0, "Achieved ".scalar(@data)." records. Writing into database.\n");

        my ($temp_rows) = $db->selectrow_array('SELECT COUNT(*) FROM `ip4temp`');
        if (defined $temp_rows && $temp_rows >= 20000) {
            # NOTE(review): `src_mask`/`dst_mask` are selected but neither
            # grouped nor aggregated; a server running with ONLY_FULL_GROUP_BY
            # would reject this statement - confirm the server's sql_mode.
            my $moved = $db->do("INSERT INTO `ip4graph` (`device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`dpkts`,`doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask`)
SELECT `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,SUM(`dpkts`) AS `dpkts`,SUM(`doctets`) AS `doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask`
FROM `ip4temp`
GROUP BY `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`");
            if (defined $moved) {
                $db->do("TRUNCATE `ip4temp`")
                    or warn "datatosql: TRUNCATE ip4temp failed: " . $db->errstr . "\n";
            }
            else {
                # Emptying the table now would discard rows that never
                # reached ip4graph, so keep them for the next attempt.
                warn "datatosql: aggregation into ip4graph failed, ip4temp kept: " . $db->errstr . "\n";
            }
        }

        $db->do("INSERT INTO `ip4temp` VALUES\n".join (",\n", @data)."\n")
            or warn "datatosql: insert of " . scalar(@data) . " rows into ip4temp failed: " . $db->errstr . "\n";
        @data = ();
    }
    $db->disconnect();
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement