Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Console entry point that hosts an <see cref="EventProcessor"/> against an Event Hub
/// until the user presses Enter, then tears everything down again.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the consumer group, registers the event processor, blocks on
    /// <see cref="Console.ReadLine"/>, and finally unregisters the processor and
    /// deletes the consumer group.
    /// </summary>
    private static void Main()
    {
        // A fresh GUID per run gives every process its own host name.
        var eventProcessorHostName = Guid.NewGuid().ToString();

        // Placeholders: these are empty in the sample and must be filled in before running.
        const string eventHubPath = "";
        const string eventHubConnectionString = "";
        const string storageConnectionString = "";
        const string consumerGroupName = "";

        var manager = NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
        manager.CreateConsumerGroupIfNotExists(eventHubPath, consumerGroupName);

        try
        {
            // The last argument is derived from the consumer group name. It is lower-cased
            // with the invariant culture so the result does not depend on the machine's
            // locale (culture-sensitive ToLower() can emit non-ASCII characters, e.g. under
            // tr-TR). NOTE(review): this argument is presumably the lease container name,
            // which must be a valid lower-case storage container name - confirm against the
            // EventProcessorHost constructor overload in use.
            var host = new EventProcessorHost(
                eventProcessorHostName,
                eventHubPath,
                consumerGroupName,
                eventHubConnectionString,
                storageConnectionString,
                consumerGroupName.ToLowerInvariant());

            // Main is synchronous, so block here. GetAwaiter().GetResult() rethrows the
            // original exception instead of wrapping it in an AggregateException.
            host.RegisterEventProcessorAsync<EventProcessor>().GetAwaiter().GetResult();
            try
            {
                // Keep processing events until the user presses Enter.
                Console.ReadLine();
            }
            finally
            {
                // Always unregister once registration has succeeded.
                host.UnregisterEventProcessorAsync().GetAwaiter().GetResult();
            }
        }
        finally
        {
            // Remove the consumer group even when hosting fails part-way through.
            manager.DeleteConsumerGroup(eventHubPath, consumerGroupName);
        }
    }
}
/// <summary>
/// Event processor that writes partition lifecycle information and every received
/// message body to the console.
/// </summary>
class EventProcessor : IEventProcessor
{
    /// <summary>
    /// Logs the partition id, offset and sequence number of the lease being opened.
    /// </summary>
    /// <param name="context">Context of the partition this processor instance was opened for.</param>
    /// <returns>An already completed task.</returns>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine(
            "Open EventProcessor. Partition: {0}, Offset: {1}, SequenceNumber: {2}",
            context.Lease.PartitionId,
            context.Lease.Offset,
            context.Lease.SequenceNumber);
        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Logs the lease state and the close reason, and checkpoints when the processor
    /// is shutting down cleanly.
    /// </summary>
    /// <param name="context">Context of the partition being closed.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    /// <returns>A task that completes once the optional checkpoint has been written.</returns>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine(
            "Close EventProcessor. Partition: {0}, Offset: {1}, SequenceNumber: {2}, Reason: {3}",
            context.Lease.PartitionId,
            context.Lease.Offset,
            context.Lease.SequenceNumber,
            reason);

        // Only checkpoint on a clean shutdown. When the lease was lost this instance no
        // longer owns the partition, so attempting to checkpoint would fail.
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Decodes each message body as UTF-8 and writes it to the console, prefixed with
    /// the partition id.
    /// </summary>
    /// <param name="context">Context of the partition the batch was read from.</param>
    /// <param name="messages">The batch of events to print.</param>
    /// <returns>An already completed task; all work is done synchronously.</returns>
    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var message in messages)
        {
            var str = Encoding.UTF8.GetString(message.GetBytes());
            Console.WriteLine("{0} : {1}", context.Lease.PartitionId, str);
        }

        // Nothing is awaited here, so return a completed task (as OpenAsync does)
        // instead of marking the method async.
        return Task.FromResult<object>(null);
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement