View difference between Paste ID: myZyWQ1E and YEe10v04
SHOW: | | - or go back to the newest paste.
1
/*
2
* Workers are fired up
3
* Data is sent
4
* TERMINATE is sent through the control socket
5
*/
6
7
8
/**
9
 *  ##################################################################################
10-
 *       PUBLISHER - dispatches the work load
10+
 *       PRODUCER - dispatches the work load
11
 *  ##################################################################################
12
 */
13
    protected function getControlSocket()
14
    {
15
        if (!$this->controlSocket instanceof \ZMQSocket)
16
        {
17
            $this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
18
            // Bind to the control port - the host will be the same as the producer host
19
            $this->controlSocket->bind('tcp://172.0.0.1:51001);
20
        }
21
        return $this->controlSocket;
22
    }
23
24
25
    public function killWorkers()
26
    {
27
        if ($this->workerState == self::WORKER_STATE_STARTED)
28
        {
29
            echo 'Sending KILL to workers' . PHP_EOL;
30
            $this->getControlSocket()->send(self::WORKER_CONTROL_TERMINATE);
31
32
            $this->workerState = self::WORKER_STATE_KILLED;
33
        }
34
    }
35
36
/**
37
 *  ##################################################################################
38
 *      WORKER
39
 *  ##################################################################################
40
 */
41
42
    public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
43
    {
44
        set_time_limit(0);
45
46
        $this->initialiseProgramSettings($zProgramId);
47
		// Connect to the control + producer socket (Unique per program)
48
		$control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
49
		$control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
50
		$control->connect('tcp://172.0.0.1:51001');
51
52
		$producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
53
		$producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);
54
55
		$producerPoll = new ZMQPoll();
56
		$producerPoll->add($producer, ZMQ::POLL_IN);
57
		$read = $write = array();
58
59
		while(true)
60
		{
61
		    try {
62
		        $producerEvents = $producerPoll->poll($read, $write, 5000);
63
		    } catch (\ZMQPollException $e)
64
		    {
65
		        echo $e->getMessage();
66
		    }
67
68
		    // Start scanning for events on the poll
69
		    if($producerEvents)
70
		    {
71
		        $this->processRecords(json_decode($producer->recv(), true));
72
		    } else
73
		    {
74
		        echo 'Checking control socket for messages' . PHP_EOL;
75
    		    if ($ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK))
76
    		    {
77
    		        echo 'Control message received' . PHP_EOL;
78
    		        switch ($ctlMessage)
79
    		        {
80
    		            case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
81
    		                echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
82
    		                exit();
83
    		                break;
84
    		        }
85
    		    } else
86
    		    {
87
    		        echo 'Nothing received' . PHP_EOL;
88
    		    }
89
		    }
90
91
92
		}
93
    }