Advertisement
Guest User

Untitled

a guest
Nov 22nd, 2014
168
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.89 KB | None | 0 0
  1. <?php namespace robbmj;
  2.  
  3. /* The MIT License (MIT) */
  4.  
  /**
   * Unit of work that is run inside a forked child process.
   */
  interface ChildWorker
  {
      /**
       * Performs the work and returns its result.
       *
       * Implementations must return a string.
       */
      public function produce();
  }
  11.  
  /**
   * Receiver for the results that child processes hand back to the parent.
   */
  interface ParentWorker
  {
      /**
       * Handles the output of one child.
       *
       * $input will always be a string.
       */
      public function consume($input);
  }
  18.  
  /**
   * Value holder for the two ends of a connected socket pair together with
   * the moment the pair was created.
   */
  class SocketPair
  {
      private $clientSock;
      private $serverSock;
      private $createTime;

      /**
       * @param mixed    $clientSock first end of the pair
       * @param mixed    $serverSock second end of the pair
       * @param int|null $createTime unix timestamp of creation; the current
       *                             time is used when omitted or null
       */
      public function __construct($clientSock, $serverSock, $createTime = null)
      {
          if (!isset($createTime)) {
              $createTime = time();
          }

          $this->clientSock = $clientSock;
          $this->serverSock = $serverSock;
          $this->createTime = $createTime;
      }

      /**
       * Tells whether at least $allotedTime seconds have elapsed since the
       * creation time of this pair.
       *
       * @param int $allotedTime number of seconds granted
       * @return bool
       */
      public function passedAllotedTime($allotedTime)
      {
          $deadline = $this->createTime + $allotedTime;

          return time() >= $deadline;
      }

      /** Returns the client end of the pair. */
      public function clientSock()
      {
          return $this->clientSock;
      }

      /** Returns the server end of the pair. */
      public function serverSock()
      {
          return $this->serverSock;
      }

      /** Closes the client end of the pair. */
      public function closeClient()
      {
          socket_close($this->clientSock);
      }

      /** Closes the server end of the pair. */
      public function closeServer()
      {
          socket_close($this->serverSock);
      }
  }
  48.  
  /**
   * Runs each ChildWorker in its own forked process and passes the string
   * every child produced to a single ParentWorker in the parent process.
   *
   * Each child gets its own socket pair: the child writes its output to the
   * server end, the parent reads it from the client end once the child has
   * been reaped.
   *
   * NOTE(review): the child writes with a non-blocking socket and the parent
   * only reads after the child has exited, so output that does not fit in the
   * socket buffer appears to be dropped. Confirm whether large payloads need
   * to be supported before relying on this class for them.
   */
  class IPC
  {
      /** Number of bytes requested from the socket per read. */
      const READ_CHUNK_SIZE = 1129;

      private $pWorker;
      private $cWorkers;
      private $maxChildren;
      private $maxWaitTime;

      /**
       * @param ParentWorker $pWorker  receives the output of every child
       * @param array        $cWorkers list of ChildWorker instances, one
       *                               child process is forked per entry
       */
      public function __construct(ParentWorker $pWorker, array /* ChildWorker */ $cWorkers)
      {
          $this->pWorker = $pWorker;
          $this->cWorkers = $cWorkers;
          $this->maxChildren = 0;
          $this->maxWaitTime = 0;
      }

      /**
       * Sets the maximum number of child processes that can be running at any
       * one time. If set to 0, there is no limit.
       *
       * @param int $max
       * @return $this
       * @throws \InvalidArgumentException if $max is not an integer or is less than 0
       */
      public function maxChildren($max)
      {
          if (!is_int($max) || $max < 0) {
              // Fully qualified: inside a namespace an unqualified class name
              // would be resolved relative to that namespace.
              throw new \InvalidArgumentException("max must be greater than or equal to 0");
          }
          $this->maxChildren = $max;

          return $this;
      }

      /**
       * Sets the maximum amount of time a child process can run for before it
       * is terminated. If set to 0, there is no limit.
       *
       * @param int $seconds
       * @return $this
       * @throws \InvalidArgumentException if $seconds is not an integer or is less than 0
       */
      public function maxWaitTime($seconds)
      {
          if (!is_int($seconds) || $seconds < 0) {
              throw new \InvalidArgumentException("seconds must be greater than or equal to 0");
          }
          $this->maxWaitTime = $seconds;

          return $this;
      }

      /**
       * Forks one child per ChildWorker, throttled by maxChildren, and
       * returns once every child has been reaped or killed.
       */
      public function start()
      {
          // pid => SocketPair of every child that is still outstanding
          $sockets = [];

          foreach ($this->cWorkers as $cWorker) {
              $socketPair = $this->makeSocketPair();
              if (!$socketPair) {
                  continue;
              }

              $pid = pcntl_fork();
              if ($pid === 0) {
                  // child: do the work, then leave without returning to the loop
                  $this->childProcess($socketPair, $cWorker);
                  exit(0);
              }

              if ($pid > 0) {
                  $sockets[$pid] = $socketPair;
                  if ($this->maxChildren > 0 && count($sockets) >= $this->maxChildren) {
                      // make room for the next child before forking again
                      $this->reduceProcessCount($sockets, $this->maxChildren - 1);
                  }
                  continue;
              }

              // fork failed: no child will ever use this pair, so release it
              // TODO: install a logger, echoing is not cool
              echo "pcntl_fork failed. Reason: " . pcntl_strerror(pcntl_get_last_error()) . "\n";
              $socketPair->closeClient();
              $socketPair->closeServer();
          }

          $this->reduceProcessCount($sockets, 0);
      }

      /**
       * Body of a child process: runs the worker and writes its trimmed
       * output to the server end of the pair.
       */
      protected function childProcess(SocketPair $socketPair, ChildWorker $cWorker)
      {
          $socketPair->closeClient();
          $output = $cWorker->produce();
          socket_set_nonblock($socketPair->serverSock());

          // false/null/non-scalars become an empty payload; explicit checks
          // instead of truthiness so that a payload of "0" is not discarded
          $output = (is_scalar($output) && $output !== false) ? trim((string) $output) : '';

          while (strlen($output) > 0) {
              $wrote = socket_write($socketPair->serverSock(), $output);
              if ($wrote === false || $wrote === 0) {
                  // nothing more can be written to the non-blocking socket
                  break;
              }
              $output = (string) substr($output, $wrote);
          }

          $socketPair->closeServer();
      }

      /**
       * Reads everything available from the client end of the pair and hands
       * it to the ParentWorker as one string.
       */
      protected function parentProcess(SocketPair $socketPair)
      {
          $socketPair->closeServer();

          $content = '';
          while (true) {
              $chunk = socket_read($socketPair->clientSock(), self::READ_CHUNK_SIZE);
              // stop on error (false) or end of data (''); a chunk that is
              // exactly "0" must not end the loop
              if ($chunk === false || $chunk === '') {
                  break;
              }
              $content .= $chunk;
          }

          $socketPair->closeClient();
          $this->pWorker->consume($content);
      }

      /**
       * Creates a connected AF_UNIX stream socket pair.
       *
       * @return SocketPair|null null when the pair could not be created
       */
      protected function makeSocketPair()
      {
          $pair = [];
          if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
              // TODO: install a logger, echoing is not cool
              echo "socket_create_pair failed. Reason: " . socket_strerror(socket_last_error());
              return null;
          }

          return new SocketPair($pair[0], $pair[1]);
      }

      /**
       * Polls until at most $to children remain in $sockets, consuming the
       * output of every child that exits and killing children that exceeded
       * maxWaitTime.
       *
       * @param array $sockets pid => SocketPair, modified in place
       * @param int   $to      number of outstanding children to get down to
       */
      protected function reduceProcessCount(array &$sockets, $to)
      {
          while (count($sockets) > $to) {
              $pid = pcntl_wait($status, WNOHANG);

              if ($pid > 0) {
                  // A child killed by killExpiredProcesses() is no longer in
                  // $sockets but is still reported here once it terminates;
                  // there is nothing left to read for it.
                  if (isset($sockets[$pid])) {
                      $this->parentProcess($sockets[$pid]);
                      unset($sockets[$pid]);
                  }
                  continue;
              }

              $this->killExpiredProcesses($sockets);
              usleep(200000);
          }
      }

      /**
       * Closes the sockets of, forgets and sends SIGINT to every child whose
       * socket pair is older than maxWaitTime. Does nothing when maxWaitTime
       * is 0.
       *
       * @param array $sockets pid => SocketPair, modified in place
       */
      protected function killExpiredProcesses(array &$sockets)
      {
          if (!$this->maxWaitTime) {
              return;
          }

          // iterate over a snapshot of the keys because entries are removed
          // from $sockets inside the loop
          foreach (array_keys($sockets) as $pid) {
              $pair = $sockets[$pid];
              if (!$pair->passedAllotedTime($this->maxWaitTime)) {
                  continue;
              }

              $pair->closeServer();
              $pair->closeClient();
              unset($sockets[$pid]);
              // TODO: install a logger, echoing is not cool
              echo "PID: $pid took too long\n";
              posix_kill($pid, SIGINT);
          }
      }
  }
  175.  
  176. <?php
  177.  
  178. require_once 'ipc.php';
  179.  
  /**
   * Child worker that downloads the body of a single URL with cURL.
   */
  class CurlWorker implements \robbmj\ChildWorker
  {
      private $url;

      /**
       * @param string $url address to fetch
       */
      public function __construct($url)
      {
          $this->url = $url;
      }

      /**
       * Fetches the URL and returns the response body.
       *
       * @return string the body, or an empty string when the transfer could
       *                not be set up or failed
       */
      public function produce()
      {
          $ch = curl_init($this->url);
          if ($ch === false) {
              return '';
          }

          // NOTE(review): peer and host verification are switched off, so
          // HTTPS responses are accepted from any certificate. Enable both
          // before using this outside of a demo.
          curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, 0);
          curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0);
          curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);

          $content = curl_exec($ch);
          curl_close($ch);

          // curl_exec() yields false on failure; produce() must return a string
          return is_string($content) ? $content : '';
      }
  }
  195.  
  /**
   * Parent worker that stores every string it is given.
   */
  class PWorker implements \robbmj\ParentWorker
  {
      private $content = [];

      /**
       * Appends one child's output to the stored results.
       *
       * @param string $input
       */
      public function consume($input)
      {
          $this->content[] = $input;
      }

      /**
       * Returns the byte length of every stored result, in the order the
       * results were consumed.
       *
       * @return int[] empty array when nothing has been consumed yet
       */
      public function getContent()
      {
          return array_map('strlen', $this->content);
      }
  }
  208.  
  // Demo: fetch two pages in child processes, one child at a time and with
  // no time limit, then print the length of each downloaded body.
  $cWorkers = [new CurlWorker('https://www.google.ca/'), new CurlWorker('http://php.net/')];
  $p = new PWorker();
  $ipc = (new \robbmj\IPC($p, $cWorkers))
      ->maxWaitTime(0)
      ->maxChildren(1);

  $ipc->start();

  var_dump($p->getContent());
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement