/*
 * Lifecycle:
 *  - Workers are fired up
 *  - Data is sent
 *  - TERMINATE is sent through the control socket
 */

/**
 * ##################################################################################
 * PRODUCER - dispatches the work load
 * ##################################################################################
 */

/**
 * Return the producer's control socket, creating and binding it on first use.
 *
 * The socket is a PUB socket cached in $this->controlSocket; later calls
 * return the cached instance without binding again.
 *
 * NOTE(review): the endpoint is hard-coded to 172.0.0.1:51001. 172.0.0.1 is
 * not the loopback address (127.0.0.1) - confirm this is the intended host.
 * NOTE(review): this reads $this->zmqContext directly while startWorker()
 * goes through getZmqContext() - confirm the property is always initialised
 * before this method is called.
 *
 * @return \ZMQSocket
 */
protected function getControlSocket()
{
    if (!$this->controlSocket instanceof \ZMQSocket) {
        $this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);

        // Bind to the control port - the host will be the same as the producer host
        $this->controlSocket->bind('tcp://172.0.0.1:51001');
    }

    return $this->controlSocket;
}

/**
 * Publish the TERMINATE command on the control socket.
 *
 * Does nothing unless the worker state is WORKER_STATE_STARTED. After the
 * command is sent the state becomes WORKER_STATE_KILLED, so a repeated call
 * does not send it a second time.
 *
 * @return void
 */
public function killWorkers()
{
    if ($this->workerState != self::WORKER_STATE_STARTED) {
        return;
    }

    echo 'Sending KILL to workers' . PHP_EOL;
    $this->getControlSocket()->send(self::WORKER_CONTROL_TERMINATE);
    $this->workerState = self::WORKER_STATE_KILLED;
}

/**
 * ##################################################################################
 * WORKER
 * ##################################################################################
 */

/**
 * Run the worker loop: pull work from the producer and watch the control socket.
 *
 * Each pass polls the producer PULL socket with a timeout of 5000. When work
 * is available it is JSON-decoded (as an associative array) and handed to
 * processRecords(). When the poll reports nothing, the control SUB socket is
 * read without blocking; a TERMINATE message ends the process via exit().
 * The loop never returns normally.
 *
 * @param mixed $zProgramId     Passed to initialiseProgramSettings().
 * @param mixed $aProducer_host Host part of the producer endpoint.
 * @param mixed $bProducer_port Port part of the producer endpoint.
 * @param mixed $cControl_port  NOTE(review): currently unused - the control
 *                              endpoint is hard-coded below; confirm whether
 *                              it should be built from this value.
 *
 * @return void
 */
public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
{
    set_time_limit(0);
    $this->initialiseProgramSettings($zProgramId);

    // Connect to the control + producer socket (Unique per program)
    $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
    // An empty subscription filter means every published message is received.
    $control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
    $control->connect('tcp://172.0.0.1:51001');

    $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
    $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);

    $producerPoll = new ZMQPoll();
    $producerPoll->add($producer, ZMQ::POLL_IN);
    $read = $write = array();

    while (true) {
        // Reset on every pass: if poll() throws, the count from a previous
        // iteration must not be reused, otherwise the blocking recv() below
        // would run with no message pending.
        $producerEvents = 0;

        try {
            $producerEvents = $producerPoll->poll($read, $write, 5000);
        } catch (\ZMQPollException $e) {
            echo $e->getMessage();
        }

        // Start scanning for events on the poll
        if ($producerEvents) {
            $this->processRecords(json_decode($producer->recv(), true));
            continue;
        }

        echo 'Checking control socket for messages' . PHP_EOL;

        // A non-blocking recv() returns false when nothing is queued. Compare
        // against false explicitly so a falsy payload ('' or '0') still counts
        // as a received message.
        $ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK);
        if ($ctlMessage === false) {
            echo 'Nothing received' . PHP_EOL;
            continue;
        }

        echo 'Control message received' . PHP_EOL;
        switch ($ctlMessage) {
            case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
                echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
                exit();
        }
    }
}