Guest User

Untitled

a guest
Apr 8th, 2018
131
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.27 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Text;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Options;
  8. using Newtonsoft.Json;
  9. using RabbitMQ.Client;
  10. using RabbitMQ.Client.Events;
  11.  
  12. namespace SampleApp
  13. {
  /// <summary>
  /// Gateway for consuming typed messages from a message queue.
  /// </summary>
  public interface IMessageQueueGateway
  {
      /// <summary>
      /// Starts processing messages of type <typeparamref name="T"/> from the given channel.
      /// </summary>
      /// <typeparam name="T">Type each message is handed to <paramref name="handle"/> as.</typeparam>
      /// <param name="channel">Describes the queue to consume from.</param>
      /// <param name="handle">Asynchronous callback invoked with each message.</param>
      /// <param name="error">
      /// Callback receiving an exception and a string; in <c>RabbitMessageQueue</c> the string
      /// is the raw message text that was being processed when the exception occurred.
      /// </param>
      /// <param name="cancel">Cancellation source used to stop processing.</param>
      /// <returns>A task representing the processing operation.</returns>
      Task Process<T>(
          Messages.Channel channel,
          Func<T, Task> handle,
          Action<Exception, string> error,
          CancellationTokenSource cancel);
  }
  18.  
  /// <summary>
  /// <see cref="IMessageQueueGateway"/> implementation that consumes JSON-encoded
  /// messages from a RabbitMQ queue.
  /// </summary>
  public class RabbitMessageQueue : IMessageQueueGateway
  {
      private readonly ILogger<RabbitMessageQueue> _logger;
      private readonly IConnectionFactory _connectionFactory;

      /// <summary>
      /// Builds the connection factory from the supplied RabbitMQ settings.
      /// No connection is opened here; one is created per <see cref="Process{T}"/> call.
      /// </summary>
      /// <param name="logger">Logger used for debug output.</param>
      /// <param name="configuration">Options providing host, user name and password.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="logger"/> or <paramref name="configuration"/> is null.
      /// </exception>
      public RabbitMessageQueue(
          ILogger<RabbitMessageQueue> logger,
          IOptions<Configuration.RabbitMQ> configuration
      )
      {
          if (logger == null)
          {
              throw new ArgumentNullException(nameof(logger));
          }
          if (configuration == null)
          {
              throw new ArgumentNullException(nameof(configuration));
          }

          _logger = logger;

          var settings = configuration.Value;

          // Message template instead of an interpolated string: the rendered text is the
          // same, but braces inside the host name can no longer be mis-parsed as placeholders.
          _logger.LogDebug("Connecting to RabbitMQ instance at {Host}", settings.Host);

          _connectionFactory = new ConnectionFactory
          {
              HostName = settings.Host,
              UserName = settings.UserName,
              Password = settings.Password
          };
      }

      /// <summary>
      /// Declares the queue described by <paramref name="channel"/>, subscribes to it and
      /// dispatches every delivery to <paramref name="handle"/> until
      /// <paramref name="cancel"/> is cancelled.
      /// </summary>
      /// <typeparam name="T">Type the JSON message body is deserialized into.</typeparam>
      /// <param name="channel">Queue name and declaration flags.</param>
      /// <param name="handle">
      /// Asynchronous handler; the delivery is acknowledged only after it completes
      /// without throwing.
      /// </param>
      /// <param name="error">
      /// Invoked with the exception and the raw JSON text when deserialization, handling
      /// or acknowledging fails.
      /// </param>
      /// <param name="cancel">Source whose cancellation ends the subscription.</param>
      /// <returns>
      /// A task that completes once the subscription has been cancelled and the
      /// connection disposed.
      /// </returns>
      /// <exception cref="ArgumentNullException">Any argument is null.</exception>
      public Task Process<T>(Messages.Channel channel, Func<T, Task> handle, Action<Exception, string> error, CancellationTokenSource cancel)
      {
          if (channel == null)
          {
              throw new ArgumentNullException(nameof(channel));
          }
          if (handle == null)
          {
              throw new ArgumentNullException(nameof(handle));
          }
          if (error == null)
          {
              throw new ArgumentNullException(nameof(error));
          }
          if (cancel == null)
          {
              throw new ArgumentNullException(nameof(cancel));
          }

          var token = cancel.Token;

          // The body blocks for the lifetime of the subscription, so request a dedicated
          // thread (LongRunning) rather than tying up a thread-pool worker, and pin the
          // scheduler so the ambient TaskScheduler.Current is never picked up.
          return Task.Factory.StartNew(() =>
          {
              using (var connection = _connectionFactory.CreateConnection())
              using (var model = connection.CreateModel())
              {
                  model.QueueDeclare(queue: channel.Queue,
                      durable: channel.Durable,
                      exclusive: channel.Exclusive,
                      autoDelete: channel.AutoDelete,
                      arguments: null);

                  // prefetchSize 0, prefetchCount 1, not global.
                  model.BasicQos(0, 1, false);

                  var consumer = new EventingBasicConsumer(model);

                  consumer.Received += async (obj, args) =>
                  {
                      var json = Encoding.UTF8.GetString(args.Body);

                      try
                      {
                          // Deserialization is inside the try: this lambda is effectively
                          // async void, so an exception escaping it would go unobserved
                          // and could terminate the process instead of reaching `error`.
                          var message = JsonConvert.DeserializeObject<T>(json);

                          await handle(message);
                          model.BasicAck(deliveryTag: args.DeliveryTag, multiple: false);
                      }
                      catch (Exception ex)
                      {
                          // NOTE(review): the delivery is neither acked nor nacked here.
                          // With a prefetch count of 1 this presumably stalls further
                          // deliveries until the channel closes - confirm whether the
                          // `error` callback is expected to cancel, or whether a
                          // BasicNack/BasicReject policy is wanted.
                          error(ex, json);
                      }
                  };

                  var consumerTag = model.BasicConsume(queue: channel.Queue, autoAck: false, consumer: consumer);

                  _logger.LogDebug("Consuming messages from {Queue}", channel.Queue);

                  // Block until cancellation is signalled; replaces the one-second
                  // Thread.Sleep polling loop so shutdown is immediate.
                  token.WaitHandle.WaitOne();

                  _logger.LogDebug("Canceling subscription to {Queue}", channel.Queue);
                  model.BasicCancel(consumerTag);
              }
          }, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
      }
  }
  92. }
Add Comment
Please, Sign In to add comment