public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port) { set_time_limit(0); $this->initialiseProgramSettings($zProgramId); // Connect to the control + producer socket (Unique per program) $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB); //$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, ''); echo 'Connecting worker ' . getmypid() . ' to control socket at tcp://' . $aProducer_host . ':' . $cControl_port . PHP_EOL; $control->connect('tcp://' . $aProducer_host . ':' . $cControl_port); echo 'Connected' . PHP_EOL; $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL); echo 'Connecting worker ' . getmypid() . ' to producer socket at tcp://' . $aProducer_host . ':' . $bProducer_port . PHP_EOL; $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port); echo 'Connected' . PHP_EOL; $poll = new ZMQPoll(); $poll->add($producer, ZMQ::POLL_IN); $poll->add($control, ZMQ::POLL_IN); $read = $write = array(); while(true) { try { // Don't set the timeout to -1, this will hang and prevent a lookup on the control port $event = $poll->poll($read, $write, 5000); } catch (\ZMQPollException $e) { echo $e->getMessage(); } // Start scanning for events on the poll if($event) { foreach ($read as $read_socket) { if ($read_socket === $producer) { echo 'It happened on the producer socket' . PHP_EOL; $this->processRecords(json_decode($producer->recv(), true)); } if ($read_socket === $control) { $ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK); echo 'Control message received' . PHP_EOL; switch ($ctlMessage) { case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE: echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL; exit(); break; } } } } } }