Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- using Newtonsoft.Json;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- namespace SampleApp
- {
/// <summary>
/// Gateway for consuming typed messages from a message queue.
/// </summary>
public interface IMessageQueueGateway
{
    /// <summary>
    /// Consumes messages of type <typeparamref name="T"/> from the queue described by
    /// <paramref name="channel"/> until <paramref name="cancel"/> is signalled.
    /// </summary>
    /// <typeparam name="T">Type each message payload is converted to before being handled.</typeparam>
    /// <param name="channel">Describes the queue to consume from.</param>
    /// <param name="handle">Asynchronous callback that receives each message.</param>
    /// <param name="error">
    /// Callback that receives the exception raised while processing a message, together with
    /// the message payload as text.
    /// </param>
    /// <param name="cancel">Source whose token is used to stop consuming.</param>
    /// <returns>A task representing the consuming operation.</returns>
    Task Process<T>(
        Messages.Channel channel,
        Func<T, Task> handle,
        Action<Exception, string> error,
        CancellationTokenSource cancel);
}
/// <summary>
/// <see cref="IMessageQueueGateway"/> implementation that consumes UTF-8 JSON messages
/// from a RabbitMQ queue.
/// </summary>
public class RabbitMessageQueue : IMessageQueueGateway
{
    private readonly ILogger<RabbitMessageQueue> _logger;
    private readonly IConnectionFactory _connectionFactory;

    /// <summary>
    /// Builds the connection factory from the supplied RabbitMQ settings. No connection is
    /// opened here; one is created per <see cref="Process{T}"/> call.
    /// </summary>
    /// <param name="logger">Logger used for diagnostic output.</param>
    /// <param name="configuration">Options providing the host name, user name and password.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="configuration"/> is <c>null</c>.
    /// </exception>
    public RabbitMessageQueue(
        ILogger<RabbitMessageQueue> logger,
        IOptions<Configuration.RabbitMQ> configuration
    )
    {
        if (logger == null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        if (configuration == null)
        {
            throw new ArgumentNullException(nameof(configuration));
        }

        _logger = logger;

        var settings = configuration.Value;

        // Message template instead of an interpolated string: same rendered text, but the
        // host is captured as a structured property and no string is built when Debug is off.
        _logger.LogDebug("Connecting to RabbitMQ instance at {Host}", settings.Host);

        _connectionFactory = new ConnectionFactory
        {
            HostName = settings.Host,
            UserName = settings.UserName,
            Password = settings.Password
        };
    }

    /// <summary>
    /// Starts a background consumer on the queue described by <paramref name="channel"/> and
    /// keeps it running until <paramref name="cancel"/> is signalled.
    /// </summary>
    /// <typeparam name="T">Type each JSON payload is deserialized into.</typeparam>
    /// <param name="channel">Queue name and declaration flags (durable, exclusive, auto-delete).</param>
    /// <param name="handle">Awaited for every deserialized message; the message is acked when it completes.</param>
    /// <param name="error">
    /// Invoked with the exception and the message text when decoding, deserializing, handling or
    /// acking fails. The text is <c>null</c> if the payload could not be decoded.
    /// </param>
    /// <param name="cancel">Source whose token stops the consumer and ends the returned task.</param>
    /// <returns>
    /// A task that completes once the subscription has been cancelled and the connection closed.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public Task Process<T>(Messages.Channel channel, Func<T, Task> handle, Action<Exception, string> error, CancellationTokenSource cancel)
    {
        if (channel == null)
        {
            throw new ArgumentNullException(nameof(channel));
        }

        if (handle == null)
        {
            throw new ArgumentNullException(nameof(handle));
        }

        if (error == null)
        {
            throw new ArgumentNullException(nameof(error));
        }

        if (cancel == null)
        {
            throw new ArgumentNullException(nameof(cancel));
        }

        var token = cancel.Token;

        // The consumer blocks its thread for the whole subscription, so request a dedicated
        // thread (LongRunning) and pin the default scheduler rather than whichever scheduler
        // happens to be current at the call site.
        return Task.Factory.StartNew(
            () => Consume(channel, handle, error, token),
            token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Opens a connection, subscribes to the queue and blocks until <paramref name="token"/> is cancelled.
    /// </summary>
    private void Consume<T>(Messages.Channel channel, Func<T, Task> handle, Action<Exception, string> error, CancellationToken token)
    {
        using (var connection = _connectionFactory.CreateConnection())
        using (var model = connection.CreateModel())
        {
            model.QueueDeclare(queue: channel.Queue,
                durable: channel.Durable,
                exclusive: channel.Exclusive,
                autoDelete: channel.AutoDelete,
                arguments: null);

            // At most one unacknowledged message is outstanding on this consumer at a time.
            model.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(model);

            // The event handler is necessarily async void; HandleDelivery never lets an
            // exception escape, so a bad message cannot take the process down.
            consumer.Received += async (sender, args) => await HandleDelivery(model, args, handle, error);

            var consumerTag = model.BasicConsume(queue: channel.Queue, autoAck: false, consumer: consumer);
            _logger.LogDebug("Consuming messages from {Queue}", channel.Queue);

            // Block until cancellation is requested; wakes immediately instead of polling.
            token.WaitHandle.WaitOne();

            _logger.LogDebug("Canceling subscription to {Queue}", channel.Queue);
            model.BasicCancel(consumerTag);
        }
    }

    /// <summary>
    /// Processes one delivery: decode, deserialize, handle, then ack. On failure the
    /// <paramref name="error"/> callback is invoked and the message is negatively acknowledged.
    /// </summary>
    private async Task HandleDelivery<T>(IModel model, BasicDeliverEventArgs args, Func<T, Task> handle, Action<Exception, string> error)
    {
        string json = null;

        try
        {
            try
            {
                // Decoding and deserialization are inside the try so that a malformed
                // payload is reported through the error callback like any other failure.
                json = Encoding.UTF8.GetString(args.Body);
                var message = JsonConvert.DeserializeObject<T>(json);

                await handle(message);
                model.BasicAck(deliveryTag: args.DeliveryTag, multiple: false);
            }
            catch (Exception ex)
            {
                try
                {
                    error(ex, json);
                }
                finally
                {
                    // With a prefetch count of 1, leaving the message unsettled would stop all
                    // further deliveries. Not requeued, to avoid redelivering a poison message
                    // in a tight loop; the caller already received the payload via the callback.
                    model.BasicNack(deliveryTag: args.DeliveryTag, multiple: false, requeue: false);
                }
            }
        }
        catch (Exception ex)
        {
            // Reached when the error callback or the nack itself fails (for example because
            // the channel is already closed). Logged rather than rethrown.
            _logger.LogError(ex, "Failed to settle RabbitMQ delivery {DeliveryTag}", args.DeliveryTag);
        }
    }
}
- }
Add Comment
Please, Sign In to add comment