Advertisement
Guest User

Untitled

a guest
Jul 22nd, 2016
95
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 16.12 KB | None | 0 0
  1. #!/usr/bin/perl
  2. use strict;
  3. use IO::Socket::INET;
  4. use DBI;
  5. use POSIX;
  6. use feature qw { switch };
  7. use threads;
  8. use threads::shared;
  9. use Thread;
  10. use Thread::Queue;
  11. use Thread::State;
  12. use Thread::Running;
  13. use Data::Dumper;
  14. use Net::Subnet;
  15.  
  16.  
  17. my $log_level = 12;
  18. my ($result,$request,$dbi,@v5thread,@v5queue,@v9thread,@v9queue,@thread,@template,@data_queue,$ases);
  19. my %deviceid;
  20. my @j:shared;
  21. my ($interval) = 300;#time interval
  22. #--------- Connect to DB --------------
  23. my ($dbname) = 'flow';
  24. my ($dbhost) = '127.0.0.1';
  25. #my ($dbhost) = '0';
  26. my ($dbusr) = 'netflow';
  27. my ($dbpass) = 'netflow';
  28. if ($dbhost == '0') {
  29. flog (10,0,"Connect to DB via socket...\n");
  30. $dbi = "DBI:mysql:$dbname";
  31. } else {
  32. flog (10,0,"Connect to DB via network...\n");
  33. $dbi = "DBI:mysql:$dbname;host=$dbhost";
  34. }
  35. my ($db) = DBI->connect("$dbi", "$dbusr", "$dbpass") || die "Could not connect to database: $DBI::errstr";
  36. #init thread-writer to sql
  37. my $ip4queue:shared = Thread::Queue->new;
  38. my $ip4thread=threads->new(\&datatosql,$dbname,$dbhost,$dbusr,$dbpass,$dbi);
  39.  
  40. #--------------- Read AS data from DB
  41. $request = $db->prepare('select * from as_names');;
  42. $request->execute;
  43. $ases=$request->fetchall_hashref('id');
  44. $request->finish;
  45.  
  46. my $asnet_request = $db->prepare('select as_net from as_data where as_id = ?');
  47. for my $as_id (keys %$ases) {
  48. $asnet_request->execute($as_id);
  49. while (my $as_data = $asnet_request->fetchrow_array) {
  50. push (@{$ases->{$as_id}->{nets}}, $as_data);
  51. # $asnet_request->fetchall_array;
  52. }
  53. $ases->{$as_id}->{matcher} = subnet_matcher(@{$ases->{$as_id}->{nets}});
  54. }
  55. $asnet_request->finish;
  56. #--------- Init Socket ----------------
  57. my ($socket,$received_data,$peer_address,$peer_port, $version);
  58. $socket = new IO::Socket::INET (
  59. LocalPort => '9801',
  60. Proto => 'udp',
  61. ) or die "ERROR in Socket Creation : $!\n";
  62. #--------- Reading Socket -------------
  63. while(1) {
  64. my ($recieved_data);
  65. while(!$recieved_data) {
  66. $socket->recv($recieved_data,0xFFFF);
  67. $peer_address = unpack("N", $socket->peeraddr());
  68. }
  69. $version = unpack("n", substr($recieved_data,0,2));
  70. given ( $version ) {
  71. when ( 5 ) { fv5($recieved_data,$peer_address); }
  72. when ( 9 ) { fv9($recieved_data,$peer_address); }
  73. }
  74. }
  75. #Joining threads
  76. foreach my $thr(threads->list) {
  77. $thr->join;
  78. }
  79. #Closing Socket
  80. $socket->close();
  81. $db->disconnect();
  82.  
  83. sub ip2as {
  84. my $ip = shift;
  85. my $as_found = 0;
  86. for my $as_id (sort keys %$ases) {
  87. my $m = $ases->{$as_id}->{matcher};
  88. $as_found = $as_id if ($m->($ip));
  89. }
  90. #print "as: $as_found\n";
  91. return $as_found;
  92.  
  93. }
  94.  
  95. sub fv5 {
  96. my ($data,$src)=@_;
  97. flog (10,0,"got v5 from ". dec2ip($src) ." - ".length($data)." bytes\n");
  98. my $devid = getdevid($src);
  99. if (exists($v5thread[$devid]) and $v5thread[$devid]->is_running) {
  100. flog(10,0,"Thread #$devid exists; Sending data...\n");
  101. $v5queue[$devid]->enqueue("$data");
  102. } else {
  103. flog(10,0, "Thread #$devid not exists. Starting up and sending data...\n");
  104. $v5queue[$devid]=Thread::Queue->new;
  105. $v5queue[$devid]->enqueue("$data");
  106. $v5thread[$devid]=threads->new(\&v5thread,$src,$devid);
  107. # print $data;
  108. }
  109. }
  110.  
  111. sub fv9 {
  112. my ($data,$src)=@_;
  113. flog (10,0,"\ngot v9 from ". dec2ip($src) ." - ".length($data)." bytes\n");
  114. my $devid = getdevid($src);
  115. if (exists($v9thread[$devid]) and $v9thread[$devid]->is_running) {
  116. print "Thread #$devid exists; Sending data...\n";
  117. $v9queue[$devid]->enqueue("$data");
  118. } else {
  119. print "Thread #$devid not exists. Starting up and sending data...\n";
  120. $v9queue[$devid]=Thread::Queue->new;
  121. $v9queue[$devid]->enqueue("$data");
  122. $v9thread[$devid]=threads->new(\&v9thread,$src,$devid);
  123. }
  124. }
  125.  
  126. #--------- IP converters --------------
  127. #---- (thanks to Patrick H. Piper) ----
  128. sub dec2ip ($) {
  129. join '.', unpack 'C4', pack 'N', shift;
  130. }
  131. sub ip2dec ($) {
  132. unpack N => pack CCCC => split /\./ => shift;
  133. }
  134.  
  135. sub flog {
  136. my (%log_msg) = ('1' => '',
  137. '2' => '',
  138. '3' => '',
  139. '4' => '',
  140. '5' => 'Init threads:',
  141. '6' => 'done'
  142. );
  143. if (($_[0] >= $log_level) && ($_[1] != 0)) {
  144. print $log_msg{$_[1]},"\n";
  145. } elsif (($_[0] >= $log_level) && ($_[1] == 0)) {
  146. print $_[2];
  147. }
  148. };
  149.  
  150. sub getdevid {
  151. my ($src) = @_;
  152. if (exists($deviceid{"$src"})) {
  153. flog(10,0,"There are device in memory. ");
  154. } else {
  155. print "There are no device with IP ". dec2ip($src) ." in memory. Searching in database... ";
  156. $request = $db->prepare('SELECT `device_id` FROM `devices` WHERE `device_header`='.$src);
  157. $request->execute;
  158. if ($result=$request->fetchrow_hashref) {
  159. print "- device found in DB. ";
  160. } else {
  161. print "- device not found. Adding new device... New ";
  162. $db->do("INSERT INTO devices (`device_header`) VALUES($src)");
  163. $request = $db->prepare('SELECT `device_id` FROM `devices` WHERE `device_header`='.$src);
  164. $request->execute;
  165. $result=$request->fetchrow_hashref;
  166. }
  167. $deviceid{"$src"}=$result->{device_id};
  168. }
  169. # print "Id=".$deviceid{"$src"}."\n";
  170. return $deviceid{"$src"};
  171. }
  172.  
  173. sub v5thread {
  174. my ($src,$devid)=@_;
  175. my (@header, @substr, $data, $i);
  176. print "Started thread v5 for device ".dec2ip($src)." with ID$devid\n";
  177. while (1) {
  178. if ($v5queue[$devid]->pending) {
  179. flog (10,0,"Thread #$devid got a data...\n");
  180. $data = $v5queue[$devid]->dequeue;
  181. @header = unpack("nnN4NNNHHH2", substr($data,0,24));
  182. my @records;
  183. for ($i=0; substr($data, 24+48*$i, 48); $i++) {
  184. @substr = unpack("N3n2N4n2C4n2C2n", substr($data, 24+48*$i, 48));
  185. push(@records, join(',', @substr));
  186. # print join(',', @substr)."\n";
  187. }
  188. flog (10,0,"Sending data for processing...\n");
  189. my @values = ip4process($devid,$header[3],$header[2],@records);
  190. # print "INSERT INTO `ip4temp` VALUES\n".join (",\n", @values)."\n";
  191. $ip4queue->enqueue(@values);
  192. } else {
  193. sleep 1;
  194. }
  195. }
  196. print "thread stopped\n";
  197. }
  198.  
  199. sub v9thread {
  200. my ($src,$devid)=@_;
  201. my ($i,$j, $count, $template, @template_format, @template_header, @template_length, @records);
  202. print "\nStarted thread v9 for device ".dec2ip($src)." with ID$devid\n";
  203. while (1) {
  204. if ($v9queue[$devid]->pending) {
  205. my ($data) = $v9queue[$devid]->dequeue;
  206. # open (MYFILE, '>data.txt');
  207. # print MYFILE $data;
  208. # close (MYFILE);
  209.  
  210. my ($version, $count, $system_uptime, $unix_seconds, $package_sequence, $source_id) = unpack("n2N4", substr($data,0,20));
  211. flog (10,0,"\nThread #$devid got a data - ".length($data)." bytes\n");
  212. my ($marker) = 20;
  213. for ($i=0; $i < $count; $i++) {
  214. my ($flowset_id, $length) = unpack("n2", substr($data,$marker,4));
  215. if ($flowset_id == 0) {
  216. # Template processing
  217. $length += $marker;
  218. $marker += 4;
  219. while ($marker < $length) {
  220. my ($template_id, $field_count) = unpack("n2",substr($data,$marker,$marker+4));
  221. $marker += 4;
  222. $template = substr ($data, $marker, $field_count * 4);
  223. my (@n,@l);
  224. for ($j=0; $j<$field_count;$j++) {
  225. push @n, unpack("n", substr($template, $j*4,2));
  226. push @l, (unpack("n", substr($template, $j*4+2,2))*2);
  227. }
  228. $template_format[$template_id] = "H".join("H",@l);
  229. $template_header[$template_id] = join(",",@n);
  230. $template_length[$template_id] = 0;
  231. $template_length[$template_id] += $_/2 foreach @l;
  232. print "[$template_id] $template_header[$template_id]\n$template_format[$template_id] ($template_length[$template_id])\n";
  233. $marker += $field_count*4;
  234. }
  235. } else {
  236. my ($template_id) = $flowset_id;
  237. if ($template_length[$template_id]>0) {
  238. # print "[$template_id] $template_header[$template_id]\n$template_format[$template_id] ($template_length[$template_id])\n";
  239. ($j, @records) = fv9_records(substr($data, $marker, $length), $length, $template_format[$template_id],$template_length[$template_id]);
  240. $i += $j;
  241. $marker += $template_length[$template_id]*$j+4;
  242. print join ("\n", @records),"\n";
  243. discover ($template_header[$template_id], $devid, $unix_seconds, $system_uptime, @records);
  244. }
  245. }
  246. }
  247. flog (10,0, "$marker bytes in ".($i-1)." FlowSets processed\n");
  248. } else {
  249. sleep 1;
  250. }
  251. }
  252. }
  253.  
  254. sub discover {
  255. my ($template, $devid,$datetime,$sysuptime, @records)=@_;
  256. my (@val,@result, $i);
  257. print "\nTrying to discover template ($template)\n";
  258. my @fields = split(",", $template);
  259. my %fieldtype = (
  260. '22' => 'start',
  261. '21' => 'end',
  262. '8' => 'srcaddr',
  263. '12' => 'dstaddr',
  264. '15' => 'nexthop',
  265. '10' => 'input',
  266. '14' => 'output',
  267. '2' => 'dpkts',
  268. '1' => 'doctets',
  269. '7' => 'srcport',
  270. '11' => 'dstport',
  271. '6' => 'tcp_flags',
  272. '4' => 'prot',
  273. '5' => 'tos',
  274. '16' => 'src_as',
  275. '17' => 'dst_as',
  276. '9' => 'src_mask',
  277. '13' => 'dst_mask'
  278. );
  279. my %ipv4 = (
  280. '8' => 0,
  281. '12' => 1,
  282. '15' => 2,
  283. '10' => 3,
  284. '14' => 4,
  285. '2' => 5,
  286. '1' => 6,
  287. '22' => 7,
  288. '21' => 8,
  289. '7' => 9,
  290. '11' => 10,
  291. # 'pad' => 11,
  292. '6' => 12,
  293. '4' => 13,
  294. '5' => 14,
  295. '16' => 15,
  296. '17' => 16,
  297. '9' => 17,
  298. '13' => 18,
  299. # 'pad2' => 19
  300. '61' => 20
  301. );
  302. foreach (@fields) {
  303. if (exists $ipv4{"$_"}) {
  304. push @val, $ipv4{"$_"};
  305. } else {push @val, -1};
  306. }
  307. print "formed up values:" . join (",", @val) . "\n";
  308. foreach (@records) {
  309. my @str = split (",", $_);
  310. my @str1 = (0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0);
  311. for ($i=0;$i < @str; $i++) {
  312. if ("$val[$i]" >= 0) {
  313. $str1[$val[$i]]=hex($str[$i]);
  314. }
  315. }
  316. push @result, join(",", @str1);
  317. }
  318. print join ("\n", @result)."\n";
  319. my @values = ip4process($devid,$datetime,$sysuptime,@result);
  320. $ip4queue->enqueue(@values);
  321. }
  322.  
  323. sub fv9_records {
  324. my ($data, $length, $template_format, $template_length) = @_;
  325. my $marker = 4;
  326. my ($count, @substr, @records);
  327. while ($marker+$template_length <= $length) {
  328. my $substr = substr ($data, $marker, $template_length);
  329. @substr = unpack ($template_format, $substr);
  330. $marker += $template_length;
  331. $count ++;
  332. push @records, join (",", @substr);
  333. # print join (",", @substr), " ($marker $count)\n";
  334. }
  335. return $count, @records;
  336. };
  337.  
  338. sub ip4process {
  339. my ($devid,$datetime,$sysuptime,@data) = @_;
  340. my ($start, $end, $istart, $iend, $timerange, $deltapkts, $timerange, $deltaoctets, $pkts ,$octets, $i, $j, $result, $ipkts, @values);
  341. foreach my $row (@data) {
  342. my @result = split(",",$row);
  343. $start = $datetime-ceil(($sysuptime - $result[7])/1000);
  344. $end = $datetime-ceil(($sysuptime - $result[8])/1000);
  345. $istart = floor($start/$interval)*$interval;
  346. $iend = floor($end/$interval)*$interval;
  347. $timerange = $end-$start;
  348. # Fill src_as, dst_as from IP addresses:
  349. $result[15] = ip2as($result[0]);
  350. $result[16] = ip2as($result[1]);
  351. # print "r: $row \n";
  352.  
  353.  
  354. if ($istart == $iend) {
  355. push (@values,"(0,'".
  356. $devid."','".
  357. $iend."','".
  358. join("','", @result[0..6,9..10,12..18])."')");
  359. } elsif (($istart<$iend) and (floor($result[6]/$timerange)>0)) {
  360. $deltapkts = $result[5] / $timerange;
  361. $deltaoctets = $result[6] / $result[5];
  362. $pkts = floor(($istart+$interval-$start)*($deltapkts));
  363. $octets = $pkts * floor($deltaoctets);
  364. if ($pkts > 0) {
  365. push (@values, "(0,'".
  366. $devid."','".
  367. $istart."','".
  368. join("','", @result[0..4])."','".
  369. $pkts."','".
  370. $octets."','".
  371. join("','", @result[9..10,12..18])."')");
  372. }
  373. my ($intervals) = floor(($iend-$istart)/$interval);
  374. for ($i=1; $i<$intervals; $i++) {
  375. $ipkts += $deltapkts*$interval;
  376. if (floor($ipkts)>0) {
  377. $octets = floor($deltaoctets)*floor($ipkts);
  378. push (@values,"(0,'".
  379. $devid."','".
  380. ($istart+$interval*$i)."','".
  381. join("','", @result[0..4])."','".
  382. floor($ipkts)."','".
  383. $octets."','".
  384. join("','", @result[9..10,12..18])."')");
  385. $pkts += floor($ipkts);
  386. $ipkts -= floor($ipkts);
  387. }
  388. }
  389. $octets = $result[6]-floor($deltaoctets)*$pkts;
  390. $pkts = $result[5] - $pkts;
  391. if ($pkts>0) {
  392. push (@values, "(0,'".
  393. $devid."','".
  394. $iend."','".
  395. join("','", @result[0..4])."','".
  396. $pkts."','".
  397. $octets."','".
  398. join("','", @result[9..10,12..18])."')");
  399. }
  400. } else { print "bad record\n"; }
  401. }
  402. # print "INSERT INTO `ip4temp` VALUES\n".join (",\n", @values)."\n";
  403. # print "v: " . join (", \n", @values) . "\n";
  404. return (@values);
  405. }
  406.  
  407. sub datatosql {
  408. my @data;
  409. my ($dbname,$dbhost,$dbusr,$dbpass,$dbi)=@_;
  410. my ($db) = DBI->connect("$dbi", "$dbusr", "$dbpass") || die "Could not connect to database: $DBI::errstr";
  411. while (1) {
  412. if ($ip4queue->pending) {
  413. push (@data, $ip4queue->dequeue);
  414. if (scalar(@data) >= 512) {
  415. flog(10,0,"Achieved ".scalar(@data)." records. Writing into database.\n");
  416. # print "INSERT INTO `ip4temp` VALUES\n".join (",\n", @data)."\n";
  417. $request = $db->prepare('SELECT COUNT(*) FROM `ip4temp`');
  418. $request->execute;
  419. if ($request->fetchrow >= 20000) {
  420. $db->do("INSERT INTO `ip4graph` (`device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`dpkts`,`doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask`)
  421. SELECT `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,SUM(`dpkts`) AS `dpkts`,SUM(`doctets`) AS `doctets`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`,`src_mask`,`dst_mask`
  422. FROM `ip4temp`
  423. GROUP BY `device_id`,`dtime`,`srcaddr`,`dstaddr`,`nexthop`,`input`,`output`,`srcport`,`dstport`,`tcp_flags`,`prot`,`tos`,`src_as`,`dst_as`");
  424. $db->do("TRUNCATE `ip4temp`");
  425. }
  426. # print "in the table ".$request->fetchrow." rows\n";
  427. $request->finish;
  428. $db->do ("INSERT INTO `ip4temp` VALUES\n".join (",\n", @data)."\n");
  429. @data = ();
  430. }
  431. } else { sleep 5 }
  432. }
  433. $db->disconnect();
  434. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement