Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/*
 * Lifecycle:
 *   1. The workers are started.
 *   2. The producer sends the data to the workers.
 *   3. TERMINATE is sent through the control socket to shut the workers down.
 */
- /**
- * ##################################################################################
- * PRODUCER - dispatches the work load
- * ##################################################################################
- */
/**
 * Lazily creates and returns the PUB socket used to send control commands
 * to the workers.
 *
 * On first use the socket is obtained from $this->zmqContext, bound to the
 * control endpoint and cached in $this->controlSocket; later calls return
 * the cached instance.
 *
 * NOTE(review): the endpoint is hard-coded, and 172.0.0.1 looks like a typo
 * for 127.0.0.1 -- confirm. Whatever value is used here must stay identical
 * to the address the worker connects its control socket to in startWorker().
 *
 * @return \ZMQSocket The bound control (publisher) socket.
 */
protected function getControlSocket()
{
    if (!$this->controlSocket instanceof \ZMQSocket)
    {
        $this->controlSocket = $this->zmqContext->getSocket(\ZMQ::SOCKET_PUB);
        // Bind to the control port. The original note says the host should be
        // the same as the producer host; it is currently a fixed address.
        $this->controlSocket->bind('tcp://172.0.0.1:51001');
    }

    return $this->controlSocket;
}
/**
 * Sends the terminate command to the workers over the control socket.
 *
 * Only acts while the recorded worker state is WORKER_STATE_STARTED. After
 * sending, the state is set to WORKER_STATE_KILLED.
 *
 * NOTE(review): the control socket is created and bound lazily by
 * getControlSocket(). If this is the first call that creates it, subscribers
 * may not have connected yet when the command is sent -- confirm the socket
 * is set up earlier in the producer's lifecycle.
 *
 * @return void
 */
public function killWorkers()
{
    // Nothing to do unless the workers are currently marked as started.
    if ($this->workerState != self::WORKER_STATE_STARTED)
    {
        return;
    }

    echo 'Sending KILL to workers' . PHP_EOL;

    $controlSocket = $this->getControlSocket();
    $controlSocket->send(self::WORKER_CONTROL_TERMINATE);

    $this->workerState = self::WORKER_STATE_KILLED;
}
- /**
- * ##################################################################################
- * WORKER
- * ##################################################################################
- */
/**
 * Runs the worker loop: pulls JSON-encoded work items from the producer and
 * hands them to processRecords() until a terminate command is received on
 * the control socket, at which point the process exits.
 *
 * The control socket is only inspected when the producer poll reports no
 * events (timeout or poll failure), so queued work is processed before a
 * terminate command is looked at.
 *
 * NOTE(review): the control endpoint is hard-coded and $cControl_port is
 * not used. The address must match the one bound in getControlSocket();
 * 172.0.0.1 looks like a typo for 127.0.0.1 -- confirm before changing
 * either side.
 *
 * @param mixed  $zProgramId     Passed to initialiseProgramSettings().
 * @param string $aProducer_host Host part of the producer endpoint the PULL socket connects to.
 * @param mixed  $bProducer_port Port part of the producer endpoint.
 * @param mixed  $cControl_port  Currently unused (see note above).
 *
 * @return void Does not return normally; the loop ends via exit().
 */
public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
{
    // Workers are long-running, so remove PHP's execution time limit.
    set_time_limit(0);
    $this->initialiseProgramSettings($zProgramId);

    // Control channel: subscribe with an empty filter, i.e. to every message.
    $control = $this->getZmqContext()->getSocket(\ZMQ::SOCKET_SUB);
    $control->setSockOpt(\ZMQ::SOCKOPT_SUBSCRIBE, '');
    $control->connect('tcp://172.0.0.1:51001');

    // Work channel: pull work items from the producer.
    $producer = $this->getZmqContext()->getSocket(\ZMQ::SOCKET_PULL);
    $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);

    $producerPoll = new \ZMQPoll();
    $producerPoll->add($producer, \ZMQ::POLL_IN);
    $read = $write = array();

    while (true)
    {
        // Reset on every iteration so a failed poll can never leave this
        // undefined or carry over a stale "events pending" value, which
        // would lead to a blocking recv() with nothing to read.
        $producerEvents = 0;

        try {
            $producerEvents = $producerPoll->poll($read, $write, 5000);
        } catch (\ZMQPollException $e)
        {
            echo $e->getMessage();
        }

        // The producer is the only socket in the poll set, so any event
        // means a work item is ready to be received.
        if ($producerEvents)
        {
            $records = json_decode($producer->recv(), true);

            if (json_last_error() !== JSON_ERROR_NONE)
            {
                // Do not hand a failed decode (null) to the record processor.
                echo 'Discarding malformed work item (JSON error ' . json_last_error() . ')' . PHP_EOL;
                continue;
            }

            $this->processRecords($records);
            continue;
        }

        echo 'Checking control socket for messages' . PHP_EOL;

        // Non-blocking receive: false means no message was waiting. Compare
        // strictly so that "" or "0" still count as received messages.
        $ctlMessage = $control->recv(\ZMQ::MODE_NOBLOCK);

        if ($ctlMessage === false)
        {
            echo 'Nothing received' . PHP_EOL;
            continue;
        }

        echo 'Control message received' . PHP_EOL;

        switch ($ctlMessage)
        {
            case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
                echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
                // exit() ends the process, so no break is needed after it.
                exit();
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement