Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

Untitled

By: a guest on Dec 6th, 2012  |  syntax: PHP  |  size: 2.26 KB  |  views: 74  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1.     public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
  2.     {
  3.         set_time_limit(0);
  4.  
  5.         $this->initialiseProgramSettings($zProgramId);
  6.                 // Connect to the control + producer socket (Unique per program)
  7.                 $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
  8.                 //$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
  9.                 echo 'Connecting worker ' . getmypid() . ' to control socket at tcp://' . $aProducer_host . ':' . $cControl_port . PHP_EOL;
  10.                 $control->connect('tcp://' . $aProducer_host . ':' . $cControl_port);
  11.         echo 'Connected' . PHP_EOL;
  12.                 $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
  13.                 echo 'Connecting worker ' . getmypid() . ' to producer socket at tcp://' . $aProducer_host . ':' . $bProducer_port . PHP_EOL;
  14.                 $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);
  15.                 echo 'Connected' . PHP_EOL;
  16.  
  17.                 $poll = new ZMQPoll();
  18.                 $poll->add($producer, ZMQ::POLL_IN);
  19.                 $poll->add($control, ZMQ::POLL_IN);
  20.                 $read = $write = array();
  21.  
  22.                 while(true)
  23.                 {
  24.                     try {
  25.                         // Don't set the timeout to -1, this will hang and prevent a lookup on the control port
  26.                         $event = $poll->poll($read, $write, 5000);
  27.                     } catch (\ZMQPollException $e)
  28.                     {
  29.                         echo $e->getMessage();
  30.                     }
  31.  
  32.                     // Start scanning for events on the poll
  33.                     if($event)
  34.                     {
  35.                         foreach ($read as $read_socket)
  36.                         {
  37.                     if ($read_socket === $producer)
  38.                         {
  39.                             echo 'It happened on the producer socket' . PHP_EOL;
  40.                             $this->processRecords(json_decode($producer->recv(), true));
  41.                         }
  42.  
  43.                         if ($read_socket === $control)
  44.                         {
  45.                             $ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK);
  46.                             echo 'Control message received' . PHP_EOL;
  47.                                 switch ($ctlMessage)
  48.                                 {
  49.                                     case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
  50.                                         echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
  51.                                         exit();
  52.                                         break;
  53.                                 }
  54.                         }
  55.                         }
  56.  
  57.                     }
  58.  
  59.                 }
  60.     }