/*
* Workers are fired up
* Data is sent
* TERMINATE is sent through the control socket
*/
/**
* ##################################################################################
* PRODUCER - dispatches the work load
* ##################################################################################
*/
protected function getControlSocket()
{
if (!$this->controlSocket instanceof \ZMQSocket)
{
$this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
// Bind to the control port - the host will be the same as the producer host
$this->controlSocket->bind('tcp://172.0.0.1:51001);
}
return $this->controlSocket;
}
public function killWorkers()
{
if ($this->workerState == self::WORKER_STATE_STARTED)
{
echo 'Sending KILL to workers' . PHP_EOL;
$this->getControlSocket()->send(self::WORKER_CONTROL_TERMINATE);
$this->workerState = self::WORKER_STATE_KILLED;
}
}
/**
* ##################################################################################
* WORKER
* ##################################################################################
*/
public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
{
set_time_limit(0);
$this->initialiseProgramSettings($zProgramId);
// Connect to the control + producer socket (Unique per program)
$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
$control->connect('tcp://172.0.0.1:51001');
$producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
$producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);
$producerPoll = new ZMQPoll();
$producerPoll->add($producer, ZMQ::POLL_IN);
$read = $write = array();
while(true)
{
try {
$producerEvents = $producerPoll->poll($read, $write, 5000);
} catch (\ZMQPollException $e)
{
echo $e->getMessage();
}
// Start scanning for events on the poll
if($producerEvents)
{
$this->processRecords(json_decode($producer->recv(), true));
} else
{
echo 'Checking control socket for messages' . PHP_EOL;
if ($ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK))
{
echo 'Control message received' . PHP_EOL;
switch ($ctlMessage)
{
case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
exit();
break;
}
} else
{
echo 'Nothing received' . PHP_EOL;
}
}
}
}