Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php namespace robbmj;
- /* The MIT License (MIT) */
- interface ChildWorker {
- /**
- * produce must return a string
- */
- function produce();
- }
- interface ParentWorker {
- /**
- * $input will always be a string
- */
- function consume($input);
- }
- class SocketPair {
- private $clientSock, $serverSock, $createTime;
- function __construct($clientSock, $serverSock, $createTime = null) {
- $this->clientSock = $clientSock;
- $this->serverSock = $serverSock;
- $this->createTime = isset($createTime) ? $createTime : time();
- }
- function passedAllotedTime($allotedTime) {
- return $this->createTime + $allotedTime <= time();
- }
- function clientSock() {
- return $this->clientSock;
- }
- function serverSock() {
- return $this->serverSock;
- }
- function closeClient() {
- socket_close($this->clientSock);
- }
- function closeServer() {
- socket_close($this->serverSock);
- }
- }
- class IPC {
- // indented properly in the file, just trying to avoid horizontal scroll
- private $pWorker, $cWorkers, $maxChildren, $maxWaitTime;
- function __construct(ParentWorker $pWorker, array /* ChildWorker */ $cWorkers) {
- $this->pWorker = $pWorker;
- $this->cWorkers = $cWorkers;
- $this->maxChildren = 0;
- $this->maxWaitTime = 0;
- }
- /**
- * Sets the maximum number of Child Processes that can be running at any one time.
- * If set to 0, There is no limit.
- *
- * If $max is not a integer of is less than 0 an InvalidArgumentException is thrown
- */
- public function maxChildren($max) {
- if (!is_int($max) || $max < 0) {
- throw new InvalidArgumentException("max must be greater than or equal to 0");
- }
- $this->maxChildren = $max;
- return $this;
- }
- /**
- * Sets the maximum amount of time a child process can run for before the process is terminated.
- * If set to 0, There is no limit.
- *
- * If $seconds is not a integer of is less than 0 an InvalidArgumentException is thrown
- */
- public function maxWaitTime($seconds) {
- if (!is_int($seconds) || $seconds < 0) {
- throw new InvalidArgumentException("seconds must be greater than or equal to 0");
- }
- $this->maxWaitTime = $seconds;
- return $this;
- }
- function start() {
- $pids = array();
- $sockets = array();
- foreach ($this->cWorkers as $i => $cWorker) {
- $socketPair = $this->makeSocketPair();
- if (!$socketPair) {
- continue;
- }
- $pid = pcntl_fork();
- if ($pid === 0) {
- $this->childProcess($socketPair, $cWorker);
- exit(0);
- }
- else if ($pid > 0) {
- $sockets[$pid] = $socketPair;
- if ($this->maxChildren > 0 && (count($sockets) >= $this->maxChildren)) {
- $this->reduceProcessCount($sockets, $this->maxChildren - 1);
- }
- }
- }
- $this->reduceProcessCount($sockets, 0);
- }
- protected function childProcess(SocketPair $socketPair, ChildWorker $cWorker) {
- $socketPair->closeClient();
- $output = $cWorker->produce();
- socket_set_nonblock($socketPair->serverSock());
- $output = ($output) ? trim($output) : '';
- while ((strlen($output) > 0) && ($wrote = socket_write($socketPair->serverSock(), $output))) {
- $output = substr($output, $wrote);
- }
- $socketPair->closeServer();
- }
- protected function parentProcess(SocketPair $socketPair) {
- $socketPair->closeServer();
- $content = '';
- while ($line = socket_read($socketPair->clientSock(), 1129)) {
- $len = strlen($content);
- $content .= $line;
- }
- $socketPair->closeClient();
- $this->pWorker->consume($content);
- }
- protected function makeSocketPair() {
- $pair = array();
- if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
- // TODO: install a logger, echoing is not cool
- echo "socket_create_pair failed. Reason: " . socket_strerror(socket_last_error());
- return null;
- }
- return new SocketPair($pair[0], $pair[1]);
- }
- protected function reduceProcessCount(array &$sockets, $to) {
- while (count($sockets) > $to) {
- $pid = pcntl_wait($status, WNOHANG);
- if ($pid > 0) {
- $this->parentProcess($sockets[$pid]);
- unset($sockets[$pid]);
- }
- else {
- $this->killExpiredProcesses($sockets);
- var_dump(count($sockets));
- usleep(200000);
- }
- }
- }
- protected function killExpiredProcesses(array &$sockets) {
- if ($this->maxWaitTime) {
- foreach ($sockets as $pid => $pair) {
- if ($pair->passedAllotedTime($this->maxWaitTime)) {
- $pair->closeServer();
- $pair->closeClient();
- unset($sockets[$pid]);
- // TODO: install a logger, echoing is not cool
- echo "PID: $pid took to longn";
- posix_kill($pid, SIGINT);
- }
- }
- }
- }} // closes the class
- <?php
- require_once 'ipc.php';
- class CurlWorker implements robbmjChildWorker {
- private $url;
- public function __construct($url) {
- $this->url = $url;
- }
- public function produce() {
- $ch = curl_init($this->url);
- curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, 0);
- curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, 0);
- curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
- $content = curl_exec($ch);
- curl_close($ch);
- return $content;
- }
- }
- class PWorker implements robbmjParentWorker {
- private $content = array();
- public function consume($input) {
- $this->content[] = $input;
- }
- public function getContent() {
- foreach ($this->content as $value) {
- $r[] = strlen($value);
- }
- return $r;
- }
- }
- $cWorkers = [new CurlWorker('https://www.google.ca/'), new CurlWorker('http://php.net/')];
- $p = new PWorker();
- $ipc = (new robbmjIPC($p, $cWorkers))
- ->maxWaitTime(0)
- ->maxChildren(1);
- $ipc->start();
- var_dump($p->getContent());
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement