Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * Opens a channel on the client connection, applies QoS settings, declares the
 * application exchanges and the messages queue, binds the queue to the main
 * exchange for every routing key in $clients and finally resolves with the
 * value returned by RabbitMqChannelData::create().
 *
 * $channel is shared between the steps of the chain. Every step is a closure
 * that is created immediately, while the chain is being assembled, but is only
 * executed later, once the previous promise has settled. The variable therefore
 * has to be captured BY REFERENCE: a by-value capture would freeze the initial
 * null and the step would call a method on null.
 */
$channel = null;

return $this->client
    ->connect()
    /** Open a channel on the established connection */
    ->then(
        function(Client $client)
        {
            return $client->channel();
        }
    )
    /** Remember the channel for the following steps and apply QoS settings */
    ->then(
        function(Channel $receivedChannel) use (&$channel)
        {
            $channel = $receivedChannel;

            $qosConfig = $this->configuration->getQosConfig();

            return $receivedChannel->qos(
                $qosConfig->get('pre_fetch_size'),
                $qosConfig->get('pre_fetch_count'),
                $qosConfig->get('global')
            );
        }
    )
    /** Application main exchange */
    ->then(
        function() use (&$channel, $entryPointName)
        {
            return $channel->exchangeDeclare(
                $entryPointName,
                self::EXCHANGE_TYPE_DIRECT
            );
        }
    )
    /** Events exchange */
    ->then(
        function() use (&$channel, $entryPointName)
        {
            return $channel->exchangeDeclare(
                \sprintf('%s.events', $entryPointName),
                self::EXCHANGE_TYPE_DIRECT
            );
        }
    )
    /** Messages (internal usage) queue */
    ->then(
        function() use (&$channel, $entryPointName)
        {
            return $channel->queueDeclare(
                \sprintf('%s.messages', $entryPointName),
                false,
                true
            );
        }
    )
    /** Configure routing keys for clients */
    ->then(
        function(MethodQueueDeclareOkFrame $frame) use (&$channel, $clients, $entryPointName)
        {
            /**
             * This inner closure is created while the step is running, i.e. after
             * $channel has been assigned, so a by-value capture is sufficient here.
             */
            $promises = \array_map(
                function($routingKey) use ($frame, $channel, $entryPointName)
                {
                    return $channel->queueBind(
                        $frame->queue,
                        $entryPointName,
                        $routingKey
                    );
                },
                $clients
            );

            /** Wait for all bindings, then pass the declare frame to the next step */
            return \React\Promise\all($promises)
                ->then(
                    function() use ($frame)
                    {
                        return $frame;
                    }
                );
        }
    )
    /**
     * Final step. $this->onFailedCallable is registered as the rejection
     * handler of this last then() call.
     */
    ->then(
        function(MethodQueueDeclareOkFrame $frame) use (&$channel)
        {
            $this->logger->info('RabbitMQ subscription started');

            return RabbitMqChannelData::create($channel, $frame->queue);
        },
        $this->onFailedCallable
    );
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement