Advertisement
Guest User

Untitled

a guest
Feb 23rd, 2019
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.49 KB | None | 0 0
  1. <?php
  2.  
  3. namespace Adapters;
  4.  
  5. use Vendor\PhpAmqpLib\Connection\AMQPStreamConnection;
  6. use Vendor\PhpAmqpLib\Message\AMQPMessage;
  7.  
  /**
   * Thin convenience wrapper around one AMQP connection and one channel opened on it.
   *
   * The connection and the channel are created in the constructor and closed
   * in the destructor. Every other method forwards its arguments unchanged
   * to the corresponding method of the underlying channel.
   */
  class RabbitAdapter
  {
      /**
       * Channel opened on {@see $connection}; reset to null once the adapter has been destructed.
       *
       * @var \Vendor\PhpAmqpLib\Channel\AMQPChannel|null
       */
      protected $channel;

      /**
       * Connection returned by getConnection(); reset to null once the adapter has been destructed.
       *
       * @var AMQPStreamConnection|null
       */
      protected $connection;

      /**
       * Opens the connection via getConnection() and a channel on top of it.
       *
       * @throws \Fabrikant\Exception Declared by the original author; nothing in this class throws it directly.
       */
      public function __construct()
      {
          $this->connection = $this->getConnection();
          $this->channel = $this->connection->channel();
      }

      /**
       * Static factory: builds a new adapter on every call (no instance is cached).
       *
       * @return RabbitAdapter
       * @throws \Fabrikant\Exception Declared by the original author; nothing in this class throws it directly.
       */
      public static function get()
      {
          return new self();
      }

      /**
       * Gives direct access to the underlying channel.
       *
       * @return \Vendor\PhpAmqpLib\Channel\AMQPChannel|null Null only after __destruct() has run.
       */
      public function getChannel()
      {
          return $this->channel;
      }

      /**
       * Declares a queue by forwarding all arguments to the channel's queue_declare().
       *
       * Note that $durable defaults to true here.
       *
       * @param string   $queue      Queue name.
       * @param bool     $passive
       * @param bool     $durable
       * @param bool     $exclusive
       * @param bool     $autoDelete
       * @param bool     $nowait
       * @param array    $arguments
       * @param int|null $ticket
       *
       * @return void The result of queue_declare() is intentionally not returned (unchanged behaviour).
       */
      public function declareQueue(
          $queue,
          $passive = false,
          $durable = true,
          $exclusive = false,
          $autoDelete = false,
          $nowait = false,
          $arguments = [],
          $ticket = null
      ) {
          $this->channel->queue_declare(
              $queue,
              $passive,
              $durable,
              $exclusive,
              $autoDelete,
              $nowait,
              $arguments,
              $ticket
          );
      }

      /**
       * Declares an exchange by forwarding all arguments to the channel's exchange_declare().
       *
       * Note that $type defaults to 'direct' and $durable defaults to true here.
       *
       * @param string   $exchange   Exchange name.
       * @param string   $type
       * @param bool     $passive
       * @param bool     $durable
       * @param bool     $autoDelete
       * @param bool     $internal
       * @param bool     $nowait
       * @param array    $arguments
       * @param int|null $ticket
       *
       * @return void
       */
      public function declareExchange(
          $exchange,
          $type = 'direct',
          $passive = false,
          $durable = true,
          $autoDelete = false,
          $internal = false,
          $nowait = false,
          $arguments = [],
          $ticket = null
      ) {
          $this->channel->exchange_declare(
              $exchange,
              $type,
              $passive,
              $durable,
              $autoDelete,
              $internal,
              $nowait,
              $arguments,
              $ticket
          );
      }

      /**
       * Publishes a message by forwarding all arguments to the channel's basic_publish().
       *
       * @param AMQPMessage $message
       * @param string      $exchange
       * @param string      $routing_key
       * @param bool        $mandatory
       * @param bool        $immediate
       * @param int|null    $ticket
       *
       * @return void
       */
      public function pushMessage(
          AMQPMessage $message,
          $exchange = '',
          $routing_key = '',
          $mandatory = false,
          $immediate = false,
          $ticket = null
      ) {
          $this->channel->basic_publish($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);
      }

      /**
       * Binds a queue to an exchange via the channel's queue_bind().
       *
       * @param string      $queue
       * @param string      $exchange
       * @param string|null $routingKey When null, the queue name is used as the routing key.
       *
       * @return void
       */
      public function queueBind($queue, $exchange, $routingKey = null)
      {
          // Only null triggers the fallback; an empty string is passed through as given.
          if ($routingKey === null) {
              $routingKey = $queue;
          }

          $this->channel->queue_bind($queue, $exchange, $routingKey);
      }

      /**
       * Deletes a queue.
       *
       * @param string   $queue
       * @param bool     $if_unused
       * @param bool     $if_empty
       * @param bool     $nowait
       * @param int|null $ticket
       *
       * @return mixed|null Whatever the channel's queue_delete() returns.
       */
      public function deleteQueue($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
      {
          return $this->channel->queue_delete($queue, $if_unused, $if_empty, $nowait, $ticket);
      }

      /**
       * Deletes an exchange.
       *
       * @param string   $exchange
       * @param bool     $if_unused
       * @param bool     $nowait
       * @param int|null $ticket
       *
       * @return mixed|null Whatever the channel's exchange_delete() returns.
       */
      public function deleteExchange($exchange, $if_unused = false, $nowait = false, $ticket = null)
      {
          return $this->channel->exchange_delete($exchange, $if_unused, $nowait, $ticket);
      }

      /**
       * Builds the AMQP connection used by this adapter.
       *
       * Called once from the constructor. Subclasses may override it to supply
       * a differently configured connection.
       *
       * @return AMQPStreamConnection
       * @throws \Fabrikant\Exception Declared by the original author; nothing in this method throws it directly.
       */
      protected function getConnection()
      {
          // Placeholder left by the original author: fill in the keys 'host', 'port',
          // 'login', 'password' and 'vhost'. While the array is empty, every lookup
          // below raises an "undefined index/array key" diagnostic and evaluates to null.
          $config = [];//your config

          return new AMQPStreamConnection(
              $config['host'],
              $config['port'],
              $config['login'],
              $config['password'],
              $config['vhost']
          );
      }

      /**
       * Closes the channel and then the connection.
       *
       * The connection is closed even when closing the channel throws; the
       * exception still propagates to the caller. The references are cleared
       * first, so calling this method more than once is a no-op after the first call.
       */
      public function __destruct()
      {
          $channel = $this->channel;
          $connection = $this->connection;

          // Detach before closing so a repeated invocation cannot close twice.
          $this->channel = null;
          $this->connection = null;

          try {
              if ($channel !== null) {
                  $channel->close();
              }
          } finally {
              // Must run regardless of the channel outcome, otherwise the connection leaks.
              if ($connection !== null) {
                  $connection->close();
              }
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement