#!/usr/bin/php
<?php
// parallel task manager
// 2010 daniel souza <thehazard@gmail.com>
// report every class of PHP error, warning and notice for the rest of the run
error_reporting(E_ALL);
// Print the command synopsis to standard output.
// The program name shown is the first entry of the global $argv.
function usage() {
    global $argv;
    $program = $argv[0];
    echo "\nUSAGE: {$program} -c <command list> [ -p <processes> ] [ -t <timeout> ] [ -l <logfile> ]\n";
}
// Validate the option values gathered from the command line.
//
// $context: array with the keys 'list' (path of the command list file),
//           'processes' (maximum number of parallel children) and
//           'timeout' (seconds).
// $errors:  output parameter; reset to '' and then filled with one
//           "ERROR: ..." line per failed check.
//
// Returns TRUE when every check passed, FALSE otherwise.
//
// $errors carries a default so that the optional $context no longer precedes
// a required parameter (deprecated since PHP 8.0); existing two-argument
// calls are unaffected.
function validate_argument_context($context = array(), &$errors = NULL) {
    $errors = '';

    // a missing or non-scalar list is treated like an empty path
    $list = '';
    if (isset($context['list']) && is_scalar($context['list'])) {
        $list = (string) $context['list'];
    }
    if ($list === '' || !is_file($list)) {
        $errors .= "ERROR: invalid command list.\n";
    }

    // at least one worker is required; zero or negative counts would make
    // the scheduler stop without running anything
    $processes = isset($context['processes']) ? $context['processes'] : NULL;
    if (!is_numeric($processes) || intval($processes) < 1) {
        $errors .= "ERROR: invalid process count.\n";
    }

    // a timeout cannot be negative
    $timeout = isset($context['timeout']) ? $context['timeout'] : NULL;
    if (!is_numeric($timeout) || $timeout < 0) {
        $errors .= "ERROR: invalid timeout value.\n";
    }

    return $errors === '';
}
// Parse the command line options -c, -l, -p and -t and return them as an
// array keyed by descriptive names: 'list', 'logfile', 'processes', 'timeout'.
//
// Options that were not supplied keep their defaults (no list, no logfile,
// 3 processes, timeout 0). Values are returned as given on the command line;
// checking them is left to the caller.
function get_argument_context() {
    $values = array(
        'c' => NULL,
        'l' => NULL,
        'p' => 3,
        't' => 0,
    );
    $parsed = getopt('c:l:p:t:');
    // getopt() returns FALSE on failure; treat that as "no options given"
    if (is_array($parsed)) {
        foreach ($parsed as $k => $v) {
            // a repeated option arrives as an array; the last occurrence wins
            if (is_array($v)) {
                $v = end($v);
            }
            // compare against '' explicitly so that an explicit "0" (e.g. -p 0)
            // is kept and can be rejected by validation instead of being
            // silently replaced by the default
            if ($v !== FALSE && $v !== '') {
                $values[$k] = $v;
            }
        }
    }
    return array(
        'list' => $values['c'],
        'logfile' => $values['l'],
        'processes' => $values['p'],
        'timeout' => $values['t'],
    );
}
// Load the task list from $filename.
//
// The file is split on newlines and every line is trimmed. Lines that are
// empty after trimming, and lines whose first character is '#', are left out.
// Returns the remaining lines as a list, in file order.
function read_command_list($filename = '') {
    $commands = array();
    $rows = explode("\n", file_get_contents($filename));
    foreach ($rows as $row) {
        $row = trim($row);
        if (!$row || $row[0] == '#') {
            continue;
        }
        $commands[] = $row;
    }
    return $commands;
}
// Emit one status line: timestamp, running/limit process counters,
// done/total task counters, then $text.
//
// The line is always printed to standard output; when the global $logfile
// holds an open handle the same line is also written there.
function output($text = '') {
    global $logfile, $running_process_count, $max_process_limit, $task_total, $task_done;
    $entry = sprintf(
        "%s (R:%s/%s) [%s/%s] -- %s\n",
        date('Y-m-d H:i:s'),
        $running_process_count,
        $max_process_limit,
        $task_done,
        $task_total,
        $text
    );
    echo $entry;
    if (!$logfile) {
        return;
    }
    fwrite($logfile, $entry);
}
// Start $task in a new child process via "/bin/sh -c <task>".
//
// In the parent the child is recorded in the global $task_table (command and
// start time, keyed by pid) and $running_process_count is incremented.
//
// Returns FALSE when fork() fails, TRUE once the child has been spawned.
function run_task($task) {
    global $running_process_count, $task_table;
    $pid = pcntl_fork();
    if ($pid == -1) {
        output("failed to fork() for task: {$task}");
        return FALSE;
    }
    if ($pid == 0) {
        // child
        $shell_args = array('-c', $task);
        if (empty($_ENV)) {
            // $_ENV is only populated when variables_order contains "E";
            // passing an empty array would start the task with an empty
            // environment, so inherit the current one instead
            pcntl_exec('/bin/sh', $shell_args);
        } else {
            pcntl_exec('/bin/sh', $shell_args, $_ENV);
        }
        // pcntl_exec() only returns on failure. Terminate here so the child
        // does not fall back into the parent's scheduling loop.
        output("failed to exec() task: {$task}");
        exit(127);
    }
    // parent
    $task_table[$pid] = array(
        'task' => $task,
        'start' => time(),
    );
    $running_process_count++;
    output("spawned process: {$task} (pid $pid)");
    return TRUE;
}
// Return the first field of /proc/loadavg as a float.
// Returns 0.0 when the file cannot be read.
function getloadavg() {
    $path = '/proc/loadavg';
    if (!is_readable($path)) {
        return 0.0;
    }
    $raw = file_get_contents($path);
    if ($raw === FALSE) {
        return 0.0;
    }
    // index the exploded array directly: array_shift() takes its argument by
    // reference and raises a notice when handed a function result
    $fields = explode(' ', $raw);
    return (float) $fields[0];
}
// Parse /proc/meminfo into an array mapping each field name (the text before
// the colon) to the first integer that follows it on the line.
// Returns an empty array when the file cannot be read.
function getmeminfo() {
    $output = array();
    $path = '/proc/meminfo';
    if (!is_readable($path)) {
        return $output;
    }
    $lines = file($path);
    if ($lines === FALSE) {
        return $output;
    }
    foreach ($lines as $line) {
        // the key is everything up to the first colon, so names containing
        // digits or parentheses (e.g. "Active(anon)") are kept as well
        if (preg_match('/^([^:]+):[^0-9]*([0-9]+)/', trim($line), $regs)) {
            $output[$regs[1]] = (int) $regs[2];
        }
    }
    return $output;
}
// Decide whether the caller may go on, reaping one finished child if needed.
//
// With $just_wait FALSE and fewer children running than $max_process_limit,
// returns TRUE immediately so another task can be spawned. Otherwise polls
// pcntl_wait() without blocking, sleeping one second between polls, until a
// child has exited; its outcome is logged and the counters are updated.
// While polling, a load/memory status line is logged periodically.
//
// Returns TRUE after a child was reaped (or when spawning may continue),
// FALSE when pcntl_wait() reports -1.
function do_wait($just_wait = FALSE) {
    global $running_process_count, $max_process_limit, $task_table, $task_total, $task_done;

    // room for more children: let the caller spawn the next task
    if (!$just_wait && $running_process_count < $max_process_limit) {
        return TRUE;
    }

    $idle_ticks = 0;
    for (;;) {
        $child = pcntl_wait($wait_status, WNOHANG);
        if ($child == -1) {
            return FALSE;
        }

        if (!$child) {
            // nothing has exited yet; a timeout check on $task_table could go here
            sleep(1);
            if ($idle_ticks == 10) {
                $idle_ticks = 0;
                $load = getloadavg();
                $mem = getmeminfo();
                output("loadavg={$load}, memfree={$mem['MemFree']}, swapfree={$mem['SwapFree']}");
            } else {
                $idle_ticks++;
            }
            continue;
        }

        // a child exited: update the counters before logging so the status
        // line already reflects them
        $running_process_count--;
        $task_done++;

        if (pcntl_wifexited($wait_status)) {
            $summary = "finished normally with retcode " . pcntl_wexitstatus($wait_status);
        } else {
            $summary = "finished abnormally";
        }
        if (pcntl_wifsignaled($wait_status)) {
            $summary .= ', ' . "termination due to signal " . pcntl_wtermsig($wait_status);
        }

        if (!isset($task_table[$child])) {
            output("unknown process {$child} finished ($summary)");
            return TRUE;
        }

        $task_table[$child]['end'] = time();
        $elapsed = $task_table[$child]['end'] - $task_table[$child]['start'];
        output("process {$child} finished: {$task_table[$child]['task']} ($elapsed secs, $summary)");
        return TRUE;
    }
}
// ---- main program ----

// fork/wait and pid lookup come from optional extensions; fail with a clear
// message instead of a fatal "undefined function" error
if (!function_exists('pcntl_fork') || !function_exists('posix_getpid')) {
    print "ERROR: the pcntl and posix extensions are required.\n";
    exit(-1);
}

// global state shared with output(), run_task() and do_wait()
$logfile = NULL;
$running_process_count = 0;
// initialised before the first output() call so the startup line shows
// "[0/0]" rather than empty counters
$task_done = 0;
$task_total = 0;
$context = get_argument_context();
$max_process_limit = intval($context['processes']);
$task_table = array();

if (!validate_argument_context($context, $errors)) {
    usage();
    print $errors . "\n";
    exit(-1);
}

if ($context['logfile']) {
    // realpath() yields FALSE for a logfile that does not exist yet, so take
    // the directory from the path as given; directories need the search (x)
    // bit, hence 0755. Creation is best-effort: a failure surfaces through
    // the fopen() check right below.
    $logdir = dirname($context['logfile']);
    if (!is_dir($logdir)) {
        @mkdir($logdir, 0755, true);
    }
    if (!$logfile = fopen($context['logfile'], 'w')) {
        print "ERROR: error opening logfile: {$context['logfile']}\n";
        exit(-1);
    }
}

output("taskman started (pid " . posix_getpid() . "): " . implode(' ', $argv));

$tasklist = read_command_list($context['list']);
$task_total = sizeof($tasklist);
if (!$task_total) {
    output("no tasks to run.");
    exit(-1);
}
output("loaded {$task_total} tasks from {$context['list']}");

// scheduling loop: spawn while below the process limit, otherwise wait for a
// child to finish; once the list is empty only wait until no children remain
while (TRUE) {
    $just_wait = (sizeof($tasklist) == 0);
    if (!do_wait($just_wait)) {
        break;
    }
    if (!$just_wait && ($task = array_shift($tasklist))) {
        run_task($task);
    }
}

output("done.");
if ($logfile) {
    fclose($logfile);
}