Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Consumes messages from the configured queue until no consumer remains.
 *
 * Registers $callbackFunction as the consumer of $this->queue and then blocks
 * in the channel's wait loop. Any exception raised while waiting (including
 * the 600-second wait timeout) triggers reconnect() followed by a fresh
 * consumer registration; a failed reconnect attempt is retried after a
 * 10-second pause, indefinitely.
 *
 * The method returns once the channel has no registered callbacks left.
 *
 * NOTE(review): an exception thrown by $callbackFunction itself would
 * presumably surface from wait() and be handled as a lost connection —
 * confirm this is the intended behaviour.
 *
 * @param callable $callbackFunction Consumer callback forwarded to basic_consume().
 *
 * @throws Exception When no queue has been set on this object.
 */
public function receiveMessages($callbackFunction)
{
    if ($this->queue === null) {
        throw new Exception('Błąd przy odbieraniu przychodzącej wiadomości - brak ustawionej kolejki RabbitMQ');
    }

    $this->channel->basic_consume($this->queue, '', false, false, false, false, $callbackFunction);

    // Looping on the callback count (rather than forever) lets the method
    // return when the consumer has been cancelled instead of spinning idle.
    while (count($this->channel->callbacks)) {
        try {
            $this->channel->wait(null, false, 600);
        } catch (Exception $exception) {
            // A wait timeout only means the queue was idle; anything else is
            // reported as a lost connection before reconnecting.
            if (!($exception instanceof \PhpAmqpLib\Exception\AMQPTimeoutException)) {
                error_log('Zgubiono połączenie z RabbitMQ, ponowne łączenie: ' . $exception->getMessage());
            }

            // Keep trying until both the connection and the consumer are back.
            while (true) {
                try {
                    $this->reconnect();
                    $this->channel->basic_consume($this->queue, '', false, false, false, false, $callbackFunction);
                    break;
                } catch (Exception $reconnectError) {
                    // Distinct variable name so the original failure is not
                    // overwritten; back off before the next attempt.
                    sleep(10);
                }
            }
        }
    }
}
/**
 * Re-establishes a dropped connection.
 *
 * Closes the current channel and connection on a best-effort basis, then
 * creates a new connection to $this->host on port 5672 and opens a new
 * channel on it. Exceptions raised while creating the new connection or
 * channel are not caught here and propagate to the caller.
 */
public function reconnect()
{
    // The old channel and connection are closed independently so that a
    // failure on the channel does not leave the old connection open.
    if ($this->channel !== null) {
        try {
            $this->channel->close();
        } catch (Exception $e) {
            // Ignored on purpose: we have to reconnect anyway.
        }
    }

    if ($this->connection !== null) {
        try {
            $this->connection->close();
        } catch (Exception $e) {
            // Ignored on purpose: we have to reconnect anyway.
        }
    }

    // NOTE(review): by php-amqplib's constructor parameter order the trailing
    // numbers should be connection timeout (3), read/write timeout (60) and
    // heartbeat (30) — confirm against the installed library version.
    $this->connection = new PhpAmqpLib\Connection\AMQPConnection($this->host, 5672, $this->user, $this->password, $this->vhost, false, 'AMQPLAIN', null, 'en_US', 3, 60, null, false, 30);
    $this->channel = $this->connection->channel();
}
/**
 * Closes the existing connection.
 *
 * Closes the channel first and then the connection. If closing the channel
 * fails, the connection is still closed before the channel's exception is
 * rethrown, so the connection is not left open.
 *
 * @throws Exception Whatever the channel or connection raises while closing.
 */
public function closeConnection()
{
    try {
        $this->channel->close();
    } catch (Exception $channelError) {
        // Release the connection regardless, then report the original failure.
        try {
            $this->connection->close();
        } catch (Exception $connectionError) {
            // The channel failure is the primary error; it is rethrown below.
        }

        throw $channelError;
    }

    $this->connection->close();
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement