Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using Polly;
- using RabbitMQ.Client.Exceptions;
- using RawRabbit;
- using RawRabbit.Common;
- using RawRabbit.Configuration;
- using RawRabbit.Configuration.Exchange;
- using RawRabbit.Configuration.Queue;
- using RawRabbit.Enrichers.Attributes;
- using RawRabbit.Enrichers.Polly;
- using RawRabbit.Instantiation;
- using RawRabbit.Pipe;
namespace BugsBunny
{
    /// <summary>
    /// Minimal console program that wires RawRabbit with attribute routing and Polly
    /// policies, subscribes to <see cref="BasicMessage"/>, publishes one message and
    /// then waits for Ctrl+C. The message handler always throws.
    /// </summary>
    internal static class Program
    {
        /// <summary>
        /// Message published to and consumed from the broker. Exchange, queue and
        /// routing settings are supplied through the RawRabbit attributes below.
        /// </summary>
        [Exchange(Name = "service", Type = ExchangeType.Topic)]
        [Queue(Name = "notification", MessageTtl = 3000)]
        [Routing(RoutingKey = "notification", AutoAck = true, PrefetchCount = 50)]
        private sealed class BasicMessage
        {
            /// <summary>Creates a message carrying <paramref name="message"/>.</summary>
            /// <param name="message">Text payload of the message.</param>
            public BasicMessage(string message)
            {
                // The original constructor dropped its argument; keep it on the instance.
                // NOTE(review): presumably the serializer binds this property to the
                // constructor parameter of the same name on the consuming side - confirm.
                Message = message;
            }

            /// <summary>Text payload handed to the constructor.</summary>
            public string Message { get; private set; }
        }

        /// <summary>
        /// Handler for <see cref="BasicMessage"/>. Writes a marker line and then always
        /// throws.
        /// </summary>
        /// <param name="msg">The received message (not inspected).</param>
        /// <returns>Never completes successfully; the returned task is always faulted.</returns>
        /// <exception cref="Exception">Always thrown, with the text "Force polly".</exception>
        private static async Task<Acknowledgement> OnBasicMessage(BasicMessage msg)
        {
            // Deliberately left 'async' without an await: the exception is then captured
            // in the returned task instead of being thrown synchronously to the caller.
            Console.WriteLine("OnBasicMessage");
            throw new Exception("Force polly");
        }

        /// <summary>
        /// Entry point: builds the policies and the bus client, subscribes, publishes a
        /// single message and blocks until Ctrl+C is pressed.
        /// </summary>
        public static void Main()
        {
            using (var exitEvent = new ManualResetEvent(false))
            {
                // Cancel = true stops the runtime from killing the process on Ctrl+C so
                // that the cleanup in the finally block below can run.
                ConsoleCancelEventHandler onCancel = (sender, eventArgs) =>
                {
                    eventArgs.Cancel = true;
                    exitEvent.Set();
                };
                Console.CancelKeyPress += onCancel;

                // Policy registered without a key: retries on any exception and logs.
                var defaultPolicy = Policy
                    .Handle<Exception>()
                    .RetryAsync((exception, retryCount, pollyContext) =>
                    {
                        Console.WriteLine("defaultCalled");
                    });

                // Policy registered for PolicyKeys.QueueBind: on an
                // OperationInterruptedException it declares the queue named in the
                // Polly context as non-durable before the retry.
                var declareQueuePolicy = Policy
                    .Handle<OperationInterruptedException>()
                    .RetryAsync(async (e, retryCount, ctx) =>
                    {
                        Console.WriteLine("customCalled");
                        var defaultQueueCfg = ctx.GetPipeContext().GetClientConfiguration().Queue;
                        var topology = ctx.GetTopologyProvider();
                        var queue = new QueueDeclaration(defaultQueueCfg) { Name = ctx.GetQueueName(), Durable = false };
                        await topology.DeclareQueueAsync(queue);
                    });

                var client = RawRabbitFactory.CreateSingleton(options: new RawRabbitOptions()
                {
                    Plugins = p => p
                        .UseAttributeRouting()
                        .UsePolly(c => c
                            .UsePolicy(defaultPolicy)
                            .UsePolicy(declareQueuePolicy, PolicyKeys.QueueBind)
                        ),
                    ClientConfiguration = new RawRabbitConfiguration
                    {
                        Username = "guest",
                        Password = "guest",
                        VirtualHost = "/",
                        Hostnames = { "localhost" },
                        Port = 5672
                    }
                });

                try
                {
                    // Main keeps its synchronous signature, so the two tasks are observed
                    // by blocking here. Previously they were fire-and-forget: failures
                    // went unnoticed and the publish could run before the subscription
                    // had been set up.
                    client.SubscribeAsync<BasicMessage>(OnBasicMessage).GetAwaiter().GetResult();
                    client.PublishAsync(new BasicMessage("Hello. world!")).GetAwaiter().GetResult();

                    Console.WriteLine("Press Ctrl+C to exit...");
                    exitEvent.WaitOne();
                }
                finally
                {
                    // Detach first so a late Ctrl+C cannot touch the disposed event.
                    Console.CancelKeyPress -= onCancel;
                    client.Dispose();
                }
            }
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement