Advertisement
Guest User

Web Hook - Retry Level queuing with RabbitMq

a guest
Sep 26th, 2013
1,240
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 7.56 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Net;
  5. using ProtoBuf;
  6. using RabbitMQ.Client;
  7. using RabbitMQ.Client.Events;
  8. using RabbitMQ.Client.Exceptions;
  9. using ServiceStack.Text;
  10.  
  11. namespace WebHooks
  12. {
  13.     internal class Program
  14.     {
  15.         public static class Exchange
  16.         {
  17.             public const string Default = "WebHookExchange";
  18.             public const string Retry = "WebHookRetryExchange";
  19.         }
  20.  
  21.  
  22.         public static class Queue
  23.         {
  24.             public const string Default = "WebHookQueue";
  25.             public const string Retry = "WebHookRetry";
  26.         }
  27.  
  28.  
  29.         private static void ConfigureModel(IModel channel) {
  30.             channel.ExchangeDeclare(
  31.                 Exchange.Default,
  32.                 ExchangeType.Fanout,
  33.                 /* Durable      */ true,
  34.                 /* Auto-Delete  */ false,
  35.                 null);
  36.             channel.QueueDeclare(
  37.                 Queue.Default,
  38.                 /* Durable      */ true,
  39.                 /* Exclusive    */ false,
  40.                 /* Auto-Delete  */ false,
  41.                 null);
  42.             channel.QueueBind(Queue.Default, Exchange.Default, "", null);
  43.  
  44.             channel.ExchangeDeclare(
  45.                 Exchange.Retry,
  46.                 ExchangeType.Fanout,
  47.                 /* Durable      */ true,
  48.                 /* Auto-Delete  */ false,
  49.                 null);
  50.             channel.QueueDeclare(
  51.                 Queue.Retry,
  52.                 /* Durable      */ true,
  53.                 /* Exclusive    */ false,
  54.                 /* Auto-Delete  */ false,
  55.                 new Dictionary<string, object> {
  56.                     {"x-dead-letter-exchange", Exchange.Default}
  57.                 });
  58.             channel.QueueBind(Queue.Retry, Exchange.Retry, "", null);
  59.  
  60.             //Enable options
  61.             channel.ConfirmSelect();
  62.         }
  63.  
  64.         private static byte[] ProtoSerialize<T>(T value)
  65.             where T : class {
  66.             using (var ms = new MemoryStream()) {
  67.                 Serializer.Serialize(ms, value);
  68.                 return ms.ToArray();
  69.             }
  70.         }
  71.  
  72.         private static T ProtoDeserialize<T>(byte[] data) {
  73.             using (var ms = new MemoryStream(data)) {
  74.                 return Serializer.Deserialize<T>(ms);
  75.             }
  76.         }
  77.  
  78.         private static void Enqueue(IModel channel, WebHookCallback callback) {
  79.             var envelope = new RetryEnvelope {
  80.                 Retries = 0,
  81.                 Data = ProtoSerialize(callback)
  82.             };
  83.             var props = channel.CreateBasicProperties();
  84.             props.DeliveryMode = 2;
  85.             props.ContentType = "application/protobuf";
  86.             channel.BasicPublish(Exchange.Default, "", props, ProtoSerialize(envelope));
  87.             channel.WaitForConfirms();
  88.         }
  89.  
  90.         private static readonly IDictionary<int, TimeSpan> RetryLevelDelays = new Dictionary<int, TimeSpan> {
  91.             {1, TimeSpan.FromSeconds(5)},
  92.             {2, TimeSpan.FromSeconds(5)},
  93.             {3, TimeSpan.FromSeconds(5)},
  94.             {4, TimeSpan.FromSeconds(5)}
  95.         };
  96.  
  97.         private static void ProcessRetryEnvelopes(IModel channel) {
  98.             var consumer = new QueueingBasicConsumer(channel);
  99.             channel.BasicConsume(Queue.Default, false, consumer);
  100.  
  101.             while (true) {
  102.                 var e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
  103.                 try {
  104.                     var envelope = ProtoDeserialize<RetryEnvelope>(e.Body);
  105.                     var callback = ProtoDeserialize<WebHookCallback>(envelope.Data);
  106.                     Console.WriteLine(envelope.Retries);
  107.  
  108.                     if (!Process(callback)) {
  109.                         envelope.Retries++;
  110.                         string exchange;
  111.                         var props = channel.CreateBasicProperties();
  112.                         props.DeliveryMode = 2;
  113.                         props.ContentType = "application/protobuf";
  114.  
  115.                         if (envelope.Retries > 0 &&
  116.                             envelope.Retries%2 == 0) {
  117.                             exchange = Exchange.Retry;
  118.                             TimeSpan delay;
  119.                             var retryLevel = envelope.Retries/2;
  120.                             if (!RetryLevelDelays.TryGetValue(retryLevel, out delay)) {
  121.                                 channel.BasicAck(e.DeliveryTag, false);
  122.                                 Console.WriteLine("Unable to deliver message*");
  123.                                 continue;
  124.                             }
  125.                             props.Expiration = ((long) delay.TotalMilliseconds).ToString("D");
  126.                         } else {
  127.                             exchange = Exchange.Default;
  128.                         }
  129.  
  130.                         channel.BasicPublish(exchange, "", props, ProtoSerialize(envelope));
  131.                     }
  132.  
  133.                     channel.BasicAck(e.DeliveryTag, false);
  134.                     channel.WaitForConfirms();
  135.                 } catch (OperationInterruptedException) {
  136.                     channel.BasicNack(e.DeliveryTag, false, true);
  137.                 }
  138.             }
  139.         }
  140.  
  141.         private static bool Process(WebHookCallback callback) {
  142.             try {
  143.                 var request = WebRequest.CreateHttp(callback.CallbackUrl);
  144.                 request.Method = WebRequestMethods.Http.Post;
  145.                 request.ContentType = "application/json; charset=UTF-8";
  146.                 request.ContentLength = callback.Json.Length;
  147.                 using (var stream = request.GetRequestStream()) {
  148.                     using (var writer = new StreamWriter(stream)) {
  149.                         writer.Write(callback.Json);
  150.                         writer.Flush();
  151.                     }
  152.                 }
  153.                 bool processed;
  154.                 using (var response = (HttpWebResponse) request.GetResponse()) {
  155.                     processed = response.StatusCode == HttpStatusCode.OK;
  156.                     response.Close();
  157.                 }
  158.                 return processed;
  159.             } catch {
  160.                 return false;
  161.             }
  162.         }
  163.  
  164.         private static void Main(string[] args) {
  165.             /* Dependencies
  166.              * ------------
  167.              * RabbitMq.Client
  168.              * protobuf-net (ProtoBuf Serializer)
  169.              * ServiceStack.Text (Json Serializer)
  170.              */
  171.  
  172.             var factory = new ConnectionFactory();
  173.             using (var connection = factory.CreateConnection()) {
  174.                 var channel = connection.CreateModel();
  175.                 ConfigureModel(channel);
  176.                 Enqueue(
  177.                     channel,
  178.                     new WebHookCallback {
  179.                         CallbackUrl = "http://www.mysite.com/callback_url",
  180.                         Json = JsonSerializer.SerializeToString(
  181.                             new Dictionary<string, object> {
  182.                                 {"type", "123"},
  183.                                 {"data", "collapse"}
  184.                             })
  185.                     });
  186.  
  187.                 ProcessRetryEnvelopes(channel);
  188.             }
  189.         }
  190.     }
  191.  
  192.  
  193.     [ProtoContract]
  194.     public class RetryEnvelope
  195.     {
  196.         [ProtoMember(1)]
  197.         public int Retries { get; set; }
  198.  
  199.         [ProtoMember(2)]
  200.         public byte[] Data { get; set; }
  201.     }
  202.  
  203.  
  204.     [ProtoContract]
  205.     public class WebHookCallback
  206.     {
  207.         [ProtoMember(1)]
  208.         public string CallbackUrl { get; set; }
  209.  
  210.         [ProtoMember(2)]
  211.         public string Json { get; set; }
  212.     }
  213. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement