This week only. Pastebin PRO Accounts Christmas Special! Don't miss out!Want more features on Pastebin? Sign Up, it's FREE!
Guest

Untitled

By: a guest on Dec 6th, 2012  |  syntax: PHP  |  size: 2.92 KB  |  views: 101  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
This paste has a previous version, view the difference. Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. /*
  2. * Workers are fired up
  3. * Data is sent
  4. * TERMINATE is sent through the control socket
  5. */
  6.  
  7.  
  8. /**
  9.  *  ##################################################################################
  10.  *       PRODUCER - dispatches the work load
  11.  *  ##################################################################################
  12.  */
  13.     protected function getControlSocket()
  14.     {
  15.         if (!$this->controlSocket instanceof \ZMQSocket)
  16.         {
  17.             $this->controlSocket = $this->zmqContext->getSocket(ZMQ::SOCKET_PUB);
  18.             // Bind to the control port - the host will be the same as the producer host
  19.             $this->controlSocket->bind('tcp://172.0.0.1:51001);
  20.        }
  21.        return $this->controlSocket;
  22.    }
  23.  
  24.  
  25.    public function killWorkers()
  26.    {
  27.        if ($this->workerState == self::WORKER_STATE_STARTED)
  28.        {
  29.            echo 'Sending KILL to workers' . PHP_EOL;
  30.            $this->getControlSocket()->send(self::WORKER_CONTROL_TERMINATE);
  31.  
  32.            $this->workerState = self::WORKER_STATE_KILLED;
  33.        }
  34.    }
  35.  
  36. /**
  37. *  ##################################################################################
  38. *      WORKER
  39. *  ##################################################################################
  40. */
  41.  
  42.    public function startWorker($zProgramId, $aProducer_host, $bProducer_port, $cControl_port)
  43.    {
  44.        set_time_limit(0);
  45.  
  46.        $this->initialiseProgramSettings($zProgramId);
  47.                 // Connect to the control + producer socket (Unique per program)
  48.                 $control = $this->getZmqContext()->getSocket(ZMQ::SOCKET_SUB);
  49.                 $control->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, '');
  50.                 $control->connect('tcp://172.0.0.1:51001');
  51.  
  52.                 $producer = $this->getZmqContext()->getSocket(ZMQ::SOCKET_PULL);
  53.                 $producer->connect('tcp://' . $aProducer_host . ':' . $bProducer_port);
  54.  
  55.                 $producerPoll = new ZMQPoll();
  56.                 $producerPoll->add($producer, ZMQ::POLL_IN);
  57.                 $read = $write = array();
  58.  
  59.                 while(true)
  60.                 {
  61.                     try {
  62.                         $producerEvents = $producerPoll->poll($read, $write, 5000);
  63.                     } catch (\ZMQPollException $e)
  64.                     {
  65.                         echo $e->getMessage();
  66.                     }
  67.  
  68.                     // Start scanning for events on the poll
  69.                     if($producerEvents)
  70.                     {
  71.                         $this->processRecords(json_decode($producer->recv(), true));
  72.                     } else
  73.                     {
  74.                         echo 'Checking control socket for messages' . PHP_EOL;
  75.                     if ($ctlMessage = $control->recv(ZMQ::MODE_NOBLOCK))
  76.                     {
  77.                         echo 'Control message received' . PHP_EOL;
  78.                         switch ($ctlMessage)
  79.                         {
  80.                             case System_ProgramDataWorker::WORKER_CONTROL_TERMINATE:
  81.                                 echo "Received a termination command on the control socket, closing worker: " . getmypid() . PHP_EOL;
  82.                                 exit();
  83.                                 break;
  84.                         }
  85.                     } else
  86.                     {
  87.                         echo 'Nothing received' . PHP_EOL;
  88.                     }
  89.                     }
  90.  
  91.  
  92.                 }
  93.     }
clone this paste RAW Paste Data