- =head1 NAME
- Parallel::ForkManager - A simple parallel processing fork manager
- =head1 SYNOPSIS
- use Parallel::ForkManager;
- $pm = new Parallel::ForkManager($MAX_PROCESSES);
- foreach $data (@all_data) {
- # Forks and returns the pid for the child:
- my $pid = $pm->start and next;
- ... do some work with $data in the child process ...
- $pm->finish; # Terminates the child process
- }
- =head1 DESCRIPTION
- This module is intended for operations that can be done in parallel
- where the number of processes to be forked off should be limited. A typical
- use is a downloader that retrieves hundreds or thousands of files.
- The code for a downloader would look something like this:
- use LWP::Simple;
- use Parallel::ForkManager;
- ...
- @links=(
- ["http://www.foo.bar/rulez.data","rulez_data.txt"],
- ["http://new.host/more_data.doc","more_data.doc"],
- ...
- );
- ...
- # Max 30 processes for parallel download
- my $pm = new Parallel::ForkManager(30);
- foreach my $linkarray (@links) {
- $pm->start and next; # do the fork
- my ($link,$fn) = @$linkarray;
- warn "Cannot get $fn from $link"
- if getstore($link,$fn) != RC_OK;
- $pm->finish; # do the exit in the child process
- }
- $pm->wait_all_children;
- First you need to instantiate the ForkManager with the "new" constructor.
- You must specify the maximum number of processes to be created. If you
- specify 0, then NO fork will be done; this is good for debugging purposes.
- Next, use $pm->start to do the fork. $pm->start returns 0 for the child
- process, and the child pid for the parent process (see also
- L<perlfunc(1p)/fork()>). The "and next" skips the rest of the loop body in
- the parent process. NOTE: $pm->start dies if the fork fails.
- $pm->finish terminates the child process (assuming a fork was done in the
- "start").
- NOTE: You cannot use $pm->start if you are already in the child process.
- If you want to manage another set of subprocesses in the child process,
- you must instantiate another Parallel::ForkManager object!
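The start / finish / wait_all_children cycle described above can be sketched with core Perl only. This is a minimal illustration of the idea, not the module's implementation: the helper name run_limited and its arguments are hypothetical, and it assumes a Unix-like system where fork() and waitpid() behave in the usual way.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of Parallel::ForkManager): run $work->($item)
# in a child process for each item, keeping at most $max children alive.
sub run_limited {
    my ($max, $work, @items) = @_;
    die "max must be at least 1" if $max < 1;
    my %running;    # pid  => item, for children not yet reaped
    my %status;     # item => exit code, filled in as children are reaped

    my $reap_one = sub {
        my $pid = waitpid(-1, 0);               # block until some child exits
        if ($pid == -1) { %running = (); return }   # nothing left to wait for
        return unless exists $running{$pid};    # not one of ours
        $status{ delete $running{$pid} } = $? >> 8;
    };

    for my $item (@items) {
        # Same role as the wait inside $pm->start: free a slot first.
        $reap_one->() while keys(%running) >= $max;
        my $pid = fork();
        die "Cannot fork: $!" unless defined $pid;
        if ($pid) {                             # parent: record child, move on
            $running{$pid} = $item;
            next;
        }
        exit $work->($item);                    # child: work, then exit
    }
    $reap_one->() while keys %running;          # same role as wait_all_children
    return \%status;
}

# Each child exits with its item number modulo 3.
my $status = run_limited(2, sub { my ($n) = @_; return $n % 3 }, 1 .. 5);
print "item $_ exited with $status->{$_}\n"
    for sort { $a <=> $b } keys %$status;
```

The parent branch ending in "next" plays the part of "$pm->start and next", and the child's exit corresponds to $pm->finish.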
- =head1 METHODS
- =over 5
- =item new $processes
- Instantiate a new Parallel::ForkManager object. You must specify the maximum
- number of children to fork off. If you specify 0 (zero), then no children
- will be forked. This is intended for debugging purposes.
- =item start [ $process_identifier ]
- This method does the fork. It returns the pid of the child process for
- the parent, and 0 for the child process. If the $processes parameter
- for the constructor is 0, no fork is done and $pm->start simply returns 0,
- as if you were in the child process.
- An optional $process_identifier can be provided to this method. It is used by
- the "run_on_finish" callback (see CALLBACKS) for identifying the finished
- process.
- =item finish [ $exit_code ]
- Closes the child process by exiting and accepts an optional exit code
- (default exit code is 0) which can be retrieved in the parent via callback.
- If you use the program in debug mode ($processes == 0), this method does not
- exit; it only invokes the "run_on_finish" callback, if one is defined.
- =item set_max_procs $processes
- Allows you to set a new maximum number of children to maintain. Returns
- the new setting.
- =item wait_all_children
- You can call this method to wait for all the processes which have been
- forked. This is a blocking wait.
- =back
- =head1 CALLBACKS
- You can define callbacks in the code, which are called on events like starting
- a process or upon finish.
- The callbacks can be defined with the following methods:
- =over 4
- =item run_on_finish $code [, $pid ]
- You can define a subroutine which is called when a child is terminated. It is
- called in the parent process.
- The parameters of the $code are the following:
- - pid of the process, which is terminated
- - exit code of the program
- - identification of the process (if provided in the "start" method)
- - exit signal (0-127: signal number)
- - core dump (1 if there was core dump at exit)
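The exit code, exit signal and core dump values listed above come from the wait status that Perl leaves in $? after waitpid(). The sketch below decodes a status with the same shifts and masks that wait_one_child uses; the helper name decode_status is hypothetical and is not part of the module.

```perl
use strict;
use warnings;

# Hypothetical helper: split a raw wait status (the value of $? after
# waitpid) into the three values reported to a run_on_finish callback.
sub decode_status {
    my ($status) = @_;
    return (
        $status >> 8,               # exit code
        $status & 0x7f,             # signal number, 0 if none
        $status & 0x80 ? 1 : 0,     # 1 if a core dump was produced
    );
}

my $pid = fork();
die "Cannot fork: $!" unless defined $pid;
exit 3 if $pid == 0;                # child: exit with code 3
waitpid($pid, 0);                   # parent: reap the child, sets $?
my ($exit_code, $signal, $core) = decode_status($?);
print "pid $pid: exit code $exit_code, signal $signal, core dump $core\n";
```

A child that exits normally therefore reports signal 0 and core dump 0; a nonzero signal means the child was killed rather than having called finish.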
- =item run_on_start $code
- You can define a subroutine which is called when a child is started. It is
- called in the parent process after the successful startup of a child.
- The parameters of the $code are the following:
- - pid of the process which has been started
- - identification of the process (if provided in the "start" method)
- =item run_on_wait $code, [$period]
- You can define a subroutine which is called when the parent process has to
- wait before it can start another child. If $period is not defined, then one
- call is done per child. If $period is defined, then $code is called
- periodically and the module waits for $period seconds between two calls.
- Note that $period can also be a fractional number. The exact "$period
- seconds" is not guaranteed: signals can shorten it and the process scheduler
- can make it longer (on busy systems).
- The $code is called in both the "start" and the "wait_all_children" methods.
- No parameters are passed to the $code on the call.
- =back
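The periodic behaviour of run_on_wait can be pictured as a polling loop: call the callback, sleep for a fractional period with four-argument select, then check for a finished child without blocking. The following is a rough sketch of that pattern with a single child, using core Perl only; the variable names and the 0.2 second period are arbitrary choices for illustration.

```perl
use strict;
use warnings;
use POSIX ":sys_wait_h";            # provides WNOHANG

my $period  = 0.2;                  # seconds between callback invocations
my $ticks   = 0;
my $on_wait = sub { $ticks++ };     # stand-in for a run_on_wait callback

my $pid = fork();
die "Cannot fork: $!" unless defined $pid;
if ($pid == 0) { sleep 1; exit 0 }  # child: pretend to work for a second

while (1) {
    $on_wait->();
    select undef, undef, undef, $period;      # fractional sleep
    last if waitpid($pid, WNOHANG) != 0;      # non-blocking check for exit
}
print "callback ran $ticks times while waiting\n";
```

The number of callback invocations depends on scheduling, which is why the text above says the period is not guaranteed.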
- =head1 EXAMPLE
- =head2 Parallel get
- This small example can be used to get URLs in parallel.
- use Parallel::ForkManager;
- use LWP::Simple;
- my $pm=new Parallel::ForkManager(10);
- for my $link (@ARGV) {
- $pm->start and next;
- my ($fn)= $link =~ /^.*\/(.*?)$/;
- if (!$fn) {
- warn "Cannot determine filename from $link\n";
- } else {
- $0.=" ".$fn;
- print "Getting $fn from $link\n";
- my $rc=getstore($link,$fn);
- print "$link downloaded. response code: $rc\n";
- };
- $pm->finish;
- };
- $pm->wait_all_children;
- =head2 Callbacks
- Example of a program using callbacks to get child exit codes:
- use strict;
- use Parallel::ForkManager;
- my $max_procs = 5;
- my @names = qw( Fred Jim Lily Steve Jessica Bob Dave Christine Rico Sara );
- my $pm = new Parallel::ForkManager($max_procs);
- # Set up a callback for when a child finishes so we can
- # get its exit code
- $pm->run_on_finish(
- sub { my ($pid, $exit_code, $ident) = @_;
- print "** $ident just got out of the pool ".
- "with PID $pid and exit code: $exit_code\n";
- }
- );
- $pm->run_on_start(
- sub { my ($pid,$ident)=@_;
- print "** $ident started, pid: $pid\n";
- }
- );
- $pm->run_on_wait(
- sub {
- print "** Have to wait for one child ...\n"
- },
- 0.5
- );
- foreach my $child ( 0 .. $#names ) {
- my $pid = $pm->start($names[$child]) and next;
- # This code is the child process
- print "This is $names[$child], Child number $child\n";
- sleep ( 2 * $child );
- print "$names[$child], Child $child is about to get out...\n";
- sleep 1;
- $pm->finish($child); # pass an exit code to finish
- }
- print "Waiting for Children...\n";
- $pm->wait_all_children;
- print "Everybody is out of the pool!\n";
- =head1 BUGS AND LIMITATIONS
- Do not use Parallel::ForkManager in an environment where other child
- processes can affect the run of the main program. In particular, using this
- module is not recommended where fork() / wait() is already used.
- If you want to use more than one copy of Parallel::ForkManager, then
- you have to make sure that all child processes are terminated before you
- use the second object in the main program.
- You are free to use a new copy of Parallel::ForkManager in the child
- processes, although I don't think it makes sense.
- =head1 COPYRIGHT
- Copyright (c) 2000 Szabó, Balázs (dLux)
- All rights reserved. This program is free software; you can redistribute it
- and/or modify it under the same terms as Perl itself.
- =head1 AUTHOR
- dLux (Szabó, Balázs) <dlux@kapu.hu>
- =head1 CREDITS
- Noah Robin <sitz@onastick.net> (documentation tweaks)
- Chuck Hirstius <chirstius@megapathdsl.net> (callback exit status, example)
- Grant Hopwood <hopwoodg@valero.com> (win32 port)
- Mark Southern <mark_southern@merck.com> (bugfix)
- =cut
- package Parallel::ForkManager;
- use POSIX ":sys_wait_h";
- use strict;
- use vars qw($VERSION);
- $VERSION='0.7.5';
- sub new { my ($c,$processes)=@_;
- my $h={
- max_proc => $processes,
- processes => {},
- in_child => 0,
- };
- return bless($h,ref($c)||$c);
- };
- sub start { my ($s,$identification)=@_;
- die "Cannot start another process while you are in the child process"
- if $s->{in_child};
- while ($s->{max_proc} && ( keys %{ $s->{processes} } ) >= $s->{max_proc}) {
- $s->on_wait;
- $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
- };
- $s->wait_children;
- if ($s->{max_proc}) {
- my $pid=fork();
- die "Cannot fork: $!" if !defined $pid;
- if ($pid) {
- $s->{processes}->{$pid}=$identification;
- $s->on_start($pid,$identification);
- } else {
- $s->{in_child}=1 if !$pid;
- }
- return $pid;
- } else {
- $s->{processes}->{$$}=$identification;
- $s->on_start($$,$identification);
- return 0; # Simulating the child which returns 0
- }
- }
- sub finish { my ($s, $x)=@_;
- if ( $s->{in_child} ) {
- exit ($x || 0);
- }
- if ($s->{max_proc} == 0) { # debug mode: no fork was done
- $s->on_finish($$, $x ,$s->{processes}->{$$}, 0, 0);
- delete $s->{processes}->{$$};
- }
- return 0;
- }
- sub wait_children { my ($s)=@_;
- return if !keys %{$s->{processes}};
- my $kid;
- do {
- $kid = $s->wait_one_child(&WNOHANG);
- } while $kid > 0 || $kid < -1; # AS 5.6/Win32 returns negative PIDs
- };
- *wait_childs=*wait_children; # compatibility
- sub wait_one_child { my ($s,$par)=@_;
- my $kid;
- while (1) {
- $kid = $s->_waitpid(-1,$par||=0);
- last if $kid == 0 || $kid == -1; # AS 5.6/Win32 returns negative PIDs
- redo if !exists $s->{processes}->{$kid};
- my $id = delete $s->{processes}->{$kid};
- $s->on_finish( $kid, $? >> 8 , $id, $? & 0x7f, $? & 0x80 ? 1 : 0);
- last;
- }
- $kid;
- };
- sub wait_all_children { my ($s)=@_;
- while (keys %{ $s->{processes} }) {
- $s->on_wait;
- $s->wait_one_child(defined $s->{on_wait_period} ? &WNOHANG : undef);
- };
- }
- *wait_all_childs=*wait_all_children; # compatibility;
- sub run_on_finish { my ($s,$code,$pid)=@_;
- $s->{on_finish}->{$pid || 0}=$code;
- }
- sub on_finish { my ($s,$pid,@par)=@_;
- my $code=$s->{on_finish}->{$pid} || $s->{on_finish}->{0} or return 0;
- $code->($pid,@par);
- };
- sub run_on_wait { my ($s,$code, $period)=@_;
- $s->{on_wait}=$code;
- $s->{on_wait_period} = $period;
- }
- sub on_wait { my ($s)=@_;
- if(ref($s->{on_wait}) eq 'CODE') {
- $s->{on_wait}->();
- if (defined $s->{on_wait_period}) {
- local $SIG{CHLD} = sub { } if ! defined $SIG{CHLD};
- select undef, undef, undef, $s->{on_wait_period}
- };
- };
- };
- sub run_on_start { my ($s,$code)=@_;
- $s->{on_start}=$code;
- }
- sub on_start { my ($s,@par)=@_;
- $s->{on_start}->(@par) if ref($s->{on_start}) eq 'CODE';
- };
- sub set_max_procs { my ($s, $mp)=@_;
- $s->{max_proc} = $mp;
- }
- # OS dependent code follows...
- sub _waitpid { # Call waitpid() in the standard Unix fashion.
- return waitpid($_[1],$_[2]);
- }
- # On ActiveState Perl 5.6/Win32 build 625, waitpid(-1, &WNOHANG) always
- # blocks unless an actual PID other than -1 is given.
- sub _NT_waitpid { my ($s, $pid, $par) = @_;
- if ($par == &WNOHANG) { # Need to nonblock on each of our PIDs in the pool.
- my @pids = keys %{ $s->{processes} };
- # Simulate -1 (no processes awaiting cleanup.)
- return -1 unless scalar(@pids);
- # Check each PID in the pool.
- my $kid;
- foreach $pid (@pids) {
- $kid = waitpid($pid, $par);
- return $kid if $kid != 0; # AS 5.6/Win32 returns negative PIDs.
- }
- return $kid;
- } else { # Normal waitpid() call.
- return waitpid($pid, $par);
- }
- }
- {
- local $^W = 0;
- if ($^O eq 'NT' or $^O eq 'MSWin32') {
- *_waitpid = \&_NT_waitpid;
- }
- }
- 1;