Advertisement
Guest User

Untitled

a guest
Nov 27th, 2019
483
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.62 KB | None | 0 0
  1. using Microsoft.Azure.EventHubs;
  2. using Microsoft.Azure.EventHubs.Processor;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8.  
  9. namespace ConsoleApp1
  10. {
  11.     public class MyEventProcessor : IEventProcessor
  12.     {
  13.         private Stopwatch checkpointStopWatch;
  14.  
  15.         public Task ProcessErrorAsync(PartitionContext context, Exception error)
  16.         {
  17.             Console.WriteLine(error.ToString());
  18.             return Task.FromResult(true);
  19.         }
  20.  
  21.         async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
  22.         {
  23.             if (reason == CloseReason.Shutdown)
  24.             {
  25.                 await context.CheckpointAsync();
  26.             }
  27.         }
  28.  
  29.         Task IEventProcessor.OpenAsync(PartitionContext context)
  30.         {
  31.             var eventHubPartitionId = context.PartitionId;
  32.             Console.WriteLine($"Registered reading from the partition: {eventHubPartitionId} ");
  33.             this.checkpointStopWatch = new Stopwatch();
  34.             this.checkpointStopWatch.Start();
  35.             return Task.FromResult<object>(null);
  36.         }
  37.  
  38.         //Data comes in here
  39.         async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
  40.         {
  41.             foreach (var eventData in messages)
  42.             {
  43.                 var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
  44.                 Console.WriteLine($"Message Received from partition {context.PartitionId}: {data}");
  45.             }
  46.  
  47.             await context.CheckpointAsync();
  48.         }
  49.     }
  50.     class Program
  51.     {
  52.         static void Main(string[] args)
  53.         {
  54.             string processorHostName = Guid.NewGuid().ToString();
  55.             var Options = new EventProcessorOptions()
  56.             {
  57.                 MaxBatchSize = 1,
  58.             };
  59.             Options.SetExceptionHandler((ex) =>
  60.             {
  61.                 System.Diagnostics.Debug.WriteLine($"Exception : {ex}");
  62.             });
  63.             var eventHubCS = "event hub connection string";
  64.             var storageCS = "storage connection string";
  65.             var containerName = "test";
  66.             var eventHubname = "test2";
  67.             EventProcessorHost eventProcessorHost = new EventProcessorHost(eventHubname, "$Default", eventHubCS, storageCS, containerName);
  68.             eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>(Options).Wait();
  69.  
  70.             while(true)
  71.             {
  72.                 //do nothing
  73.             }
  74.         }
  75.     }
  76. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement