/**
 * Run the worker loop for one program: pull work items from the producer
 * socket and watch the control socket for a termination command.
 *
 * This method does not return normally. It loops until a
 * System_ProgramDataWorker::WORKER_CONTROL_TERMINATE message arrives on the
 * control socket, at which point it calls exit().
 *
 * @param mixed      $zProgramId     Program identifier, passed straight to
 *                                   initialiseProgramSettings().
 * @param string     $aProducer_host Host used for BOTH the producer and the
 *                                   control endpoints.
 * @param int|string $bProducer_port TCP port of the producer (PULL) endpoint.
 * @param int|string $cControl_port  TCP port of the control (SUB) endpoint.
 *
 * @return void
 */
public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
{
    // Workers are long-running; lift PHP's execution time limit.
    set_time_limit(0);
    $this->initialiseProgramSettings($zProgramId);

    $pid = getmypid();
    $controlEndpoint = 'tcp://' . $aProducer_host . ':' . $cControl_port;
    $producerEndpoint = 'tcp://' . $aProducer_host . ':' . $bProducer_port;

    // Connect to the control + producer socket (Unique per program)
    $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
    // A SUB socket starts with no subscriptions and silently drops every
    // message, so subscribe to everything ('' prefix) or the termination
    // command can never be delivered.
    $control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
    echo 'Connecting worker ' . $pid . ' to control socket at ' . $controlEndpoint . PHP_EOL;
    $control->connect($controlEndpoint);
    echo 'Connected' . PHP_EOL;

    $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
    echo 'Connecting worker ' . $pid . ' to producer socket at ' . $producerEndpoint . PHP_EOL;
    $producer->connect($producerEndpoint);
    echo 'Connected' . PHP_EOL;

    $poll = new ZMQPoll();
    $poll->add($producer, ZMQ::POLL_IN);
    $poll->add($control, ZMQ::POLL_IN);

    $read = $write = array();
    while (true) {
        try {
            // Don't set the timeout to -1, this will hang and prevent a lookup on the control port
            $event = $poll->poll($read, $write, 5000);
        } catch (\ZMQPollException $e) {
            echo $e->getMessage();
            // Skip this iteration: $event/$read would otherwise be undefined
            // (first pass) or stale from the previous pass, which could lead
            // to a blocking recv() on a socket with nothing to read.
            continue;
        }

        // Nothing became readable within the timeout; poll again.
        if (!$event) {
            continue;
        }

        // Start scanning for events on the poll
        foreach ($read as $read_socket) {
            if ($read_socket === $producer) {
                echo 'It happened on the producer socket' . PHP_EOL;
                // Payload is decoded as JSON into an associative array;
                // NOTE(review): processRecords() receives null if the payload
                // is not valid JSON - confirm it tolerates that.
                $this->processRecords(json_decode($producer->recv(), true));
            } elseif ($read_socket === $control) {
                $ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK);
                if ($ctlMessage === false) {
                    // Non-blocking receive found nothing to read after all.
                    continue;
                }
                echo 'Control message received' . PHP_EOL;
                // Compare as strings with === so an unexpected payload cannot
                // loosely match the constant and kill the worker by accident.
                if ((string) $ctlMessage === (string) System_ProgramDataWorker::WORKER_CONTROL_TERMINATE) {
                    echo "Received a termination command on the control socket, closing worker: " . $pid . PHP_EOL;
                    exit();
                }
            }
        }
    }
}