Don't like ads? PRO users don't see any ads ;-)
Guest

taskman.php - parallel task manager

By: a guest on May 7th, 2012  |  syntax: PHP  |  size: 5.16 KB  |  hits: 24  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. #!/usr/bin/php
  2. <?php
  3.  
  4. // parallel task manager
  5. // 2010 daniel souza <thehazard@gmail.com>
  6.  
  7. error_reporting(E_ALL);
  8.  
  9. function usage() {
  10.         global $argv;
  11.        
  12.         print "\nUSAGE: {$argv[0]} -c <command list> [ -p <processes> ] [ -t <timeout> ] [ -l <logfile> ]\n";
  13. }
  14.  
  15. function validate_argument_context($context = array(), &$errors) {
  16.         $errors = '';
  17.         if ((!strlen($context['list'])) || (!is_file($context['list']))) {
  18.                 $errors .= "ERROR: invalid command list.\n";
  19.         }
  20.         if ((!is_numeric($context['processes'])) || (!intval($context['processes']))) {
  21.                 $errors .= "ERROR: invalid process count.\n";
  22.         }
  23.         if (!is_numeric($context['timeout'])) {
  24.                 $errors .= "ERROR: invalid timeout value.\n";
  25.         }
  26.         return empty($errors);
  27. }
  28.  
  29. function get_argument_context() {
  30.         $values = array(
  31.                 'c' => NULL,
  32.                 'l' => NULL,
  33.                 'p' => 3,
  34.                 't' => 0,
  35.         );
  36.         foreach (getopt('c:l:p:t:') as $k => $v) {
  37.                 if ($v) {
  38.                         $values[$k] = $v;
  39.                 }
  40.         }
  41.         $output = array();
  42.         foreach (array('c' => 'list', 'l' => 'logfile', 'p' => 'processes', 't' => 'timeout') as $n => $name) {
  43.                 $output[$name] = $values[$n];
  44.         }
  45.         return $output;
  46. }
  47.  
  48. function read_command_list($filename = '') {
  49.         $output = array();
  50.         foreach (explode("\n", file_get_contents($filename)) as $line) {
  51.                 if (!$line = trim($line)) {
  52.                         continue;
  53.                 }
  54.                 if (substr($line, 0, 1) == '#') {
  55.                         continue;
  56.                 }
  57.                 $output[] = $line;
  58.         }
  59.         return $output;
  60. }
  61.  
  62. function output($text = '') {
  63.         global $logfile, $running_process_count, $max_process_limit, $task_total, $task_done;
  64.        
  65.         $line = date('Y-m-d H:i:s') . " (R:{$running_process_count}/{$max_process_limit}) [{$task_done}/{$task_total}] -- " . $text . "\n";
  66.         print $line;
  67.         if ($logfile) {
  68.                 fwrite($logfile, $line);
  69.         }
  70. }
  71.  
  72. function run_task($task) {
  73.         global $running_process_count, $task_table;
  74.        
  75.         $pid = pcntl_fork();
  76.         if ($pid == -1) {
  77.                 output("failed to fork() for task: {$task}");
  78.                 return FALSE;
  79.         }
  80.         if ($pid == 0) {
  81.                 // child
  82.                 pcntl_exec('/bin/sh', array('-c', $task), $_ENV);
  83.         } else {
  84.                 // parent
  85.                 $task_table[$pid] = array(
  86.                         'task' => $task,
  87.                         'start' => time(),
  88.                 );
  89.                 $running_process_count++;
  90.                 output("spawned process: {$task} (pid $pid)");
  91.         }
  92. }
  93.  
  94. function getloadavg() {
  95.         return (float) array_shift(explode(' ', file_get_contents('/proc/loadavg')));
  96. }
  97.  
  98. function getmeminfo() {
  99.         $output = array();
  100.         foreach (file('/proc/meminfo') as $line) {
  101.                 if (preg_match('/^([A-Za-z _]+):[^0-9]*([0-9]+)/', trim($line), $regs)) {
  102.                         $output[$regs[1]] = (int) $regs[2];
  103.                 }
  104.         }
  105.         return $output;
  106. }
  107.  
  108. function do_wait($just_wait = FALSE) {
  109.         global $running_process_count, $max_process_limit, $task_table, $task_total, $task_done;
  110.        
  111.         if (!$just_wait) {
  112.                 if ($running_process_count < $max_process_limit) {
  113.                         // not enough processes running yet, so keep on spawning more
  114.                         return TRUE;
  115.                 }
  116.         }
  117.         // we reached the # of processes, now wait for some to finish
  118.         $stat_counter = 0;
  119.         while (TRUE) {
  120.                 $pid = pcntl_wait($status, WNOHANG);
  121.                 //output("do_wait(): pcntl_wait() returned $pid");
  122.                 if ($pid == -1) {
  123.                         //output("do_wait(): pcntl_wait() returned -1.");
  124.                         return FALSE;
  125.                 }
  126.                 if ($pid) {
  127.                         $running_process_count--;
  128.                         $task_done++;
  129.                        
  130.                         $_status = array();
  131.                         if (pcntl_wifexited($status)) {
  132.                                 $ret = pcntl_wexitstatus($status);
  133.                                 $_status[] = "finished normally with retcode $ret";
  134.                         } else {
  135.                                 $_status[] = "finished abnormally";
  136.                         }
  137.                         if (pcntl_wifsignaled($status)) {
  138.                                 $sig = pcntl_wtermsig($status);
  139.                                 $_status[] = "termination due to signal $sig";
  140.                         }
  141.                         $_status = implode(', ', $_status);
  142.                        
  143.                         if (isset($task_table[$pid])) {
  144.                                 $task_table[$pid]['end'] = time();
  145.                                 $duration = ($task_table[$pid]['end'] - $task_table[$pid]['start']);
  146.                                 output("process {$pid} finished: {$task_table[$pid]['task']} ($duration secs, $_status)");
  147.                         } else {
  148.                                 output("unknown process {$pid} finished ($_status)");
  149.                         }
  150.                         return TRUE;
  151.                 }
  152.                 // might check for timeouts here on $task_table
  153.                 sleep(1);
  154.                 if ($stat_counter++ == 10) {
  155.                         $loadavg = getloadavg();
  156.                         $meminfo = getmeminfo();
  157.                         $stat_counter = 0;
  158.                         output("loadavg={$loadavg}, memfree={$meminfo['MemFree']}, swapfree={$meminfo['SwapFree']}");
  159.                 }
  160.         }
  161. }
  162.  
  163. $logfile = NULL;
  164. $running_process_count = 0;
  165. $context = get_argument_context();
  166. $max_process_limit = intval($context['processes']);
  167. $task_table = array();
  168.  
  169. if (!validate_argument_context($context, $errors)) {
  170.         usage();
  171.         print $errors . "\n";
  172.         exit(-1);
  173. }
  174.  
  175. if ($context['logfile']) {
  176.         @mkdir(dirname(realpath($context['logfile'])), 0644, true);
  177.         if (!$logfile = fopen($context['logfile'], 'w')) {
  178.                 print "ERROR: error opening logfile: {$context['logfile']}\n";
  179.                 exit(-1);
  180.         }      
  181. }
  182.  
  183. output("taskman started (pid " . posix_getpid() . "): " . implode(' ', $argv));
  184.  
  185. $tasklist = read_command_list($context['list']);
  186. $task_done = 0;
  187. $task_total = sizeof($tasklist);
  188. if (!$task_total) {
  189.         output("no tasks to run.");
  190.         exit(-1);
  191. }
  192. output("loaded {$task_total} tasks from {$context['list']}");
  193.  
  194. while (TRUE) {
  195.         $just_wait = (sizeof($tasklist) == 0);
  196.         if (do_wait($just_wait)) {
  197.                 if (!$just_wait && ($task = array_shift($tasklist))) {
  198.                         run_task($task);
  199.                 }
  200.         } else {
  201.                 break;
  202.         }
  203. }
  204.  
  205. output("done.");