Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Threading;
- using System.Threading.Tasks;
- using Microsoft.ServiceBus.Messaging;
- namespace ConsoleApplication1
- {
- [Serializable] public class Msg { public string Test { get; set; } }
- class Program
- {
- static void Main(string[] args)
- {
- const string queueName = "TestQueue";
- const string connectionString = "SERVICE_BUS_CONNECTION_STRING";
- const bool useCustomMessagePump = true;
- var client = QueueClient.CreateFromConnectionString(connectionString, queueName);
- for (var i = 0; i < 10; i++) Send(client); //push some messages
- if (useCustomMessagePump)
- CustomListen(queueName, connectionString, client);
- else
- PumpListen(queueName, connectionString, client);
- }
- public static void CustomListen(string queueName, string connectionString, QueueClient client)
- {
- var cancelMessagePumps = CreateMessagePumps(client, Environment.ProcessorCount, Handle);
- Console.ReadLine();
- cancelMessagePumps.Cancel();
- }
- public static void PumpListen(string queueName, string connectionString, QueueClient client)
- {
- var receiver = client.MessagingFactory.CreateMessageReceiver(queueName);
- receiver.OnMessageAsync(Handle, new OnMessageOptions
- {
- AutoComplete = false,
- MaxConcurrentCalls = Environment.ProcessorCount
- });
- Console.ReadLine();
- }
- public static async Task Handle(BrokeredMessage message)
- {
- Console.WriteLine("Got Message " + message.MessageId);
- Thread.Sleep(2000); //long enough to be human-visible
- await message.CompleteAsync();
- Console.WriteLine("Done Message " + message.MessageId);
- }
- public static void Send(QueueClient client)
- {
- client.SendAsync(new BrokeredMessage(new Msg()) { MessageId = Guid.NewGuid().ToString() });
- }
- public static CancellationTokenSource CreateMessagePumps(QueueClient client, int count, Func<BrokeredMessage, Task> doAction)
- {
- var cancellation = new CancellationTokenSource();
- var taskFactory = new TaskFactory();
- for (var i = 0; i < count; i++)
- taskFactory.StartNew(() =>
- {
- while (Thread.CurrentThread.IsAlive)
- {
- var msg = client.Receive(TimeSpan.FromSeconds(10));
- if (msg != null) doAction(msg).Wait(cancellation.Token);
- }
- }, cancellation.Token);
- return cancellation;
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement