Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # Copyright 2014-15 Daniel Parry
- #
- # Licensed under the Artistic License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.perlfoundation.org/artistic_license_2_0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- # implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- package Sharpener::Receive::Carbon;
- use strict;
- use warnings;
- use Mojo::IOLoop;
- use Mojo::UserAgent;
- use Sharpener::Log;
- use Sharpener::Receive::CarbonFilter;
- use Sharpener::Time;
- our $VERSION = '0.42';
# Construct a Carbon receiver object.
#
# Arguments:
#   $class        - class name to bless the new object into
#   $conf         - configuration object; get('carbon_max') is called on it
#                   and the object itself is kept under 'Config'
#   $fork_manager - kept under 'ForkManager'
#   $state        - handed, with $conf, to Sharpener::Receive::CarbonFilter->new
#
# Returns the blessed hash reference. The 'IOSocket' and 'Select' slots are
# created here but hold undef; this constructor does not populate them.
sub new {
    my ( $class, $conf, $fork_manager, $state ) = @_;

    my %attributes = (
        'CarbonMax'   => $conf->get('carbon_max'),
        'Config'      => $conf,
        'Filter'      => Sharpener::Receive::CarbonFilter->new( $conf, $state ),
        'ForkManager' => $fork_manager,
        'IOSocket'    => undef,
        'Select'      => undef,
    );

    return bless {%attributes}, $class;
}
# Close the handle stored in 'IOSocket' and terminate the process.
#
# When close reports failure, 'carbon receive clean' is logged through
# Sharpener::Log::error. This sub never returns: it always exits with
# status 0, whether or not the close succeeded.
sub _cleanup {
    my ($self) = @_;

    my $closed = close $self->{'IOSocket'};
    if ( not $closed ) {
        Sharpener::Log::error('carbon receive clean');
    }

    exit 0;
}
# Walk every handle returned by the 'Select' object's handles() method.
#
# The handle that compares equal to 'IOSocket' is dispatched to
# _socket_create(); every other handle is passed to _socket_process().
# Always returns 1.
#
# NOTE(review): handles() is used here, so each registered handle is visited
# on every call regardless of readiness. Presumably only readable handles
# were meant to be serviced -- confirm against how 'Select' is populated.
sub _process {
    my ($self) = @_;

    for my $handle ( $self->{'Select'}->handles ) {
        if ( $handle != $self->{'IOSocket'} ) {
            $self->_socket_process($handle);
            next;
        }
        $self->_socket_create();
    }

    return 1;
}
# Periodic update hook, driven from a recurring timer set up in spawn().
#
# Arguments:
#   $self - receiver object
#   $conf - hash reference holding the carbon settings this receiver was
#           started with; 'carbon_conns', 'carbon_addr' and 'carbon_port'
#           are read from it
#   $tx   - passed through unchanged to the filter's update() method
#
# Behaviour:
#   * When the filter's 'Updated' flag is false, the call is forwarded to
#     $self->{'Filter'}->update($tx).
#   * When the flag is true, check_targets_path() is called on the filter
#     and the current settings are fetched with get_carbon():
#       - if 'carbon_enable' is false, nothing else is touched;
#       - if conns/addr/port differ from $conf, Mojo::IOLoop is stopped;
#       - otherwise 'CarbonMax' and the filter's 'Interval' are refreshed
#         and 'Updated' is cleared.
#
# Always returns 1.
sub _update {
    my ( $self, $conf, $tx ) = @_;

    my $filter = $self->{'Filter'};

    if ( not $filter->{'Updated'} ) {
        $filter->update($tx);
        return 1;
    }

    $filter->check_targets_path();
    my $check_conf = $self->{'Config'}->get_carbon();

    # This used to be `or last`, but there is no enclosing loop in this sub:
    # `last` would die ("Can't \"last\" outside a loop block") or abort a
    # loop in some caller. Leaving the branch means returning 1.
    $check_conf->{'carbon_enable'} or return 1;

    # Any change to the listening parameters stops the loop; the settings
    # are compared as strings, in the same order as before.
    for my $key (qw(carbon_conns carbon_addr carbon_port)) {
        if ( $conf->{$key} ne $check_conf->{$key} ) {
            Mojo::IOLoop->stop();
            return 1;
        }
    }

    $self->{'CarbonMax'} = $check_conf->{'carbon_max'};
    $filter->{'Interval'} = $check_conf->{'carbon_interval'};
    $filter->{'Updated'}  = 0;

    return 1;
}
# Accept one pending connection on 'IOSocket' and register the new handle
# with the 'Select' object.
#
# Returns whatever the 'Select' object's add() returns, or 0 when accept()
# did not yield a handle.
sub _socket_create {
    my ($self) = @_;

    # Assigning to a scalar forces scalar context on accept(). Called
    # directly inside add(...) it runs in list context, where IO::Socket's
    # accept returns (socket, peer address) -- assuming 'IOSocket' is an
    # IO::Socket, as its name suggests -- and the packed peer address would
    # be handed to add() as though it were a second handle.
    my $client = $self->{'IOSocket'}->accept;

    # A failed accept yields undef; there is nothing to register.
    defined $client or return 0;

    return $self->{'Select'}->add($client);
}
# Stop tracking a handle and close it.
#
# Arguments:
#   $self        - receiver object
#   $file_handle - handle to drop; it is removed from the 'Select' object
#                  first and closed afterwards
#
# Returns the result of $file_handle->close.
sub _socket_finish {
    my ( $self, $file_handle ) = @_;

    my $select = $self->{'Select'};
    $select->remove($file_handle);

    return $file_handle->close;
}
# Read from one handle and act on what arrived.
#
# Arguments:
#   $self        - receiver object
#   $file_handle - handle to read from; at most $self->{'CarbonMax'} bytes
#                  are requested through its recv() method
#
# Non-empty data is handed to $self->{'Filter'}->match(). When nothing was
# received the handle is retired through _socket_finish().
#
# Always returns 1.
sub _socket_process {
    my ( $self, $file_handle ) = @_;

    my $data;
    $file_handle->recv( $data, $self->{'CarbonMax'}, 0 );

    # Test the length rather than the truth of $data: a payload consisting
    # of the single character "0" is false in Perl and would otherwise be
    # mistaken for "nothing received", closing a live connection.
    if ( defined $data and length $data ) {
        $self->{'Filter'}->match($data);
    }
    else {
        $self->_socket_finish($file_handle);
    }

    return 1;
}
# Open a websocket to ws://127.0.0.1:3000/ws_carbon and, once it is
# established, wire the receiver into the Mojo::IOLoop event loop.
#
# Arguments:
#   $self - receiver object
#   $conf - passed on to _update() and _process() from the callbacks
#
# On a successful websocket handshake three things are registered:
#   * a timer recurring every second that calls the filter's check_targets()
#     and runs _cleanup() when the fork manager's parent_alive() is false;
#   * a timer recurring every half filter 'Interval' that calls _update();
#   * an I/O watcher on 'IOSocket' that calls _process() for non-writable
#     events and logs 'writable' otherwise.
# If the transaction is not a websocket, the connect callback returns 0 and
# registers nothing.
#
# The event loop is started here unless it is already running. Returns 1.
sub spawn {
    my ( $self, $conf ) = @_;

    my $ua = Mojo::UserAgent->new;

    my $on_connect = sub {
        my ( $user_agent, $tx ) = @_;
        return 0 unless $tx->is_websocket;

        # Once a second: refresh targets and bail out if the parent is gone.
        Mojo::IOLoop->recurring(
            1 => sub {
                $self->{'Filter'}->check_targets();
                $self->_cleanup() unless $self->{'ForkManager'}->parent_alive();
            }
        );

        # Twice per filter interval: pick up configuration changes.
        my $update_period = $self->{'Filter'}->{'Interval'} / 2;
        Mojo::IOLoop->recurring(
            $update_period => sub {
                $self->_update( $conf, $tx );
            }
        );

        # Socket activity: only non-writable events are processed.
        Mojo::IOLoop->singleton->reactor->io(
            $self->{'IOSocket'} => sub {
                my ( $reactor, $writable ) = @_;
                if ($writable) {
                    Sharpener::Log::error('writable');
                }
                else {
                    $self->_process( $conf, $tx );
                }
            }
        );
    };

    $ua->websocket( 'ws://127.0.0.1:3000/ws_carbon' => $on_connect );

    Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
    return 1;
}
- 1;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement