Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class RabbitMQClient : IQueueClient
- {
- private IConnectionFactory _factory;
- private IConnection _connection;
- private ILoggerClient _logger;
- private IWebApiClient _webApiClient;
- private string _queueName;
- private string _dlqName;
- private string _rqName;
- private int _maxRetryCount = 0;
- private int _expiration = 0;
- private decimal _expirationExponent = 0;
- public RabbitMQClient(IConfigurationRoot config, ILoggerClient logger, IWebApiClient webApiClient)
- {
- //Setup the ConnectionFactory
- _factory = new ConnectionFactory()
- {
- UserName = config["RabbitMQSettings:Username"],
- Password = config["RabbitMQSettings:Password"],
- VirtualHost = config["RabbitMQSettings:VirtualHost"],
- HostName = config["RabbitMQSettings:HostName"],
- Port = Convert.ToInt32(config["RabbitMQSettings:Port"]),
- AutomaticRecoveryEnabled = true,
- RequestedHeartbeat = 60,
- Ssl = new SslOption()
- {
- ServerName = config["RabbitMQSettings:HostName"],
- Version = SslProtocols.Tls12,
- CertPath = config["RabbitMQSettings:SSLCertPath"],
- CertPassphrase = config["RabbitMQSettings:SSLCertPassphrase"],
- Enabled = true
- }
- };
- _logger = logger;
- _webApiClient = webApiClient;
- _queueName = config["RabbitMQSettings:QueueName"];
- _dlqName = $"{_queueName}.dlq";
- _rqName = $"{_queueName}.rq";
- _maxRetryCount = int.Parse(config["RabbitMQSettings:MessageSettings:MaxRetryCount"]);
- _expiration = int.Parse(config["RabbitMQSettings:MessageSettings:Expiration"]);
- _expirationExponent = decimal.Parse(config["RabbitMQSettings:MessageSettings:ExpirationExponent"]);
- }
- public void ProcessMessages()
- {
- using (_connection = _factory.CreateConnection())
- {
- using (var channel = _connection.CreateModel())
- {
- /*
- * Create the DLQ.
- * This is where messages will go after the retry limit has been hit.
- */
- channel.ExchangeDeclare(_dlqName, "direct");
- channel.QueueDeclare(_dlqName, true, false, false, null);
- channel.QueueBind(_dlqName, _dlqName, _queueName);
- /*
- * Create the main exchange/queue. we need to explicitly declare
- * the exchange so that we can push items back to it from the retry queue
- * once they're expired.
- */
- channel.ExchangeDeclare(_queueName, "direct");
- channel.QueueDeclare(_queueName, true, false, false, new Dictionary<String, Object>
- {
- { "x-dead-letter-exchange", _dlqName }
- });
- channel.QueueBind(_queueName, _queueName, _queueName);
- /*
- * Set the DLX of the retry queue to be the original queue
- * This is needed for the exponential backoff
- */
- channel.ExchangeDeclare(_rqName, "direct");
- channel.QueueDeclare(_rqName, true, false, false, new Dictionary<String, Object>
- {
- { "x-dead-letter-exchange", _queueName }
- });
- channel.QueueBind(_rqName, _rqName, _queueName);
- channel.BasicQos(0, 1, false);
- Subscription subscription = new Subscription(channel, _queueName, false);
- foreach (BasicDeliverEventArgs e in subscription)
- {
- Stopwatch stopWatch = new Stopwatch();
- try
- {
- var payment = (CreditCardPaymentModel)e.Body.DeSerialize(typeof(CreditCardPaymentModel));
- _logger.EventLog("Payment Dequeued", $"PaymentGuid:{payment.PaymentGuid}");
- stopWatch.Start();
- var response = //The Call to the Web API Happens here we will either get a 200 or a 400 from the WebService
- stopWatch.Stop();
- var elapsedTime = stopWatch.Elapsed.Seconds.ToString();
- if (response.ResponseStatus == HttpStatusCode.BadRequest)
- {
- var errorMessage = $"PaymentGuid: {payment.PaymentGuid} | Elapsed Call Time: {elapsedTime} | ResponseStatus: {((int)response.ResponseStatus).ToString()}"
- + $"/n ErrorMessage: {response.ResponseErrorMessage}";
- _logger.EventLog("Payment Not Processed", errorMessage);
- Retry(e, subscription, errorMessage, payment.PaymentGuid);
- }
- else
- {
- //All the Responses are making it here. But even after the ACK they are being picked up and processoed again.
- subscription.Ack(e);
- _logger.EventLog("Payment Processed", $"--- Payment Processed - PaymentGuid : {payment.PaymentGuid} | Elapsed Call Time: {elapsedTime} | SourceStore : {payment.SourceStore} | Request Response: {(int)response.ResponseStatus}");
- }
- }
- catch (Exception ex)
- {
- Retry(e, subscription, ex.Message);
- _logger.ErrorLog("Payment Not Processed", ex.ToString(), ErrorLogLevel.ERROR);
- }
- }
- }
- }
- }
- public void Retry(BasicDeliverEventArgs payload, Subscription subscription, string errorMessage, Guid paymentGuid = new Guid())
- {
- if(paymentGuid != Guid.Empty)
- {
- _logger.EventLog("Retry Called", $"Retry on Payment Guid {paymentGuid}");
- }
- else
- {
- _logger.EventLog("Retry Called", errorMessage);
- }
- //Get or set the retryCount of the message
- IDictionary<String, object> headersDict = payload.BasicProperties.Headers ?? new Dictionary<String, object>();
- var retryCount = Convert.ToInt32(headersDict.GetValueOrDefault("x-retry-count"));
- //Check if the retryCount is still less than the max and republish the message
- if (retryCount < _maxRetryCount)
- {
- var originalExpiration = Convert.ToInt32(headersDict.GetValueOrDefault("x-expiration"));
- var newExpiration = Convert.ToInt32(originalExpiration == 0 ? _expiration : originalExpiration * _expirationExponent);
- payload.BasicProperties.Expiration = newExpiration.ToString();
- headersDict["x-expiration"] = newExpiration;
- headersDict["x-retry-count"] = ++retryCount;
- payload.BasicProperties.Headers = headersDict;
- subscription.Model.BasicPublish(_rqName, _queueName, payload.BasicProperties, payload.Body);
- subscription.Ack(payload);
- }
- else //Reject the message, which will send it to the DLX / DLQ
- {
- headersDict.Add("x-error-msg", errorMessage);
- payload.BasicProperties.Headers = headersDict;
- subscription.Nack(payload, false, false);
- _logger.ErrorLog("Error", errorMessage, ErrorLogLevel.ERROR);
- }
- }
- }
- public static class DictionaryExtensions
- {
- public static TValue GetValueOrDefault<TKey, TValue>(this IDictionary<TKey, TValue> dic, TKey key)
- {
- return (dic != null && dic.TryGetValue(key, out TValue result)) ? result : default(TValue);
- }
- }
- }
- 2018-09-26T10:09:36.939447046Z EventName: Payment Dequeued | EventMessage: PaymentGuid:a4587a35-a847-4296-b62a-d3e9d797e38an|Container 2
- 2018-09-26T10:09:39.518236133Z EventName: Payment Dequeued | EventMessage: PaymentGuid:1e57c060-992b-4920-91c8-9bebfcf2c102n|Container 2
- 2018-09-26T10:09:41.172011859Z EventName: Payment Dequeued | EventMessage: PaymentGuid:6b7ed885-4b84-4343-8c53-b918a93b68ffn|Container 2
- 2018-09-26T10:09:44.084562207Z EventName: Payment Dequeued | EventMessage: PaymentGuid:ff24b271-5e85-46b5-bbea-7508ee995e84n|Container 2
- 2018-09-26T10:09:50.924115078Z EventName: Payment Dequeued | EventMessage: PaymentGuid:43ad44e4-8019-49fc-8e35-2972f69b7865n|Container 2
- 2018-09-26T10:09:54.715909341Z EventName: Payment Dequeued | EventMessage: PaymentGuid:690565a9-d77a-42b1-a790-96d70e1bc382n|Container 2
- 2018-09-26T10:09:57.913913989Z EventName: Payment Dequeued | EventMessage: PaymentGuid:7be5f003-9fcd-4a7b-9639-ade577371265n|Container 2
- 2018-09-26T10:10:02.726935131Z EventName: Payment Dequeued | EventMessage: PaymentGuid:1da12a4e-ef57-4a3f-87a8-d6eb5fcd4ae2n|Container 2
- 2018-09-26T10:10:04.76819243Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e460d24c-f8fa-478a-a4bd-6a6c3b59db40n|Container 2
- 2018-09-26T10:10:08.552140347Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e232c414-7470-4ba4-b608-e059eafa4e9an|Container 2
- 2018-09-26T10:10:09.72297353Z EventName: Payment Dequeued | EventMessage: PaymentGuid:b251d632-0856-485a-9b5f-f6840be50d33n|Container 2
- 2018-09-26T10:10:13.827316011Z EventName: Payment Dequeued | EventMessage: PaymentGuid:af153f61-cf85-4940-859c-d2e46a2249f8n|Container 2
- 2018-09-26T10:10:15.75094224Z EventName: Payment Dequeued | EventMessage: PaymentGuid:b48842bf-a4e6-4e6a-b46b-21158f98a612n|Container 2
- 2018-09-26T10:09:36.438923553Z EventName: Payment Dequeued | EventMessage: PaymentGuid:bc71e5dd-7ec2-44fb-b11b-e2eb8912e89fn|Container 1
- 2018-09-26T10:09:38.832588539Z EventName: Payment Dequeued | EventMessage: PaymentGuid:a4587a35-a847-4296-b62a-d3e9d797e38an|Container 1
- 2018-09-26T10:09:44.807805799Z EventName: Payment Dequeued | EventMessage: PaymentGuid:ff24b271-5e85-46b5-bbea-7508ee995e84n|Container 1
- 2018-09-26T10:09:48.005181713Z EventName: Payment Dequeued | EventMessage: PaymentGuid:fb2b804c-91cf-4ce5-be78-433c1fcc71f7n|Container 1
- 2018-09-26T10:09:50.3182799Z EventName: Payment Dequeued | EventMessage: PaymentGuid:0033d751-4793-4130-a6f4-3adaa34be5cfn|Container 1
- 2018-09-26T10:09:53.568189013Z EventName: Payment Dequeued | EventMessage: PaymentGuid:43ad44e4-8019-49fc-8e35-2972f69b7865n|Container 1
- 2018-09-26T10:09:54.741638588Z EventName: Payment Dequeued | EventMessage: PaymentGuid:9e8231ab-8c9e-476e-b590-e4fa48aa967dn|Container 1
- 2018-09-26T10:09:57.732274704Z EventName: Payment Dequeued | EventMessage: PaymentGuid:8c8f8356-0096-4895-a559-86319d51ab88n|Container 1
- 2018-09-26T10:10:00.032288718Z EventName: Payment Dequeued | EventMessage: PaymentGuid:c7ac4551-f6da-4bf2-bb0e-216c2bf93d87n|Container 1
- 2018-09-26T10:10:01.776894188Z EventName: Payment Dequeued | EventMessage: PaymentGuid:9ea63c53-fce0-4abd-8e41-73531532d6b2n|Container 1
- 2018-09-26T10:10:04.020859002Z EventName: Payment Dequeued | EventMessage: PaymentGuid:ce25e756-eee5-492e-a719-d2fa7ada77aen|Container 1
- 2018-09-26T10:10:07.944109623Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e232c414-7470-4ba4-b608-e059eafa4e9an|Container 1
- 2018-09-26T10:10:11.674030928Z EventName: Payment Dequeued | EventMessage: PaymentGuid:ff05a52e-c452-4064-9e1b-a16f949cfba4n|Container 1
- 2018-09-26T10:10:13.895172992Z EventName: Payment Dequeued | EventMessage: PaymentGuid:7b820f2c-97cd-4a62-b2fd-7608f8ed9709n|Container 1
- 2018-09-26T09:05:02.464474482Z EventName: Payment Dequeued | EventMessage: PaymentGuid:474ed136-3e8e-4cc6-8637-90d2249925afn|Container 1
- 2018-09-26T09:05:11.036073154Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e85b1790-4d52-4f01-8db5-15a893dc21bfn|Container 1
- 2018-09-26T09:05:08.816003168Z EventName: Payment Dequeued | EventMessage: PaymentGuid:b66108dd-18e0-4c0d-99d3-0e5377a0bf6an|Container 2
- 2018-09-26T09:05:02.659179464Z EventName: Payment Dequeued | EventMessage: PaymentGuid:474ed136-3e8e-4cc6-8637-90d2249925afn|Container 2
- 2018-09-26T08:04:56.579116448Z EventName: Payment Dequeued | EventMessage: PaymentGuid:c526d1f4-ffa2-4b0d-8c40-ada16027683dn|Container 1
- 2018-09-26T08:04:42.699204265Z EventName: Payment Dequeued | EventMessage: PaymentGuid:2639b2c8-12a8-4980-b0e5-be367d531563n|Container 1
- 2018-09-26T08:04:48.83663935Z EventName: Payment Dequeued | EventMessage: PaymentGuid:16785c00-16cd-4895-8b8d-7fd961b04c69n|Container 1
- 2018-09-26T08:04:52.425314816Z EventName: Payment Dequeued | EventMessage: PaymentGuid:22625913-23be-46f5-9ec6-43e5490ab36bn|Container 1
- 2018-09-26T08:04:54.553625628Z EventName: Payment Dequeued | EventMessage: PaymentGuid:22625913-23be-46f5-9ec6-43e5490ab36bn|Container 2
- 2018-09-26T08:04:56.038703755Z EventName: Payment Dequeued | EventMessage: PaymentGuid:2fe806e0-6e3d-4b10-86f1-adef24cd6117n|Container 2
- 2018-09-26T08:04:42.484990855Z EventName: Payment Dequeued | EventMessage: PaymentGuid:7549a8ef-4a94-4a9d-8119-2c2b10441f94n|Container 2
- 2018-09-26T08:04:49.727885876Z EventName: Payment Dequeued | EventMessage: PaymentGuid:3d747e86-471e-4fcb-b286-20f878acf1acn|Container 2
- 2018-09-26T08:04:53.628023877Z EventName: Payment Dequeued | EventMessage: PaymentGuid:22625913-23be-46f5-9ec6-43e5490ab36bn|Container 2
- 2018-09-26T07:22:40.936070765Z EventName: Payment Dequeued | EventMessage: PaymentGuid:4686ffe2-e26a-4677-a0d8-c173962ce35fn|Container 1
- 2018-09-26T07:22:47.300509043Z EventName: Payment Dequeued | EventMessage: PaymentGuid:4686ffe2-e26a-4677-a0d8-c173962ce35fn|Container 1
- 2018-09-26T07:22:34.959499982Z EventName: Payment Dequeued | EventMessage: PaymentGuid:43d2c3af-ff01-4c79-ad0e-a19d33b71bc0n|Container 1
- 2018-09-26T07:22:57.940053988Z EventName: Payment Dequeued | EventMessage: PaymentGuid:b0179f6b-c848-402d-a5db-d68e2deffcb8n|Container 1
- 2018-09-26T07:22:56.91903335Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e8122fdb-86f0-4d16-8285-a4ee907b170bn|Container 1
- 2018-09-26T07:22:52.970086729Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e8122fdb-86f0-4d16-8285-a4ee907b170bn|Container 1
- 2018-09-26T07:22:50.394176023Z EventName: Payment Dequeued | EventMessage: PaymentGuid:0819d65f-40a6-4474-95ae-950d668a98c5n|Container 1
- 2018-09-26T07:22:48.387969063Z EventName: Payment Dequeued | EventMessage: PaymentGuid:ff683509-e768-404e-a73b-b2132bcb7f42n|Container 1
- 2018-09-26T07:22:58.426627422Z EventName: Payment Dequeued | EventMessage: PaymentGuid:b0179f6b-c848-402d-a5db-d68e2deffcb8n|Container 2
- 2018-09-26T07:23:01.530786341Z EventName: Payment Dequeued | EventMessage: PaymentGuid:0b9dd7cd-4d95-4791-978e-7bee0c6c5c86n|Container 2
- 2018-09-26T07:23:03.626103482Z EventName: Payment Dequeued | EventMessage: PaymentGuid:0b9dd7cd-4d95-4791-978e-7bee0c6c5c86n|Container 2
- 2018-09-26T07:22:49.217614814Z EventName: Payment Dequeued | EventMessage: PaymentGuid:d1abab7f-38af-4ebf-b079-01df3992a6e6n|Container 2
- 2018-09-26T07:22:34.848011406Z EventName: Payment Dequeued | EventMessage: PaymentGuid:7fb26a92-0405-4dcb-978d-49d8806b573bn|Container 2
- 2018-09-26T07:22:44.971658054Z EventName: Payment Dequeued | EventMessage: PaymentGuid:4686ffe2-e26a-4677-a0d8-c173962ce35fn|Container 2
- 2018-09-26T07:22:51.649055039Z EventName: Payment Dequeued | EventMessage: PaymentGuid:e8122fdb-86f0-4d16-8285-a4ee907b170bn|Container 2
Add Comment
Please, Sign In to add comment