Guest User

Untitled

a guest
Jul 16th, 2018
109
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.11 KB | None | 0 0
  1. <?php
  2. $num_workers = 5;
  3.  
  4. $pid = 1;
  5. for($i = 0; $i < $num_workers; $i++) {
  6. $pid = pcntl_fork();
  7. if($pid == 0) {
  8. break;
  9. }
  10. }
  11.  
  12. if($pid == 0) {
  13. work($i);
  14. } else {
  15. serve($num_workers);
  16. }
  17.  
  18. function work($id) {
  19. $context = new ZMQContext();
  20. $workpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
  21. $workpipe->connect("ipc://work.ipc");
  22. $poll = new ZMQPoll();
  23. $poll->add($workpipe, ZMQ::POLL_IN);
  24.  
  25. $controlpipe = new ZMQSocket($context, ZMQ::SOCKET_SUB);
  26. $controlpipe->connect("ipc://control.ipc");
  27. $controlpipe->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
  28. $controlpipe->setSockOpt(ZMQ::SOCKOPT_IDENTITY, "worker" . $id);
  29. $controlpoll = new ZMQPoll();
  30. $controlpoll->add($controlpipe, ZMQ::POLL_IN);
  31.  
  32. $syncpipe = new ZMQSOcket($context, ZMQ::SOCKET_PUSH);
  33. $syncpipe->connect("ipc://sync.ipc");
  34. $syncpipe->send($id);
  35.  
  36. $readable = array();
  37. $worked = 0;
  38. while(true) {
  39. $events = $poll->poll($readable, null, 5000);
  40. if($events > 0) {
  41. foreach($readable as $r) {
  42. $message = $r->recv();
  43. //echo "Worker $id got message " . $message . "\n";
  44. usleep(250);
  45. $worked++;
  46. }
  47. } else {
  48. $events = $controlpoll->poll($readable, null, 100);
  49. if($events > 0) {
  50. $message = $readable[0]->recv();
  51. echo "$id Ending!\n";
  52. break;
  53. }
  54. }
  55. }
  56. echo "Worker $id worked $worked times \n";
  57. }
  58.  
  59. function serve($workers) {
  60. $context = new ZMQContext();
  61. $controlpipe = new ZMQSocket($context, ZMQ::SOCKET_PUB);
  62. $controlpipe->bind("ipc://control.ipc");
  63. $workpipe = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
  64. $workpipe->bind("ipc://work.ipc");
  65.  
  66. // Sync up with workers
  67. $syncpipe = new ZMQSocket($context, ZMQ::SOCKET_PULL);
  68. $syncpipe->bind("ipc://sync.ipc");
  69. $found = 0;
  70. $poll = new ZMQPoll();
  71. $poll->add($syncpipe, ZMQ::POLL_IN);
  72. $readable = array();
  73. while(true) {
  74. $found += $poll->poll($readable, null);;
  75. foreach($readable as $r) {
  76. $message = $r->recv();
  77. echo "Hello $message!\n";
  78. }
  79. if($found == $workers) {
  80. echo "All workers checked in\n";
  81. break;
  82. }
  83. }
  84.  
  85. foreach(range(1, 10000) as $message) {
  86. //echo "Sending $message\n";
  87. $workpipe->send($message);
  88. }
  89. echo "Sending END\n";
  90. $controlpipe->send("END");
  91. }
Add Comment
Please, Sign In to add comment