Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Reproduces a problem seen when an async consumer cancels itself from inside
/// its own <c>Received</c> handler.
/// </summary>
/// <remarks>
/// The handler acks the first two deliveries (after a simulated 100 ms of work),
/// then nacks the third with requeue and calls <c>BasicCancel</c> synchronously,
/// i.e. before the handler has hit its first <c>await</c>. The test then waits up
/// to 25 seconds for two signals: one from the handler itself and one from the
/// consumer's <c>Unregistered</c> event. It expects exactly two acked messages and
/// exactly one cancel notification.
/// Requires a reachable broker and enough messages (at least three) already
/// waiting in the <c>TestExchange_TestQueue</c> queue.
/// </remarks>
[Test]
public void ReproduceAsyncIssue()
{
    int recvCount = 0;
    int cancelled = 0;
    bool countdownTriggered = false, shutdown = false;
    object shutdownLock = new object();
    bool signalled;

    var factory = new ConnectionFactory()
    {
        HostName = QueueHost,
        Port = QueuePort,
        UserName = QueueUser,
        Password = QueuePass,
        AutomaticRecoveryEnabled = true,
        DispatchConsumersAsync = true,
        UseBackgroundThreadsForIO = true
    };

    // Two signals are expected: one from the Received handler once it has
    // initiated shutdown, one from the Unregistered handler. The count is set
    // up front; raising it with AddCount() after BasicConsume could race with
    // an early Signal() and throw once the event is already set.
    using (var endEvent = new CountdownEvent(2))
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        AsyncEventHandler<BasicDeliverEventArgs> callbackAsync = async (o, ea) =>
        {
            // uncomment to make it work
            // await Task.Delay(100).ConfigureAwait(false);
            try
            {
                // Direct cast: an unexpected sender surfaces as a clear
                // InvalidCastException instead of a NullReferenceException.
                var consumer = (AsyncEventingBasicConsumer)o;

                if (Volatile.Read(ref shutdown))
                {
                    // Already shutting down: hand the message back if we still can.
                    if (consumer.Model.IsOpen)
                    {
                        consumer.Model.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    }
                    return;
                }

                if (Volatile.Read(ref recvCount) >= 2)
                {
                    // Third delivery: requeue it and cancel the consumer from
                    // within the handler. This synchronous BasicCancel before
                    // any await is the scenario under test.
                    if (consumer.Model.IsOpen)
                    {
                        consumer.Model.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                    }

                    lock (shutdownLock)
                    {
                        if (!shutdown)
                        {
                            shutdown = true;
                            if (consumer.Model.IsOpen)
                            {
                                consumer.Model.BasicCancel(consumer.ConsumerTag);
                            }
                        }

                        // Signal at most once from this handler.
                        if (!countdownTriggered)
                        {
                            countdownTriggered = true;
                            endEvent.Signal();
                        }
                    }
                    return;
                }

                // simulate async message processing
                await Task.Delay(100).ConfigureAwait(false);
                consumer.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Interlocked.Increment(ref recvCount);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
            }
        };

        AsyncEventHandler<ConsumerEventArgs> cancelAsync = (o, ea) =>
        {
            // Count the cancellation before signalling so the test thread
            // cannot wake up and assert on a stale value.
            Interlocked.Increment(ref cancelled);
            Console.WriteLine("cancel callback async");
            endEvent.Signal();
            return Task.CompletedTask;
        };

        channel.ConfirmSelect();

        var asyncConsumer = new AsyncEventingBasicConsumer(channel);
        asyncConsumer.Received += callbackAsync;
        asyncConsumer.Unregistered += cancelAsync;

        // Prefetch of one: deliveries reach the handler one at a time.
        channel.BasicQos(0, 1, false);
        channel.BasicConsume(queue: "TestExchange_TestQueue",
                             autoAck: false,
                             consumer: asyncConsumer);

        Console.WriteLine("starting wait");
        signalled = endEvent.Wait(TimeSpan.FromSeconds(25));
        Console.WriteLine("wait over");

        connection.Close();
    }

    Assert.IsTrue(signalled);
    Assert.AreEqual(2, recvCount);
    Assert.AreEqual(1, cancelled);
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement