Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Net;
- using ProtoBuf;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
- using RabbitMQ.Client.Exceptions;
- using ServiceStack.Text;
- namespace WebHooks
- {
- internal class Program
- {
/// <summary>
/// Names of the RabbitMQ exchanges declared by <c>ConfigureModel</c>.
/// </summary>
public static class Exchange
{
    /// <summary>Exchange bound to the main work queue (<c>Queue.Default</c>).</summary>
    public const string Default = "WebHookExchange";

    /// <summary>Exchange bound to the delayed-retry queue (<c>Queue.Retry</c>).</summary>
    public const string Retry = "WebHookRetryExchange";
}
/// <summary>
/// Names of the RabbitMQ queues declared by <c>ConfigureModel</c>.
/// </summary>
public static class Queue
{
    /// <summary>Main work queue that the consumer loop reads from.</summary>
    public const string Default = "WebHookQueue";

    /// <summary>Queue that holds failed messages until they are dead-lettered back for another attempt.</summary>
    public const string Retry = "WebHookRetry";
}
/// <summary>
/// Declares the exchanges, queues and bindings used by the web hook pipeline
/// and puts the channel into publisher-confirm mode.
/// </summary>
/// <param name="channel">Channel on which the topology is declared.</param>
private static void ConfigureModel(IModel channel) {
    // Every exchange and queue below shares the same flags.
    const bool durable = true;
    const bool exclusive = false;
    const bool autoDelete = false;

    // Main path: fanout exchange -> work queue.
    channel.ExchangeDeclare(Exchange.Default, ExchangeType.Fanout, durable, autoDelete, null);
    channel.QueueDeclare(Queue.Default, durable, exclusive, autoDelete, null);
    channel.QueueBind(Queue.Default, Exchange.Default, "", null);

    // Retry path: fanout exchange -> retry queue. The retry queue names the
    // main exchange as its dead-letter exchange, so messages dead-lettered
    // from it are routed back to the work queue.
    channel.ExchangeDeclare(Exchange.Retry, ExchangeType.Fanout, durable, autoDelete, null);
    var retryQueueArguments = new Dictionary<string, object> {
        {"x-dead-letter-exchange", Exchange.Default}
    };
    channel.QueueDeclare(Queue.Retry, durable, exclusive, autoDelete, retryQueueArguments);
    channel.QueueBind(Queue.Retry, Exchange.Retry, "", null);

    // Enable publisher confirms so callers can use WaitForConfirms().
    channel.ConfirmSelect();
}
/// <summary>
/// Serializes <paramref name="value"/> with protobuf-net into a new byte array.
/// </summary>
/// <typeparam name="T">Reference type being serialized.</typeparam>
/// <param name="value">Object to serialize.</param>
/// <returns>The serialized bytes.</returns>
private static byte[] ProtoSerialize<T>(T value)
    where T : class {
    using (var buffer = new MemoryStream()) {
        Serializer.Serialize(buffer, value);
        var payload = buffer.ToArray();
        return payload;
    }
}
/// <summary>
/// Deserializes a protobuf-net payload into an instance of <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Type to materialize.</typeparam>
/// <param name="data">Serialized bytes.</param>
/// <returns>The deserialized object.</returns>
private static T ProtoDeserialize<T>(byte[] data) {
    using (var source = new MemoryStream(data)) {
        var result = Serializer.Deserialize<T>(source);
        return result;
    }
}
/// <summary>
/// Wraps <paramref name="callback"/> in a fresh <see cref="RetryEnvelope"/>
/// (zero retries), publishes it to the main exchange and blocks until the
/// broker has confirmed the publish.
/// </summary>
/// <param name="channel">Channel to publish on; must be in confirm mode.</param>
/// <param name="callback">Web hook call to enqueue.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="channel"/> or <paramref name="callback"/> is null.
/// </exception>
/// <exception cref="IOException">The broker did not confirm the publish.</exception>
private static void Enqueue(IModel channel, WebHookCallback callback) {
    if (channel == null) {
        throw new ArgumentNullException("channel");
    }
    if (callback == null) {
        throw new ArgumentNullException("callback");
    }

    var envelope = new RetryEnvelope {
        Retries = 0,
        Data = ProtoSerialize(callback)
    };

    var props = channel.CreateBasicProperties();
    props.DeliveryMode = 2; // 2 = persistent in AMQP
    props.ContentType = "application/protobuf";

    channel.BasicPublish(Exchange.Default, "", props, ProtoSerialize(envelope));

    // WaitForConfirms() returns false when the broker nacked the message.
    // Ignoring that would silently lose the web hook, so surface it.
    if (!channel.WaitForConfirms()) {
        throw new IOException("The broker did not confirm the web hook message.");
    }
}
/// <summary>
/// Delay applied before a message is retried, keyed by retry level
/// (<c>Retries / 2</c>). A level with no entry means "give up".
/// </summary>
/// <remarks>
/// NOTE(review): the delay is sent as a per-message TTL on a single retry
/// queue. RabbitMQ only expires messages at the head of a queue, so if these
/// levels ever get different delays a long one could hold up a short one -
/// confirm before changing the values.
/// </remarks>
private static readonly IDictionary<int, TimeSpan> RetryLevelDelays = new Dictionary<int, TimeSpan> {
    {1, TimeSpan.FromSeconds(5)},
    {2, TimeSpan.FromSeconds(5)},
    {3, TimeSpan.FromSeconds(5)},
    {4, TimeSpan.FromSeconds(5)}
};

/// <summary>
/// Consumes the main queue forever, delivering each web hook via
/// <c>Process</c> and rescheduling the ones that fail.
/// </summary>
/// <remarks>
/// After a failed delivery the envelope's retry counter is incremented:
/// an odd counter republishes straight to the main exchange (immediate
/// retry); an even counter republishes to the retry exchange with an
/// expiration taken from <see cref="RetryLevelDelays"/>. When no delay is
/// configured for the level the message is acknowledged and dropped; with
/// the four configured levels that happens after the tenth failed attempt.
/// The incoming delivery is acknowledged only after the republished copy
/// has been confirmed by the broker, so a failed republish cannot lose it.
/// </remarks>
/// <param name="channel">Channel in confirm mode to consume from and publish to.</param>
private static void ProcessRetryEnvelopes(IModel channel) {
    var consumer = new QueueingBasicConsumer(channel);
    channel.BasicConsume(Queue.Default, false, consumer); // noAck = false: manual acks

    while (true) {
        var e = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
        try {
            var envelope = ProtoDeserialize<RetryEnvelope>(e.Body);
            var callback = ProtoDeserialize<WebHookCallback>(envelope.Data);
            Console.WriteLine(envelope.Retries);

            if (Process(callback)) {
                channel.BasicAck(e.DeliveryTag, false);
                continue;
            }

            envelope.Retries++;

            var props = channel.CreateBasicProperties();
            props.DeliveryMode = 2; // 2 = persistent in AMQP
            props.ContentType = "application/protobuf";

            var exchange = Exchange.Default;
            if (envelope.Retries%2 == 0) {
                // Every second failure goes through the delayed retry queue.
                TimeSpan delay;
                var retryLevel = envelope.Retries/2;
                if (!RetryLevelDelays.TryGetValue(retryLevel, out delay)) {
                    // Out of retry levels: drop the message.
                    channel.BasicAck(e.DeliveryTag, false);
                    Console.WriteLine("Unable to deliver message*");
                    continue;
                }
                exchange = Exchange.Retry;
                props.Expiration = ((long) delay.TotalMilliseconds).ToString(
                    "D", System.Globalization.CultureInfo.InvariantCulture);
            }

            channel.BasicPublish(exchange, "", props, ProtoSerialize(envelope));

            // Ack the original only once the broker has accepted the copy;
            // otherwise hand the original back so it is redelivered.
            if (channel.WaitForConfirms()) {
                channel.BasicAck(e.DeliveryTag, false);
            } else {
                channel.BasicNack(e.DeliveryTag, false, true);
            }
        } catch (OperationInterruptedException) {
            // A nack cannot be sent on a closed channel; let the original
            // failure propagate instead of masking it with a second one.
            if (!channel.IsOpen) {
                throw;
            }
            channel.BasicNack(e.DeliveryTag, false, true);
        }
    }
}
/// <summary>
/// Delivers a web hook by POSTing its JSON payload to the callback URL.
/// </summary>
/// <param name="callback">Target URL and JSON body to send.</param>
/// <returns>
/// <c>true</c> when the endpoint answered with a 2xx status code;
/// <c>false</c> on any other status or on any error while sending.
/// </returns>
private static bool Process(WebHookCallback callback) {
    try {
        // Encode first: Content-Length must be the number of bytes on the
        // wire, which differs from the string length for non-ASCII JSON.
        var payload = System.Text.Encoding.UTF8.GetBytes(callback.Json);

        var request = WebRequest.CreateHttp(callback.CallbackUrl);
        request.Method = WebRequestMethods.Http.Post;
        request.ContentType = "application/json; charset=UTF-8";
        request.ContentLength = payload.Length;

        using (var stream = request.GetRequestStream()) {
            stream.Write(payload, 0, payload.Length);
        }

        using (var response = (HttpWebResponse) request.GetResponse()) {
            // Any 2xx counts as delivered (receivers may answer 201/202/204).
            var status = (int) response.StatusCode;
            return status >= 200 && status < 300;
        }
    } catch (Exception ex) {
        // A failed delivery must not kill the consumer loop; report it and
        // let the caller schedule a retry.
        Console.Error.WriteLine("Web hook delivery failed: " + ex.Message);
        return false;
    }
}
/// <summary>
/// Demo entry point: connects to RabbitMQ with the default connection
/// settings, declares the topology, enqueues one sample web hook and then
/// runs the consumer loop.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
private static void Main(string[] args) {
    /* Dependencies
     * ------------
     * RabbitMq.Client
     * protobuf-net (ProtoBuf Serializer)
     * ServiceStack.Text (Json Serializer)
     */
    var factory = new ConnectionFactory();
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel()) {
        // The channel is disposable too; release it before the connection.
        ConfigureModel(channel);

        var payload = JsonSerializer.SerializeToString(
            new Dictionary<string, object> {
                {"type", "123"},
                {"data", "collapse"}
            });

        Enqueue(
            channel,
            new WebHookCallback {
                CallbackUrl = "http://www.mysite.com/callback_url",
                Json = payload
            });

        ProcessRetryEnvelopes(channel);
    }
}
- }
/// <summary>
/// Message wrapper that travels through the queues: a serialized payload
/// plus the number of times its delivery has failed.
/// </summary>
[ProtoContract]
public class RetryEnvelope
{
    /// <summary>Failed delivery attempts so far; starts at 0 when enqueued.</summary>
    [ProtoMember(1)]
    public int Retries { get; set; }

    /// <summary>The protobuf-serialized <see cref="WebHookCallback"/>.</summary>
    [ProtoMember(2)]
    public byte[] Data { get; set; }
}
/// <summary>
/// A single web hook call: where to send it and what to send.
/// </summary>
[ProtoContract]
public class WebHookCallback
{
    /// <summary>URL the JSON payload is POSTed to.</summary>
    [ProtoMember(1)]
    public string CallbackUrl { get; set; }

    /// <summary>JSON text sent as the request body.</summary>
    [ProtoMember(2)]
    public string Json { get; set; }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement