Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env perl6
- ########################################################################
- # housekeeping
- ########################################################################
- use v6.d;
- ########################################################################
- # global values
- ########################################################################
- constant WRITER = </bin/gzip -9v>;
- constant WRITERS = 4;
- constant OUTDIR = './out';
- constant NAME = $*PROGRAM.basename;
- ########################################################################
- # utility subs
- ########################################################################
- sub out-dir
- (
- IO() $dir = './out'
- --> IO
- )
- {
- my $out = $dir.IO.absolute.IO;
- $out ~~ :d
- or
- mkdir $out, 0o770
- or
- die "Failed mkdir: '$out', $!"
- ;
- note "# Out: '$out'";
- $out
- }
- sub out-path
- (
- Stringy $name = NAME
- , *@dir-args
- --> IO
- )
- {
- constant format = '%s-%02x.out.gz';
- my $dir = out-dir |@dir-args;
- my $base = sprintf format, $name, ++$;
- my $path = $dir.add( $base );
- note "# Base: '$base' ($name)";
- $path
- }
- sub out-open
- (
- Stringy $enc = 'ascii'
- , *@path-args
- --> IO::Handle
- )
- {
- my $path = out-path |@path-args;
- my $fh = open $path, :w, :create, :truncate, :enc( $enc )
- or die "Failed open: '$path', $!";
- note "# Open: '$fh'";
- $fh
- }
- ########################################################################
- # set up a list of writers sharing a channel for input.
- ########################################################################
- sub writers
- (
- Channel:D $channel
- , Int() $count = 1
- , *@file-args
- --> Array
- )
- {
- state @writers
- = ( 1 .. $count )
- .map\
- (
- {
- note "# Writer: $_";
- my $stdout = out-open |@file-args;
- my $proc = Proc::Async.new( :w, :enc('ascii'), WRITER );
- $proc.stderr.tap( { note "\nMessage: $^a ($stdout).\n" } );
- $proc.bind-stdout( $stdout );
- $proc.start;
- Promise.start:
- {
- note "# Running: $_";
- my $writes = 0;
- loop
- {
- try
- {
- CATCH
- {
- when X::Channel::ReceiveOnClosed
- {
- note 'Channel closed: ', $stdout.key;
- }
- default
- {
- note "Error: failed recieve/say, $!";
- }
- }
- $proc.say: $channel.receive;
- ++$writes;
- }
- }
- note "# Closing: $_ ($writes written).";
- $proc.close-stdin;
- }
- }
- )
- }
- with Channel.new -> $channel
- {
- my $procs = ( %*ENV<procs> // 1 ).Int;
- my $rows = ( %*ENV<rows> // 1_000 ).Int;
- note "$*PROGRAM procs=$procs rows=$rows";
- my @writers = writers $channel, $procs
- or die 'Failed starting writers.';
- $channel.send( $_ ) for 1 .. $rows;
- $channel.close;
- say '# Promise:', await Promise.allof( @writers );
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement