Advertisement
Guest User

Untitled

a guest
Aug 4th, 2015
187
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  1. using System;
  2. using System.Threading;
  3. using System.Threading.Tasks;
  4. using Microsoft.ServiceBus.Messaging;
  5.  
  6. namespace ConsoleApplication1
  7. {
  8. [Serializable] public class Msg { public string Test { get; set; } }
  9.  
  10. class Program
  11. {
  12. static void Main(string[] args)
  13. {
  14. const string queueName = "TestQueue";
  15. const string connectionString = "SERVICE_BUS_CONNECTION_STRING";
  16. const bool useCustomMessagePump = true;
  17.  
  18. var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
  19. for (var i = 0; i < 10; i++) Send(client); //push some messages
  20.  
  21. if (useCustomMessagePump)
  22. CustomListen(queueName, connectionString, client);
  23. else
  24. PumpListen(queueName, connectionString, client);
  25. }
  26.  
  27. public static void CustomListen(string queueName, string connectionString, QueueClient client)
  28. {
  29. var cancelMessagePumps = CreateMessagePumps(client, Environment.ProcessorCount, Handle);
  30.  
  31. Console.ReadLine();
  32. cancelMessagePumps.Cancel();
  33. }
  34.  
  35. public static void PumpListen(string queueName, string connectionString, QueueClient client)
  36. {
  37. var receiver = client.MessagingFactory.CreateMessageReceiver(queueName);
  38. receiver.OnMessageAsync(Handle, new OnMessageOptions
  39. {
  40. AutoComplete = false,
  41. MaxConcurrentCalls = Environment.ProcessorCount
  42. });
  43. Console.ReadLine();
  44. }
  45.  
  46. public static async Task Handle(BrokeredMessage message)
  47. {
  48. Console.WriteLine("Got Message " + message.MessageId);
  49. Thread.Sleep(2000); //long enough to be human-visible
  50. await message.CompleteAsync();
  51. Console.WriteLine("Done Message " + message.MessageId);
  52. }
  53.  
  54. public static void Send(QueueClient client)
  55. {
  56. client.SendAsync(new BrokeredMessage(new Msg()) { MessageId = Guid.NewGuid().ToString() });
  57. }
  58.  
  59. public static CancellationTokenSource CreateMessagePumps(QueueClient client, int count, Func<BrokeredMessage, Task> doAction)
  60. {
  61. var cancellation = new CancellationTokenSource();
  62. var taskFactory = new TaskFactory();
  63. for (var i = 0; i < count; i++)
  64. taskFactory.StartNew(() =>
  65. {
  66. while (Thread.CurrentThread.IsAlive)
  67. {
  68. var msg = client.Receive(TimeSpan.FromSeconds(10));
  69. if (msg != null) doAction(msg).Wait(cancellation.Token);
  70. }
  71. }, cancellation.Token);
  72. return cancellation;
  73. }
  74. }
  75. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement