Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
<?php
// Entry point: fork a fixed pool of worker processes. Each child runs work()
// with its own index; the parent runs serve() and feeds the pool.
$num_workers = 5;
$pid = 1;
$spawned = 0; // number of children the parent actually managed to fork

for ($i = 0; $i < $num_workers; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        // pcntl_fork() returns -1 on failure. Stop forking so the parent does
        // not end up waiting for workers that were never created.
        fwrite(STDERR, "Could not fork worker $i\n");
        break;
    }
    if ($pid === 0) {
        // Child: leave the loop with $i holding this worker's index.
        break;
    }
    $spawned++;
}

if ($pid === 0) {
    work($i);
} elseif ($spawned === 0) {
    // Nothing to serve: without any worker the sync handshake can never finish.
    fwrite(STDERR, "No workers could be started\n");
    exit(1);
} else {
    // Only wait for the workers that really exist.
    serve($spawned);
}
/**
 * Body of a worker process.
 *
 * Connects a PULL socket to the work pipe, a SUB socket to the control pipe
 * and a PUSH socket to the sync pipe, announces itself by sending its id on
 * the sync pipe, then consumes work messages. The control pipe is only
 * checked when the work poll returns no events, so queued work is drained
 * before a control message can end the loop.
 *
 * @param int $id Worker index; sent on the sync pipe, used in the socket
 *                identity and in the log output.
 */
function work($id) {
    $context = new ZMQContext();

    // Incoming work.
    $workpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $workpipe->connect("ipc://work.ipc");
    $poll = new ZMQPoll();
    $poll->add($workpipe, ZMQ::POLL_IN);

    // Control channel. The identity is set before connecting: socket options
    // such as the identity only apply to connections made afterwards.
    $controlpipe = new ZMQSocket($context, ZMQ::SOCKET_SUB);
    $controlpipe->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "worker" . $id);
    $controlpipe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
    $controlpipe->connect("ipc://control.ipc");
    $controlpoll = new ZMQPoll();
    $controlpoll->add($controlpipe, ZMQ::POLL_IN);

    // Tell the server this worker is ready.
    $syncpipe = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $syncpipe->connect("ipc://sync.ipc");
    $syncpipe->send((string) $id);

    // ZMQPoll::poll() takes both result arrays by reference, so real
    // variables are passed rather than a literal null.
    $readable = array();
    $writable = array();
    $worked = 0;

    while (true) {
        $events = $poll->poll($readable, $writable, 5000);
        if ($events > 0) {
            foreach ($readable as $r) {
                $message = $r->recv();
                //echo "Worker $id got message " . $message . "\n";
                usleep(250); // stand-in for doing the actual work
                $worked++;
            }
            continue;
        }

        // No work arrived within the timeout: see whether the server has
        // published anything on the control pipe. Any message ends the worker.
        $events = $controlpoll->poll($readable, $writable, 100);
        if ($events > 0) {
            $message = $readable[0]->recv();
            echo "$id Ending!\n";
            break;
        }
    }

    echo "Worker $id worked $worked times \n";
}
/**
 * Body of the server (parent) process.
 *
 * Binds the control (PUB), work (PUSH) and sync (PULL) sockets, waits until
 * $workers check-in messages have been received on the sync pipe, pushes the
 * integers 1..10000 onto the work pipe and finally publishes "END" on the
 * control pipe.
 *
 * @param int $workers Number of worker check-ins to wait for before sending
 *                     any work.
 */
function serve($workers) {
    $context = new ZMQContext();

    $controlpipe = new ZMQSocket($context, ZMQ::SOCKET_PUB);
    $controlpipe->bind("ipc://control.ipc");

    $workpipe = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $workpipe->bind("ipc://work.ipc");

    // Sync up with workers
    $syncpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $syncpipe->bind("ipc://sync.ipc");

    $poll = new ZMQPoll();
    $poll->add($syncpipe, ZMQ::POLL_IN);

    // ZMQPoll::poll() takes both result arrays by reference, so real
    // variables are passed rather than a literal null.
    $readable = array();
    $writable = array();
    $found = 0;

    while (true) {
        $poll->poll($readable, $writable);
        foreach ($readable as $r) {
            $message = $r->recv();
            echo "Hello $message!\n";
            // Count check-ins by messages actually received.
            $found++;
        }
        // >= rather than ==, so an overshoot can never loop forever.
        if ($found >= $workers) {
            echo "All workers checked in\n";
            break;
        }
    }

    foreach (range(1, 10000) as $message) {
        //echo "Sending $message\n";
        $workpipe->send((string) $message);
    }

    echo "Sending END\n";
    $controlpipe->send("END");
}
Add Comment
Please, Sign In to add comment