Advertisement
arvigeus

Untitled

Feb 22nd, 2018
217
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.06 KB | None | 0 0
  1. using System;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. using Polly;
  5. using RabbitMQ.Client.Exceptions;
  6. using RawRabbit;
  7. using RawRabbit.Common;
  8. using RawRabbit.Configuration;
  9. using RawRabbit.Configuration.Exchange;
  10. using RawRabbit.Configuration.Queue;
  11. using RawRabbit.Enrichers.Attributes;
  12. using RawRabbit.Enrichers.Polly;
  13. using RawRabbit.Instantiation;
  14. using RawRabbit.Pipe;
  15.  
  16. namespace BugsBunny
  17. {
  18.     class Program
  19.     {
  20.         [Exchange(Name = "service", Type = ExchangeType.Topic)]
  21.         [Queue(Name = "notification", MessageTtl = 3000)]
  22.         [Routing(RoutingKey = "notification", AutoAck = true, PrefetchCount = 50)]
  23.         class BasicMessage
  24.         {
  25.             public BasicMessage(string message) { }
  26.         }
  27.  
  28.         private static async Task<Acknowledgement> OnBasicMessage(BasicMessage msg)
  29.         {
  30.             Console.WriteLine("OnBasicMessage");
  31.             throw new Exception("Force polly");
  32.         }
  33.  
  34.         public static void Main()
  35.         {
  36.             var exitEvent = new ManualResetEvent(false);
  37.             Console.CancelKeyPress += (sender, eventArgs) => { eventArgs.Cancel = true; exitEvent.Set(); };
  38.  
  39.             var defaultPolicy = Policy
  40.                 .Handle<Exception>()
  41.                 .RetryAsync((exception, retryCount, pollyContext) =>
  42.                 {
  43.                     Console.WriteLine("defaultCalled");
  44.                 });
  45.             var declareQueuePolicy = Policy
  46.                 .Handle<OperationInterruptedException>()
  47.                 .RetryAsync(async (e, retryCount, ctx) =>
  48.                 {
  49.                     Console.WriteLine("customCalled");
  50.                     var defaultQueueCfg = ctx.GetPipeContext().GetClientConfiguration().Queue;
  51.                     var topology = ctx.GetTopologyProvider();
  52.                     var queue = new QueueDeclaration(defaultQueueCfg) { Name = ctx.GetQueueName(), Durable = false };
  53.                     await topology.DeclareQueueAsync(queue);
  54.                 });
  55.  
  56.             var client = RawRabbitFactory.CreateSingleton(options: new RawRabbitOptions()
  57.             {
  58.                 Plugins = p => p
  59.                     .UseAttributeRouting()
  60.                     .UsePolly(c => c
  61.                         .UsePolicy(defaultPolicy)
  62.                         .UsePolicy(declareQueuePolicy, PolicyKeys.QueueBind)
  63.                     ),
  64.                 ClientConfiguration = new RawRabbitConfiguration
  65.                 {
  66.                     Username = "guest",
  67.                     Password = "guest",
  68.                     VirtualHost = "/",
  69.                     Hostnames = { "localhost" },
  70.                     Port = 5672
  71.                 }
  72.             });
  73.  
  74.             var _shutdown = new CancellationTokenSource();
  75.  
  76.             client.SubscribeAsync<BasicMessage>(OnBasicMessage);
  77.  
  78.             client.PublishAsync(new BasicMessage("Hello. world!"));
  79.            
  80.             Console.WriteLine("Press Ctrl+C to exit...");
  81.             exitEvent.WaitOne();
  82.  
  83.             client.Dispose();
  84.             exitEvent.Dispose();
  85.         }
  86.     }
  87. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement