SHOW:
|
|
- or go back to the newest paste.
1 | /* | |
2 | * Workers are fired up | |
3 | * Data is sent | |
4 | * TERMINATE is sent through the control socket | |
5 | */ | |
6 | ||
7 | ||
8 | /** | |
9 | * ################################################################################## | |
10 | - | * PUBLISHER - dispatches the work load |
10 | + | * PRODUCER - dispatches the work load |
11 | * ################################################################################## | |
12 | */ | |
13 | protected function getControlSocket() | |
14 | { | |
15 | if (!$this->controlSocket instanceof \ZMQSocket) | |
16 | { | |
17 | $this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB); | |
18 | // Bind to the control port - the host will be the same as the producer host | |
19 | $this->controlSocket->bind('tcp://172.0.0.1:51001); | |
20 | } | |
21 | return $this->controlSocket; | |
22 | } | |
23 | ||
24 | ||
25 | public function killWorkers() | |
26 | { | |
27 | if ($this->workerState == self::WORKER_STATE_STARTED) | |
28 | { | |
29 | echo 'Sending KILL to workers' . PHP_EOL; | |
30 | $this->getControlSocket()->send(self::WORKER_CONTROL_TERMINATE); | |
31 | ||
32 | $this->workerState = self::WORKER_STATE_KILLED; | |
33 | } | |
34 | } | |
35 | ||
36 | /** | |
37 | * ################################################################################## | |
38 | * WORKER | |
39 | * ################################################################################## | |
40 | */ | |
41 | ||
42 | public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port) | |
43 | { | |
44 | set_time_limit(0); | |
45 | ||
46 | $this->initialiseProgramSettings($zProgramId); | |
47 | // Connect to the control + producer socket (Unique per program) | |
48 | $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB); | |
49 | $control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, ''); | |
50 | $control->connect('tcp://172.0.0.1:51001'); | |
51 | ||
52 | $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL); | |
53 | $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port); | |
54 | ||
55 | $producerPoll = new ZMQPoll(); | |
56 | $producerPoll->add($producer, ZMQ::POLL_IN); | |
57 | $read = $write = array(); | |
58 | ||
59 | while(true) | |
60 | { | |
61 | try { | |
62 | $producerEvents = $producerPoll->poll($read, $write, 5000); | |
63 | } catch (\ZMQPollException $e) | |
64 | { | |
65 | echo $e->getMessage(); | |
66 | } | |
67 | ||
68 | // Start scanning for events on the poll | |
69 | if($producerEvents) | |
70 | { | |
71 | $this->processRecords(json_decode($producer->recv(), true)); | |
72 | } else | |
73 | { | |
74 | echo 'Checking control socket for messages' . PHP_EOL; | |
75 | if ($ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK)) | |
76 | { | |
77 | echo 'Control message received' . PHP_EOL; | |
78 | switch ($ctlMessage) | |
79 | { | |
80 | case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE: | |
81 | echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL; | |
82 | exit(); | |
83 | break; | |
84 | } | |
85 | } else | |
86 | { | |
87 | echo 'Nothing received' . PHP_EOL; | |
88 | } | |
89 | } | |
90 | ||
91 | ||
92 | } | |
93 | } |