Advertisement
Guest User

Untitled

a guest
Dec 9th, 2019
114
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
PHP 1.91 KB | None | 0 0
  1.  public function receiveMessages($callbackFunction)
  2.     {
  3.         if ($this->queue === null) {
  4.             throw new Exception('Błąd przy odbieraniu przychodzącej wiadomości - brak ustawionej kolejki RabbitMQ');
  5.         }
  6.         $this->channel->basic_consume($this->queue, '', false, false, false, false, $callbackFunction);
  7.         while (true) {
  8.             try {
  9.                 while (count($this->channel->callbacks)) {
  10.                     $this->channel->wait(null, false, 600);
  11.                 }
  12.             } catch (Exception $exception) {
  13.                 while (true) {
  14.                     try {
  15.                         if (!($exception instanceof \PhpAmqpLib\Exception\AMQPTimeoutException)) {
  16.                             new Exception('Zgubiono połączenie z RabbitMQ, ponowne łączenie');
  17.                         }
  18.                         $this->reconnect();
  19.                         $this->channel->basic_consume($this->queue, '', false, false, false, false, $callbackFunction);
  20.                         break;
  21.                     } catch (Exception $exception) {
  22.                         sleep(10);
  23.                     }
  24.                 }
  25.             }
  26.         }
  27.     }
  28.  
  29.     /**
  30.      * Ponawia zerwane połączenie
  31.      */
  32.     public function reconnect()
  33.     {
  34.         try {
  35.             $this->channel->close();
  36.             $this->connection->close();
  37.         } catch (Exception $e) {
  38.             //w kosmos, podłączyć ponownie się trzeba i tak
  39.         }
  40.         $this->connection = new PhpAmqpLib\Connection\AMQPConnection($this->host, 5672, $this->user, $this->password, $this->vhost, false, 'AMQPLAIN', null, 'en_US', 3, 60, null, false, 30);
  41.         $this->channel = $this->connection->channel();
  42.     }
  43.  
  44.     /**
  45.      * Zamuka istniejące połączenie
  46.      */
  47.     public function closeConnection()
  48.     {
  49.         $this->channel->close();
  50.         $this->connection->close();
  51.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement