Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Runtime.Serialization;
- using System.Runtime.Serialization.Formatters.Binary;
- using System.Text;
- namespace Forte
- {
- public class RabbitMQConfiguration
- {
- public string UserName { get; set; }
- public string Password { get; set; }
- public string HostName { get; set; }
- public string VirtualHostName { get; set; }
- public int[] BackOffs { get; set; }
- }
- public interface IProcessQueue<T> : IDisposable
- {
- void Enqueue(T content);
- void StartListener(Func<T, bool> func);
- void StopListener();
- }
- public class ProcessQueue<T> : IProcessQueue<T>
- {
- private const string DeadLetterExchangeVariableName = "x-dead-letter-exchange";
- private const string MessageTTLVariableName = "x-message-ttl";
- private const string DeathVariableName = "x-death";
- protected IConnection _connection;
- protected IModel _model;
- protected readonly ConnectionFactory connectionFactory;
- protected readonly Encoding encoding = Encoding.UTF8;
- protected string consumerTag;
- protected IFormatter formatter = new BinaryFormatter();
- public string ExchangeName { get; }
- public string QueueName { get; }
- protected int[] BackOffs { get; }
- public ProcessQueue(RabbitMQConfiguration configuration) : this(
- typeof(ProcessQueue<>).FullName.Replace("`1", string.Empty),
- typeof(T).FullName,
- configuration)
- { }
- public ProcessQueue(string exchangeName, string queueName, RabbitMQConfiguration configuration)
- {
- ExchangeName = exchangeName;
- QueueName = queueName;
- BackOffs = configuration.BackOffs ?? new int[0];
- connectionFactory = new ConnectionFactory
- {
- UserName = configuration.UserName,
- Password = configuration.Password,
- VirtualHost = configuration.VirtualHostName,
- HostName = configuration.HostName
- };
- EnsureQueuesAndExchangesAreSetup();
- }
- protected IConnection Connection
- {
- get
- {
- if (_connection == null) { _connection = connectionFactory.CreateConnection(); }
- return _connection;
- }
- }
- protected IModel Model
- {
- get
- {
- if (_model == null) { _model = Connection.CreateModel(); }
- return _model;
- }
- }
- protected IBasicProperties CreatePersistentTextProperties()
- {
- var props = Model.CreateBasicProperties(); props.ContentType = "text/plain"; props.DeliveryMode = 2;
- return props;
- }
- protected void CreateBackOffQueuesAndExchanges()
- {
- for (var index = 1; index < BackOffs.Length + 1; index++)
- {
- var currentBackOff = BackOffs[index - 1];
- var backOffExchangeName = $"{ExchangeName}.backoff.{index}";
- var backOffQueueName = $"{QueueName}/ttl={currentBackOff}";
- Model.ExchangeDeclare(backOffExchangeName, ExchangeType.Direct, true, false);
- Model.QueueDeclare(backOffQueueName, true, false, false, new Dictionary<string, object> {
- { DeadLetterExchangeVariableName, ExchangeName },
- { MessageTTLVariableName, currentBackOff } });
- Model.QueueBind(backOffQueueName, backOffExchangeName, QueueName, null);
- }
- }
- protected void CreateDeadLetterQueueAndExchange()
- {
- if (BackOffs.Any())
- {
- var exchangeName = $"{ExchangeName}.dead";
- var queueName = $"{QueueName}/dead";
- Model.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false);
- Model.QueueDeclare(queueName, true, false, false, null);
- Model.QueueBind(queueName, exchangeName, QueueName, null);
- }
- }
- protected void EnsureQueuesAndExchangesAreSetup()
- {
- Model.ExchangeDeclare(ExchangeName, ExchangeType.Direct, true, false);
- Model.QueueDeclare(QueueName, true, false, false, null);
- Model.QueueBind(QueueName, ExchangeName, QueueName, null);
- CreateBackOffQueuesAndExchanges();
- CreateDeadLetterQueueAndExchange();
- }
- public void Enqueue(T content)
- {
- var props = CreatePersistentTextProperties();
- using (var memoryStream = new MemoryStream())
- {
- formatter.Serialize(memoryStream, content);
- Model.BasicPublish(ExchangeName, QueueName, props, memoryStream.ToArray());
- }
- }
- public void StartListener(Func<T, bool> func)
- {
- var consumer = new EventingBasicConsumer(Model);
- consumer.Received += (ch, ea) =>
- {
- T model;
- using (MemoryStream memoryStream = new MemoryStream(ea.Body))
- {
- model = (T)formatter.Deserialize(memoryStream);
- }
- var isSuccessful = func?.Invoke(model) ?? false;
- MessageProcessed(ea, isSuccessful);
- };
- consumerTag = Model.BasicConsume(QueueName, false, consumer);
- }
- public void StopListener()
- {
- Model.BasicCancel(consumerTag);
- consumerTag = null;
- }
- protected void MessageProcessed(BasicDeliverEventArgs args, bool isSuccessful)
- {
- if (isSuccessful)
- {
- Model.BasicAck(args.DeliveryTag, false);
- }
- else
- {
- MessageFailed(args);
- }
- }
- protected void MessageFailed(BasicDeliverEventArgs args)
- {
- var failedCount = 1;
- if (args?.BasicProperties?.Headers != null && args.BasicProperties.Headers.ContainsKey(DeathVariableName))
- {
- var death = (List<object>)args.BasicProperties.Headers[DeathVariableName];
- if (death != null) { failedCount += death.Count; }
- }
- if (failedCount > BackOffs.Length)
- {
- MessageDead(args);
- }
- else
- {
- var exchangeName = $"{ExchangeName}.backoff.{failedCount}";
- Model.BasicPublish(exchangeName, args.RoutingKey, args.BasicProperties, args.Body);
- Model.BasicAck(args.DeliveryTag, false);
- }
- }
- protected void MessageDead(BasicDeliverEventArgs args)
- {
- var exchangeName = $"{ExchangeName}.dead";
- var props = args.BasicProperties;
- Model.BasicPublish(exchangeName, args.RoutingKey, props, args.Body);
- Model.BasicAck(args.DeliveryTag, false);
- }
- public void Dispose()
- {
- if (_model != null && consumerTag != null)
- {
- StopListener();
- }
- if (_connection != null && _connection.IsOpen) { _connection.Close(); }
- _model?.Dispose();
- _connection?.Dispose();
- }
- }
- }
Add Comment
Please, Sign In to add comment