theanonym

YobaCoro.pm

May 28th, 2013
159
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Perl 2.80 KB | None | 0 0
  1. package YobaCoro;
  2.  
  3. use 5.10.1;
  4. use strict;
  5. use warnings;
  6.  
  7. use base "Exporter";
  8. our @EXPORT = qw/
  9.    watcher coro pool sleep
  10. /;
  11. our @EXPORT_OK = @EXPORT;
  12.  
  13. use Coro;
  14. use Coro::AnyEvent;
  15. use Coro::LWP;
  16. use AnyEvent;
  17.  
  18. sub sleep($)
  19. {
  20.    my($s) = @_;
  21.    Coro::AnyEvent::sleep($s);
  22.    return;
  23. }
  24.  
  25. sub watcher()
  26. {
  27.    return AnyEvent->timer(
  28.       interval => 2,
  29.       cb => sub {
  30.          my $time = time;
  31.          map {
  32.             say "YobaCoro: thread #$_->{id} ('$_->{desc}') terminated."
  33.                if $_->{debug};
  34.             $_->cancel(undef);
  35.          } grep {
  36.             $_->{timeout_at} && $time >= $_->{timeout_at}
  37.          } reverse Coro::State::list;
  38.       },
  39.    );
  40. }
  41.  
  42. sub coro($$;$)
  43. {
  44.    my($sub, $arg, $options_) = @_;
  45.    $options_ ||= {};
  46.  
  47.    my $options = {
  48.       debug   => 0,
  49.       desc    => "anon",
  50.       timeout => 0,
  51.       eval    => 0,
  52.       ready   => 0,
  53.       join    => 0,
  54.       %$options_,
  55.    };
  56.  
  57.    state $id = 1;
  58.  
  59.    my $coro; $coro = Coro->new(sub {
  60.       $coro->{id} = $id++;
  61.       $coro->{desc} = $options->{desc};
  62.       $coro->{timeout_at} = time + $options->{timeout}
  63.          if $options->{timeout};
  64.       $coro->{debug} = $options->{debug};
  65.  
  66.       say "YobaCoro: thread #$coro->{id} ('$coro->{desc}') started."
  67.          if $options->{debug};
  68.  
  69.       my $result;
  70.       if($options->{eval}) {
  71.          eval { $result = $sub->($arg) };
  72.          if($@ && $options->{debug}) {
  73.             chomp $@;
  74.             say "YobaCoro: thread #$coro->{id} ('$coro->{desc}') died: $@";
  75.          }
  76.       } else {
  77.          $result = $sub->($arg);
  78.       }
  79.  
  80.       say "YobaCoro: thread #$coro->{id} ('$coro->{desc}') finished."
  81.          if $options->{debug};
  82.  
  83.       return $result // undef;
  84.    });
  85.  
  86.    $coro->ready if $options->{ready} || $options->{join};
  87.    return $options->{join} ? $coro->join : $coro;
  88. }
  89.  
  90. sub pool($$;$)
  91. {
  92.    my($sub, $args, $options_) = @_;
  93.    $options_ ||= {};
  94.  
  95.    my $options = {
  96.       debug => 0,
  97.       desc  => "anon",
  98.       ready => 0,
  99.       join  => 0,
  100.       limit => 0,
  101.       %$options_,
  102.    };
  103.  
  104.    say "YobaCoro: create pool '$options->{desc}' with ".@$args." threads."
  105.       if $options->{debug};
  106.  
  107.    my $pool = async
  108.    {
  109.       my @results;
  110.  
  111.       while(my @args_ = ($options->{limit} > 0 ? splice @$args, 0, $options->{limit} : splice @$args))
  112.       {
  113.          my @coros = map {
  114.             coro($sub, $_, {
  115.                %$options,
  116.                desc  => "$options->{desc} pool",
  117.                ready => 0,
  118.                join  => 0,
  119.             });
  120.          } @args_;
  121.  
  122.          push @results, map { $_->join } map { $_->ready; $_ } @coros;
  123.       }
  124.  
  125.       return @results;
  126.    };
  127.  
  128.    $pool->ready if $options->{ready} || $options->{join};
  129.    return $options->{join} ? $pool->join : $pool;
  130. }
  131.  
  132. 2;
Advertisement
Add Comment
Please, Sign In to add comment