Advertisement
Guest User

Untitled

a guest
Aug 9th, 2017
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.66 KB | None | 0 0
  1. [Test]
  2. public void ReproduceAsyncIssue()
  3. {
  4. int recvCount = 0;
  5. int cancelled = 0;
  6.  
  7. CountdownEvent endEvent = new CountdownEvent(1);
  8. bool countdownTriggered = false, shutdown = false;
  9. object shutdownLock = new object();
  10.  
  11. AsyncEventHandler<BasicDeliverEventArgs> callbackAsync = async (o, ea) =>
  12. {
  13. // uncomment to make it work
  14. // await Task.Delay(100).ConfigureAwait(false);
  15.  
  16. try
  17. {
  18. AsyncEventingBasicConsumer c = o as AsyncEventingBasicConsumer;
  19.  
  20. if (shutdown)
  21. {
  22. if (c.Model.IsOpen)
  23. c.Model.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
  24. return;
  25. }
  26.  
  27. if (recvCount >= 2)
  28. {
  29. c.Model.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
  30. lock (shutdownLock)
  31. {
  32. if (!shutdown)
  33. {
  34. shutdown = true;
  35. if (c.Model.IsOpen)
  36. c.Model.BasicCancel(c.ConsumerTag);
  37. }
  38.  
  39. if (!countdownTriggered)
  40. {
  41. countdownTriggered = true;
  42. endEvent.Signal();
  43. }
  44. }
  45. return;
  46. }
  47.  
  48. // simulate async message processing
  49. await Task.Delay(100).ConfigureAwait(false);
  50.  
  51. c.Model.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
  52. Interlocked.Increment(ref recvCount);
  53. }
  54. catch (Exception e)
  55. {
  56. Console.WriteLine(e);
  57. }
  58. };
  59.  
  60. AsyncEventHandler<ConsumerEventArgs> cancelAsync = (o, ea) =>
  61. {
  62. endEvent.Signal();
  63. Console.WriteLine("cancel callback async");
  64. Interlocked.Increment(ref cancelled);
  65. return Task.CompletedTask;
  66. };
  67.  
  68. var factory = new ConnectionFactory()
  69. {
  70. HostName = QueueHost,
  71. Port = QueuePort,
  72. UserName = QueueUser,
  73. Password = QueuePass,
  74. AutomaticRecoveryEnabled = true,
  75. DispatchConsumersAsync = true,
  76. UseBackgroundThreadsForIO = true
  77. };
  78.  
  79. var connection = factory.CreateConnection();
  80. var channel = connection.CreateModel();
  81.  
  82. channel.ConfirmSelect();
  83.  
  84. var asyncConsumer = new AsyncEventingBasicConsumer(channel);
  85. asyncConsumer.Received += callbackAsync;
  86. asyncConsumer.Unregistered += cancelAsync;
  87.  
  88. channel.BasicQos(0, 1, false);
  89. var tag = channel.BasicConsume(queue: "TestExchange_TestQueue",
  90. autoAck: false,
  91. consumer: asyncConsumer);
  92. endEvent.AddCount();
  93.  
  94. Thread.Sleep(1000);
  95.  
  96. Console.WriteLine("starting wait");
  97. bool t = endEvent.Wait(TimeSpan.FromSeconds(25));
  98. Console.WriteLine("wait over");
  99.  
  100. connection.Close();
  101.  
  102. Assert.IsTrue(t);
  103. Assert.AreEqual(2, recvCount);
  104. Assert.AreEqual(1, cancelled);
  105. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement