Advertisement
Guest User

Untitled

a guest
Dec 6th, 2012
307
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 2.26 KB | None | 0 0
  /**
   * Run the worker event loop: pull record batches from the producer socket
   * and obey commands published on the control socket.
   *
   * This method never returns normally; the process ends through exit() when
   * a terminate command arrives on the control socket.
   *
   * @param mixed  $zProgramId     Program identifier handed to initialiseProgramSettings().
   * @param string $aProducer_host Host used to build both the control and the producer tcp:// endpoints.
   * @param mixed  $bProducer_port Port of the producer endpoint (connected with a PULL socket).
   * @param mixed  $cControl_port  Port of the control endpoint (connected with a SUB socket).
   */
  public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
  {
      // Long-running worker: remove PHP's execution time limit.
      set_time_limit(0);

      $this->initialiseProgramSettings($zProgramId);

      // Connect to the control + producer socket (Unique per program)
      $controlEndpoint = 'tcp://' . $aProducer_host . ':' . $cControl_port;
      $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
      // A SUB socket discards every message until it subscribes; the empty
      // prefix subscribes to all of them. Without this the terminate command
      // would never be delivered.
      $control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
      echo 'Connecting worker ' . getmypid() . ' to control socket at ' . $controlEndpoint . PHP_EOL;
      $control->connect($controlEndpoint);
      echo 'Connected' . PHP_EOL;

      $producerEndpoint = 'tcp://' . $aProducer_host . ':' . $bProducer_port;
      $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
      echo 'Connecting worker ' . getmypid() . ' to producer socket at ' . $producerEndpoint . PHP_EOL;
      $producer->connect($producerEndpoint);
      echo 'Connected' . PHP_EOL;

      $poll = new ZMQPoll();
      $poll->add($producer, ZMQ::POLL_IN);
      $poll->add($control, ZMQ::POLL_IN);
      $read = $write = array();

      while (true)
      {
          try {
              // Don't set the timeout to -1, this will hang and prevent a lookup on the control port
              $event = $poll->poll($read, $write, 5000);
          } catch (\ZMQPollException $e)
          {
              echo $e->getMessage() . PHP_EOL;
              // Skip this round: $event/$read would otherwise be undefined
              // (first iteration) or left over from the previous poll.
              continue;
          }

          // Timeout expired without any readable socket.
          if (!$event)
          {
              continue;
          }

          // Start scanning for events on the poll
          foreach ($read as $read_socket)
          {
              if ($read_socket === $producer)
              {
                  echo 'It happened on the producer socket' . PHP_EOL;
                  $this->processRecords(json_decode($producer->recv(), true));
              }
              elseif ($read_socket === $control)
              {
                  $ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK);
                  if ($ctlMessage === false)
                  {
                      // Non-blocking receive had nothing to deliver; do not
                      // treat it as a control message.
                      continue;
                  }

                  echo 'Control message received' . PHP_EOL;
                  switch ($ctlMessage)
                  {
                      case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
                          echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
                          // exit() ends the process, so no break is needed after it.
                          exit();
                  }
              }
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement