Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- <?php
- namespace Adapters;
- use Vendor\PhpAmqpLib\Connection\AMQPStreamConnection;
- use Vendor\PhpAmqpLib\Message\AMQPMessage;
- class RabbitAdapter
- {
- /**
- * @var \Vendor\PhpAmqpLib\Channel\AMQPChannel
- */
- protected $channel;
- /**
- * @var AMQPStreamConnection
- */
- protected $connection;
- /**
- * RabbitAdapter constructor.
- * @throws \Fabrikant\Exception
- */
- public function __construct()
- {
- $this->connection = $this->getConnection();
- $this->channel = $this->connection->channel();
- }
- /**
- * @return RabbitAdapter
- * @throws \Fabrikant\Exception
- */
- public static function get()
- {
- return new self();
- }
- /**
- * @return \Vendor\PhpAmqpLib\Channel\AMQPChannel
- */
- public function getChannel()
- {
- return $this->channel;
- }
- /**
- * @param $queue
- * @param bool $passive
- * @param bool $durable
- * @param bool $exclusive
- * @param bool $autoDelete
- * @param bool $nowait
- * @param array $arguments
- * @param null $ticket
- */
- public function declareQueue
- (
- $queue,
- $passive = false,
- $durable = true,
- $exclusive = false,
- $autoDelete = false,
- $nowait = false,
- $arguments = array(),
- $ticket = null
- )
- {
- $this->channel->queue_declare($queue, $passive, $durable, $exclusive, $autoDelete, $nowait, $arguments, $ticket);
- }
- /**
- * @param $exchange
- * @param string $type
- * @param bool $passive
- * @param bool $durable
- * @param bool $autoDelete
- * @param bool $internal
- * @param bool $nowait
- * @param array $arguments
- * @param null $ticket
- */
- public function declareExchange(
- $exchange,
- $type = 'direct',
- $passive = false,
- $durable = true,
- $autoDelete = false,
- $internal = false,
- $nowait = false,
- $arguments = array(),
- $ticket = null
- )
- {
- $this->channel->exchange_declare(
- $exchange,
- $type,
- $passive,
- $durable,
- $autoDelete,
- $internal,
- $nowait,
- $arguments,
- $ticket
- );
- }
- /**
- * @param Message $message
- * @param string $exchange
- * @param string $routing_key
- * @param bool $mandatory
- * @param bool $immediate
- * @param null $ticket
- */
- public function pushMessage(AMQPMessage $message, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null)
- {
- $this->channel->basic_publish($message, $exchange, $routing_key, $mandatory, $immediate, $ticket);
- }
- /**
- * @param $queue
- * @param $exchange
- * @param null $routingKey
- */
- public function queueBind($queue, $exchange, $routingKey = null)
- {
- !is_null($routingKey) || $routingKey = $queue;
- $this->channel->queue_bind(
- $queue,
- $exchange,
- $routingKey
- );
- }
- /**
- * Удаляет очередь
- * @param string $queue
- * @param bool $if_unused
- * @param bool $if_empty
- * @param bool $nowait
- * @param null $ticket
- *
- * @return mixed|null
- */
- public function deleteQueue($queue = '', $if_unused = false, $if_empty = false, $nowait = false, $ticket = null)
- {
- return $this->channel->queue_delete($queue, $if_unused, $if_empty, $nowait, $ticket);
- }
- /**
- * Удаляет обменник
- * @param $exchange
- * @param bool $if_unused
- * @param bool $nowait
- * @param null $ticket
- *
- * @return mixed|null
- */
- public function deleteExchange($exchange, $if_unused = false, $nowait = false, $ticket = null)
- {
- return $this->channel->exchange_delete($exchange, $if_unused, $nowait, $ticket);
- }
- /**
- * @return AMQPStreamConnection
- * @throws \Fabrikant\Exception
- */
- protected function getConnection()
- {
- $config = [];//your config
- $connection = new AMQPStreamConnection(
- $config['host'],
- $config['port'],
- $config['login'],
- $config['password'],
- $config['vhost']
- );
- return $connection;
- }
- /**
- * Destructor
- * close connections
- */
- public function __destruct()
- {
- $this->channel->close();
- $this->connection->close();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement