Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- use MONKEY;
# A chain of operations on a source iterator that is only executed when the
# chain is iterated.  Calls to .map and .grep are recorded as actions and
# then run by concurrent workers: "hyper" produces results in source order,
# "race" in whatever order the workers deliver them.  Serial methods
# (.gist, .unique, ...) first drain the whole chain into a List.
class ConcurrentChain does Iterable {  # perhaps this should be HyperSeq ?
    has $!source;        # the source iterator
    has int $!batch;     # batch size
    has int $!degree;    # degree size (number of workers)
    has str $!method;    # name of method: "hyper" or "race"
    has @!actions;       # Pairs of actions: Method => Capture

    # True if the given mapper/matcher is a Block with a LAST phaser.  Such
    # a block must see all values in order, so it has to run serially.
    # Non-Block matchers (e.g. .grep(Int), .grep("foo")) have no
    # .has-phaser method, so they must be checked for Block-ness first.
    my sub has-last-phaser(\code) {
        nqp::istype(code,Block) && code.has-phaser("LAST")
    }

    # Need to special case "map", to handle "for {}", which is code-genned
    # as a .map, but with a :item named parameter specified.
    my $map-method := List.^find_method("map");
    method map(ConcurrentChain:D: :$item, |c) {
        if defined($item) {
            Seq.new(self.iterator).map(:$item, |c)
        }
        elsif has-last-phaser(c.AT-POS(0)) {
            Seq.new(self.iterator).map(|c)      # must use serial version
        }
        else {                                  # concurrent, save as action to do
            @!actions.push(Pair.new($map-method,c));
            self
        }
    }

    # Need to special case "grep" in case the block has a LAST phaser.
    my $grep-method := List.^find_method("grep");
    method grep(ConcurrentChain:D: |c) {
        if has-last-phaser(c.AT-POS(0)) {
            Seq.new(self.iterator).grep(|c)     # must use serial version
        }
        else {                                  # concurrent, save as action to do
            @!actions.push(Pair.new($grep-method,c));
            self
        }
    }

    # concurrent methods that should just save action for later
    # BEGIN for <grep> {
    #     my $call-method := List.^find_method($_);
    #     my $chain-method := method (ConcurrentChain:D: |c) {
    #         @!actions.push(Pair.new($call-method,c));
    #         self
    #     }
    #     $chain-method.set_name($_);
    #     ::?CLASS.^add_method($_,$chain-method)
    # }

    # Serial methods: drain the whole chain into an IterationBuffer, wrap
    # that in a List, and call the List method of the same name on it.
    # Any arguments given (e.g. .unique(:as(...)), .squish(:with(...)))
    # are passed on to that method.
    BEGIN for <gist keys kv perl repeated squish Str unique values> {
        my $call-method := List.^find_method($_);
        my $chain-method := method (ConcurrentChain:D: |c) {
            self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
            $call-method(
              nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer),
              |c
            )
        }
        $chain-method.set_name($_);
        ::?CLASS.^add_method($_,$chain-method)
    }

    method !SET-SELF(\source, \batch, \degree, $method) {
        $!source := source;
        $!batch   = batch;
        $!degree  = degree;
        $!method  = $method;
        self
    }

    # Create a chain; throws X::Invalid::Value if batch or degree is not
    # a positive number.
    method new(\source, \batch, \degree, $method) {
        batch <= 0
          ?? X::Invalid::Value.new(
               :$method, :name<batch>, :value(batch)).throw
          !! degree <= 0
            ?? X::Invalid::Value.new(
                 :$method,:name<degree>,:value(degree)).throw
            !! self.CREATE!SET-SELF(source, batch, degree, $method)
    }

    # The serial iterator producing the results
    method iterator() {
        @!actions                               # we can run actions
          ?? $!method eq 'hyper'
            ?? Rakudo::Iterator.HyperActions(   # we want "hyper"
                 $!source,@!actions,$!batch,$!degree)
            !! Rakudo::Iterator.RaceActions(    # we want "race"
                 $!source,@!actions,$!batch,$!degree)
          !! $!source                           # sadly, nothing to do
    }

    # Make sure we will always run the iterator
    method sink() { self.iterator.sink-all }
}
- # Since we cannot augment a role, specifically the Iterable role, we do
- # an augment on Seq. For now, we must therefore do a .Seq on any iterable
- # before we can hyper / race it.
augment class Seq {
    # Entry point for ordered concurrent processing.  Named "hijper"
    # because it cannot be called "hyper" here.  Hands the Seq's iterator
    # to a ConcurrentChain that keeps results in source order.
    method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new: self.iterator, $batch, $degree, 'hyper'
    }

    # Entry point for unordered concurrent processing.  Named "rees"
    # because it cannot be called "race" here.
    method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
        ConcurrentChain.new: self.iterator, $batch, $degree, 'race'
    }
}
augment class Rakudo::Iterator {
    # Shared empty buffer, used as the initial "nothing to produce yet"
    # state of the concurrent iterators below.  It is never written to.
    my $empty := IterationBuffer.CREATE;

    # Returns a sort of hyper-iterator that can be called from different
    # threads at the same time, and which produces Pairs with an ordinal
    # number (0, 1, 2, ...) and an IterationBuffer filled with as many
    # elements as could be fetched from the given source iterator, up to
    # the given batch size.  Once the source iterator is exhausted,
    # .pull-buffer keeps returning IterationEnd no matter how many times it
    # is called, and the source iterator is never pulled from again.
    method ConcurrentBatcher(\iterator, Int:D $batch) {
        class {
            has $!iterator;     # the source iterator
            has $!lock;         # making sure one worker runs this code
            has int $!ordinal;  # the key of the Pair we produce
            has int $!batch;    # 0 indicates we're exhausted

            method !SET-SELF(\iterator, \batch) {
                $!iterator := iterator;
                $!lock     := Lock.new;
                $!ordinal   = -1;
                $!batch     = batch;
                self
            }
            method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }

            method pull-buffer() {
                nqp::if(
                  $!batch,                        # cheap unlocked check
                  $!lock.protect({
                      # Check again now that we hold the lock: another
                      # worker may have exhausted the source between the
                      # unlocked check and acquiring the lock, and the
                      # source must not be pulled from after it returned
                      # IterationEnd.
                      nqp::if(
                        $!batch,
                        nqp::stmts(               # we haz the source iterator
                          (my $buffer   := IterationBuffer.CREATE),
                          (my $iterator := $!iterator),
                          (my int $batch = $!batch),
                          (my int $found = -1),
                          nqp::until(
                            nqp::iseq_i(
                              ($found = nqp::add_i($found,1)),
                              $batch
                            ) || nqp::eqaddr(
                              (my $pulled := $iterator.pull-one),
                              IterationEnd
                            ),
                            nqp::bindpos($buffer,$found,$pulled)
                          ),
                          # The buffer now holds exactly $found elements.
                          # Fewer than a full batch means the source is
                          # exhausted: tell the world we're done.
                          nqp::if(
                            nqp::islt_i($found,$batch),
                            ($!batch = 0)
                          ),
                          nqp::if(
                            $found,
                            Pair.new(             # produce a batch
                              ($!ordinal = nqp::add_i($!ordinal,1)),
                              $buffer
                            ),
                            IterationEnd          # we're done here
                          )
                        ),
                        IterationEnd              # exhausted while we waited
                      )
                  }),
                  IterationEnd                    # we were already done
                )
            }
        }.new(iterator,$batch)
    }

    # This role provides the basis for concurrent processing of an
    # iterator.  Its .new method expects a source iterator, a list of
    # actions to be performed (consisting of method => capture pairs) on
    # each value obtained from the source iterator, the maximum size of
    # the buffer to be processed inside a single worker, and the degree
    # (aka number of workers).  Classes must provide at least a pull-one
    # and a !TWEAK method (which is expected to return self).
    #
    # Please note that although all sorts of parallel processing happens
    # inside the classes that do this role, the classes expose themselves
    # as ordinary Iterator classes to the world.
    role ConcurrentActionator does Iterator {
        has $!queue;
        has @!promises;
        has Int $!alive;  # alas, cannot be a native int :-(

        # We use a concurrent blocking queue (the work horse of a Channel,
        # but without any frills here).  It is safe to nqp::push() on it
        # from any thread, and nqp::shift() will block until a value
        # becomes available.  Since we know the number of workers, and
        # each worker will send an IterationEnd at the end, by counting
        # the number of IterationEnd's seen, we know when we should stop
        # nqp::shift()ing from the queue.
        my class Queue is repr('ConcBlockingQueue') { }

        method !TWEAK { ... }

        method !SET-SELF(\source, \actions, \batch, \degree) {
            # Create two lists: one with methods to call, and one with
            # captures to apply, so we can easily index into what we need
            # to do.
            my int $todo = actions.elems;
            my $methods  := nqp::setelems(nqp::list,$todo);
            my $captures := nqp::setelems(nqp::list,$todo);
            for actions.kv -> $w, $action {
                my $m := $action.key;
                nqp::istype($m,Method)
                  ?? nqp::bindpos($methods,$w,$m)
                  !! nqp::bindpos($methods,$w,List.^find_method($m.Str));
                nqp::bindpos($captures,$w,$action.value);
            }

            # Set up the batcher and the queue.
            $!alive = degree;
            $!queue := Queue.CREATE;
            my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);

            # Set up the promises of the workers.  Their results are only
            # awaited in !cleanup, which makes an exception in a worker
            # surface in the consuming thread.
            for ^degree -> $w {
                @!promises.BIND-POS($w,
                  start {
                      # Report the exception.  As this CATCH has no
                      # when/default, Raku rethrows the exception
                      # afterwards, breaking this worker's Promise.
                      CATCH { .say }
                      # Ensure we *always* queue an IterationEnd for this
                      # worker when we're done, normally or exceptionally.
                      LEAVE nqp::push($!queue,IterationEnd);
                      # A dummy HLL list to be passed as a "self", with
                      # its $!reified changed for each call.
                      my $List := List.CREATE;
                      nqp::until(                 # while we have batches
                        nqp::eqaddr(
                          (my $enroute := $batcher.pull-buffer),
                          IterationEnd
                        ),
                        nqp::stmts(
                          (my int $i = -1),
                          nqp::while(             # for all actions to perform
                            nqp::islt_i(($i = nqp::add_i($i,1)),$todo),
                            nqp::stmts(
                              # Transplant batch into List
                              nqp::bindattr($List,List,'$!reified',
                                nqp::getattr($enroute,Pair,'$!value')
                              ),
                              # Run the action on the List, collecting its
                              # results into a fresh buffer that replaces
                              # the batch as input for the next action.
                              nqp::atpos($methods,$i)(
                                $List,|nqp::atpos($captures,$i)
                              ).iterator.push-all(
                                nqp::bindattr($enroute,Pair,'$!value',
                                  IterationBuffer.CREATE
                                )
                              )
                            )
                          ),
                          # Queue the result of all actions
                          nqp::push($!queue,$enroute)
                        )
                      )
                  }
                )
            }

            # The instantiated object
            self
        }

        method new(\source, \actions, \batch, \degree) {
            self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
        }

        method sink-all(--> IterationEnd) is raw {
            # Just eat the queue, we don't care about order in any way
            nqp::while(
              $!alive,
              nqp::stmts(
                nqp::until(
                  nqp::eqaddr(nqp::shift($!queue),IterationEnd),
                  nqp::null
                ),
                nqp::unless(
                  --$!alive,
                  self!cleanup,
                )
              )
            )
        }

        # Await all workers (rethrowing any exception that broke one of
        # their Promises) and indicate we're done.
        method !cleanup(--> IterationEnd) {
            .result for @!promises
        }
    }

    # The "hyper" case of the ConcurrentActionator role: values are
    # produced in the order of the source iterator.
    method HyperActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;       # current list of values to produce
            has $!processed;     # list of processed chunks
            has int $!offset;    # ordinal number of chunk at index 0

            method !TWEAK() {
                $!slipped   := $empty;
                $!processed := nqp::list;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),                 # produce from the chunk
                  nqp::if(
                    $!alive,
                    nqp::stmts(
                      nqp::if(                           # no chunk to produce from
                        nqp::existspos($!processed,0),
                        nqp::stmts(                      # next chunk is available
                          ($!offset = nqp::add_i($!offset,1)),
                          ($!slipped := nqp::shift($!processed))
                        ),
                        nqp::if(                         # next chunk not there
                          nqp::eqaddr(
                            (my $chunk := nqp::shift($!queue)),
                            IterationEnd
                          ),
                          # A worker has expired.  Only account for it
                          # here: the "rinse and repeat" below produces
                          # the next value.  (Calling self.pull-one here
                          # as well would throw away the value it returned.)
                          nqp::unless(
                            --$!alive,
                            self!cleanup                 # mohican time, bye bye
                          ),
                          nqp::if(                       # a fresh chunk
                            nqp::iseq_i(
                              $!offset,
                              (my int $ordinal = $chunk.key)
                            ),
                            nqp::stmts(                  # in sequence chunk
                              ($!offset = nqp::add_i($!offset,1)),
                              nqp::if(                   # lose placeholder if any
                                nqp::elems($!processed),
                                nqp::shift($!processed)
                              ),
                              ($!slipped := $chunk.value)
                            ),
                            nqp::bindpos(                # out of sequence,
                              $!processed,               # store for later usage
                              nqp::sub_i($ordinal,$!offset),
                              $chunk.value
                            )
                          )
                        )
                      ),
                      self.pull-one                      # rinse and repeat
                    ),
                    IterationEnd
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                            # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                            # do the other chunks
                    $!alive,
                    nqp::if(
                      nqp::existspos($!processed,0),
                      nqp::stmts(                        # next chunk is available
                        ($!offset = nqp::add_i($!offset,1)),
                        (my $slipped := nqp::shift($!processed)),
                        nqp::while(
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      ),
                      nqp::if(                           # next chunk not there
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::unless(                     # worker expired
                          --$!alive,
                          self!cleanup                   # mohican time, bye bye
                        ),
                        nqp::bindpos(                    # store for later usage,
                          $!processed,                   # at its relative position
                          nqp::sub_i($chunk.key,$!offset),
                          $chunk.value
                        )
                      )
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }

    # The "race" case of the ConcurrentActionator role: values are
    # produced in whatever order the workers deliver them.
    method RaceActions(\source,\actions,\batch,\degree) {
        class :: does ConcurrentActionator {
            has $!slipped;       # current list of values to produce

            method !TWEAK() {
                $!slipped := $empty;
                self
            }

            method pull-one() is raw {
                nqp::if(
                  nqp::elems($!slipped),
                  nqp::shift($!slipped),                 # produce from available
                  nqp::if(
                    $!alive,                             # still workers around
                    nqp::if(                             # no values available
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::if(                           # worker exhausted
                        --$!alive,
                        self.pull-one,                   # but still others
                        self!cleanup                     # mohican time, bye bye
                      ),
                      nqp::stmts(
                        ($!slipped := $chunk.value),     # could be empty
                        self.pull-one                    # so try again
                      )
                    ),
                    # All workers are gone: don't block on an empty queue.
                    IterationEnd
                  )
                )
            }

            method push-all($target --> IterationEnd) {
                nqp::stmts(
                  nqp::while(                            # produce from available
                    nqp::elems($!slipped),
                    $target.push(nqp::shift($!slipped))
                  ),
                  nqp::while(                            # do the other chunks
                    $!alive,
                    nqp::stmts(
                      nqp::until(                        # until a worker expires
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::stmts(                      # we have a chunk
                          (my $slipped := $chunk.value),
                          nqp::while(                    # push the chunk
                            nqp::elems($slipped),
                            $target.push(nqp::shift($slipped))
                          )
                        )
                      ),
                      nqp::unless(                       # one worker fewer
                        --$!alive,
                        self!cleanup                     # mohican time, bye bye
                      )
                    )
                  )
                )
            }
        }.new(source, actions, batch, degree)
    }
}
# Compare a concurrent ("hijper") run with a serial run of the same
# map/grep pipeline on the same input.  The map block computes $_ + 1
# rather than $_++: postfix ++ would increment the elements of @a in place
# (they are handed to the block as containers) and evaluate to the old
# value, so the serial run would see different input than the parallel one.
my @a = ^106;
my $now = now;
dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_ + 1 }).grep({ $_ %% 2 });
say "parallel processed in {now - $now}";
$now = now;
dd @a.map( { sleep rand / 1000; $_ + 1 } ).grep: { $_ %% 2 };
say "serial processed in {now - $now}";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement