Guest User

Untitled

a guest
Oct 11th, 2018
116
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.31 KB | None | 0 0
  1. using RabbitMQ.Client;
  2. using RabbitMQ.Client.Events;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.IO;
  6. using System.Linq;
  7. using System.Runtime.Serialization;
  8. using System.Runtime.Serialization.Formatters.Binary;
  9. using System.Text;
  10.  
  11. namespace Forte
  12. {
  13. public class RabbitMQConfiguration
  14. {
  15. public string UserName { get; set; }
  16. public string Password { get; set; }
  17. public string HostName { get; set; }
  18. public string VirtualHostName { get; set; }
  19. public int[] BackOffs { get; set; }
  20. }
  21.  
  22. public interface IProcessQueue<T> : IDisposable
  23. {
  24. void Enqueue(T content);
  25. void StartListener(Func<T, bool> func);
  26. void StopListener();
  27. }
  28.  
  29. public class ProcessQueue<T> : IProcessQueue<T>
  30. {
  31. private const string DeadLetterExchangeVariableName = "x-dead-letter-exchange";
  32. private const string MessageTTLVariableName = "x-message-ttl";
  33. private const string DeathVariableName = "x-death";
  34.  
  35. protected IConnection _connection;
  36. protected IModel _model;
  37. protected readonly ConnectionFactory connectionFactory;
  38. protected readonly Encoding encoding = Encoding.UTF8;
  39. protected string consumerTag;
  40. protected IFormatter formatter = new BinaryFormatter();
  41.  
  42. public string ExchangeName { get; }
  43. public string QueueName { get; }
  44. protected int[] BackOffs { get; }
  45.  
  46. public ProcessQueue(RabbitMQConfiguration configuration) : this(
  47. typeof(ProcessQueue<>).FullName.Replace("`1", string.Empty),
  48. typeof(T).FullName,
  49. configuration)
  50. { }
  51.  
  52. public ProcessQueue(string exchangeName, string queueName, RabbitMQConfiguration configuration)
  53. {
  54. ExchangeName = exchangeName;
  55. QueueName = queueName;
  56. BackOffs = configuration.BackOffs ?? new int[0];
  57.  
  58. connectionFactory = new ConnectionFactory
  59. {
  60. UserName = configuration.UserName,
  61. Password = configuration.Password,
  62. VirtualHost = configuration.VirtualHostName,
  63. HostName = configuration.HostName
  64. };
  65.  
  66. EnsureQueuesAndExchangesAreSetup();
  67. }
  68.  
  69. protected IConnection Connection
  70. {
  71. get
  72. {
  73. if (_connection == null) { _connection = connectionFactory.CreateConnection(); }
  74. return _connection;
  75. }
  76. }
  77.  
  78. protected IModel Model
  79. {
  80. get
  81. {
  82. if (_model == null) { _model = Connection.CreateModel(); }
  83. return _model;
  84. }
  85. }
  86.  
  87. protected IBasicProperties CreatePersistentTextProperties()
  88. {
  89. var props = Model.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;
  90. return props;
  91. }
  92.  
  93. protected void CreateBackOffQueuesAndExchanges()
  94. {
  95. for (var index = 1; index < BackOffs.Length + 1; index++)
  96. {
  97. var currentBackOff = BackOffs[index - 1];
  98. var backOffExchangeName = $"{ExchangeName}.backoff.{index}";
  99. var backOffQueueName = $"{QueueName}/ttl={currentBackOff}";
  100.  
  101. Model.ExchangeDeclare(backOffExchangeName, ExchangeType.Direct, true, false);
  102. Model.QueueDeclare(backOffQueueName, true, false, false, new Dictionary<string, object> {
  103. { DeadLetterExchangeVariableName, ExchangeName },
  104. { MessageTTLVariableName, currentBackOff } });
  105. Model.QueueBind(backOffQueueName, backOffExchangeName, QueueName, null);
  106. }
  107. }
  108.  
  109. protected void CreateDeadLetterQueueAndExchange()
  110. {
  111. if (BackOffs.Any())
  112. {
  113. var exchangeName = $"{ExchangeName}.dead";
  114. var queueName = $"{QueueName}/dead";
  115.  
  116. Model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false);
  117. Model.QueueDeclare(queueName, true, false, false, null);
  118. Model.QueueBind(queueName, exchangeName, QueueName, null);
  119. }
  120. }
  121.  
  122. protected void EnsureQueuesAndExchangesAreSetup()
  123. {
  124. Model.ExchangeDeclare(ExchangeName, ExchangeType.Direct, true, false);
  125. Model.QueueDeclare(QueueName, true, false, false, null);
  126. Model.QueueBind(QueueName, ExchangeName, QueueName, null);
  127. CreateBackOffQueuesAndExchanges();
  128. CreateDeadLetterQueueAndExchange();
  129. }
  130.  
  131. public void Enqueue(T content)
  132. {
  133. var props = CreatePersistentTextProperties();
  134. using (var memoryStream = new MemoryStream())
  135. {
  136. formatter.Serialize(memoryStream, content);
  137. Model.BasicPublish(ExchangeName, QueueName, props, memoryStream.ToArray());
  138. }
  139. }
  140.  
  141. public void StartListener(Func<T, bool> func)
  142. {
  143. var consumer = new EventingBasicConsumer(Model);
  144. consumer.Received += (ch, ea) =>
  145. {
  146. T model;
  147.  
  148. using (MemoryStream memoryStream = new MemoryStream(ea.Body))
  149. {
  150. model = (T)formatter.Deserialize(memoryStream);
  151. }
  152.  
  153. var isSuccessful = func?.Invoke(model) ?? false;
  154. MessageProcessed(ea, isSuccessful);
  155. };
  156. consumerTag = Model.BasicConsume(QueueName, false, consumer);
  157. }
  158.  
  159. public void StopListener()
  160. {
  161. Model.BasicCancel(consumerTag);
  162. consumerTag = null;
  163. }
  164.  
  165. protected void MessageProcessed(BasicDeliverEventArgs args, bool isSuccessful)
  166. {
  167. if (isSuccessful)
  168. {
  169. Model.BasicAck(args.DeliveryTag, false);
  170. }
  171. else
  172. {
  173. MessageFailed(args);
  174. }
  175. }
  176.  
  177. protected void MessageFailed(BasicDeliverEventArgs args)
  178. {
  179. var failedCount = 1;
  180. if (args?.BasicProperties?.Headers != null && args.BasicProperties.Headers.ContainsKey(DeathVariableName))
  181. {
  182. var death = (List<object>)args.BasicProperties.Headers[DeathVariableName];
  183. if (death != null) { failedCount += death.Count; }
  184. }
  185.  
  186. if (failedCount > BackOffs.Length)
  187. {
  188. MessageDead(args);
  189. }
  190. else
  191. {
  192. var exchangeName = $"{ExchangeName}.backoff.{failedCount}";
  193. Model.BasicPublish(exchangeName, args.RoutingKey, args.BasicProperties, args.Body);
  194. Model.BasicAck(args.DeliveryTag, false);
  195. }
  196. }
  197.  
  198. protected void MessageDead(BasicDeliverEventArgs args)
  199. {
  200. var exchangeName = $"{ExchangeName}.dead";
  201. var props = args.BasicProperties;
  202. Model.BasicPublish(exchangeName, args.RoutingKey, props, args.Body);
  203. Model.BasicAck(args.DeliveryTag, false);
  204. }
  205.  
  206. public void Dispose()
  207. {
  208. if (_model != null && consumerTag != null)
  209. {
  210. StopListener();
  211. }
  212.  
  213. if (_connection != null && _connection.IsOpen) { _connection.Close(); }
  214. _model?.Dispose();
  215. _connection?.Dispose();
  216. }
  217. }
  218. }
Add Comment
Please, Sign In to add comment