Advertisement
Guest User

Untitled

a guest
Feb 3rd, 2015
221
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Perl 4.07 KB | None | 0 0
  1. # Copyright 2014-15 Daniel Parry
  2. #
  3. # Licensed under the Artistic License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. #     http://www.perlfoundation.org/artistic_license_2_0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  12. # implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15.  
  16. package Sharpener::Receive::Carbon;
  17. use strict;
  18. use warnings;
  19. use Mojo::IOLoop;
  20. use Mojo::UserAgent;
  21. use Sharpener::Log;
  22. use Sharpener::Receive::CarbonFilter;
  23. use Sharpener::Time;
  24.  
  25. our $VERSION = '0.42';
  26.  
  27. sub new {
  28.     my ( $class, $conf, $fork_manager, $state ) = @_;
  29.     my ( $io_socket_inet, $select );
  30.     my $carbon = {
  31.         'CarbonMax'   => $conf->get('carbon_max'),
  32.         'Config'      => $conf,
  33.         'Filter'      => Sharpener::Receive::CarbonFilter->new( $conf, $state ),
  34.         'ForkManager' => $fork_manager,
  35.         'IOSocket'    => $io_socket_inet,
  36.         'Select'      => $select,
  37.     };
  38.     bless $carbon, $class;
  39.     return $carbon;
  40. }
  41. sub _cleanup {
  42.     my ($self) = @_;
  43.     close $self->{'IOSocket'} or Sharpener::Log::error('carbon receive clean');
  44.     exit 0;
  45. }
  46.  
  47. sub _process {
  48.     my ($self) = @_;
  49.     foreach my $file_handle ( $self->{'Select'}->handles ) {
  50.         ( $file_handle == $self->{'IOSocket'} )
  51.           ? $self->_socket_create()
  52.           : $self->_socket_process($file_handle);
  53.     }
  54.     return 1;
  55. }
  56.  
  57. sub _update {
  58.     my ( $self, $conf, $tx ) = @_;
  59.     if ( $self->{'Filter'}->{'Updated'} ) {
  60.         $self->{'Filter'}->check_targets_path();
  61.         my $check_conf = $self->{'Config'}->get_carbon();
  62.         $check_conf->{'carbon_enable'} or last;
  63.         if (   ( $conf->{'carbon_conns'} ne $check_conf->{'carbon_conns'} )
  64.             or ( $conf->{'carbon_addr'} ne $check_conf->{'carbon_addr'} )
  65.             or ( $conf->{'carbon_port'} ne $check_conf->{'carbon_port'} ) )
  66.         {
  67.             Mojo::IOLoop->stop();
  68.             return 1;
  69.         }
  70.         $self->{'CarbonMax'}            = $check_conf->{'carbon_max'};
  71.         $self->{'Filter'}->{'Interval'} = $check_conf->{'carbon_interval'};
  72.         $self->{'Filter'}->{'Updated'}  = 0;
  73.     }
  74.     else {
  75.         $self->{'Filter'}->update($tx);
  76.     }
  77.     return 1;
  78. }
  79.  
  80. sub _socket_create {
  81.     my ($self) = @_;
  82.     return $self->{'Select'}->add( $self->{'IOSocket'}->accept );
  83. }
  84.  
  85. sub _socket_finish {
  86.     my ( $self, $file_handle ) = @_;
  87.     $self->{'Select'}->remove($file_handle);
  88.     return $file_handle->close;
  89. }
  90.  
  91. sub _socket_process {
  92.     my ( $self, $file_handle ) = @_;
  93.     my $data;
  94.     my $receive_success = $file_handle->recv( $data, $self->{'CarbonMax'}, 0 );
  95.     $data
  96.       ? $self->{'Filter'}->match($data)
  97.       : $self->_socket_finish($file_handle);
  98.     return 1;
  99. }
  100.  
  101. sub spawn {
  102.     my ( $self, $conf ) = @_;
  103.     my $ua = Mojo::UserAgent->new;
  104.     $ua->websocket(
  105.         'ws://127.0.0.1:3000/ws_carbon' => sub {
  106.             my ( $user_agent, $tx ) = @_;
  107.             $tx->is_websocket or return 0;
  108.             Mojo::IOLoop->recurring(
  109.                 1 => sub {
  110.                     $self->{'Filter'}->check_targets();
  111.                     $self->{'ForkManager'}->parent_alive() or $self->_cleanup();
  112.                 }
  113.             );
  114.             Mojo::IOLoop->recurring(
  115.                 ( $self->{'Filter'}->{'Interval'} / 2 ) => sub {
  116.                     $self->_update( $conf, $tx );
  117.                 }
  118.             );
  119.             Mojo::IOLoop->singleton->reactor->io(
  120.                 $self->{'IOSocket'} => sub {
  121.                     my ( $reactor, $writable ) = @_;
  122.                     not $writable and $self->_process( $conf, $tx );
  123.                     $writable and Sharpener::Log::error('writable');
  124.                 }
  125.             );
  126.         }
  127.     );
  128.     Mojo::IOLoop->is_running or Mojo::IOLoop->start;
  129.     return 1;
  130. }
  131.  
  132. 1;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement