Advertisement
kogo1a

Kafka Cunsomer Groups

Sep 16th, 2023
766
0
Never
1
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.49 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net.Http.Headers;
  6. using System.Text.Json;
  7. using System.Text.Json.Serialization;
  8. using System.Threading;
  9. using Confluent.Kafka;
  10.  
  11. namespace Kafka.Consumer_1
  12. {
  13.     public class OffsetModel
  14.     {
  15.         public int Partition { get; set; }
  16.         public long Offset { get; set; }
  17.     }
  18.     class Program
  19.     {
  20.         static void Main(string[] args)
  21.         {
  22.             //Save();
  23.  
  24.             var topics = new[] { "Topic-3" };
  25.             var config = new ConsumerConfig
  26.             {
  27.                 BootstrapServers = "localhost:9092",
  28.                 GroupId = "Group-6",
  29.                 AutoOffsetReset = AutoOffsetReset.Earliest,
  30.                 EnableAutoOffsetStore = false,
  31.                 EnableAutoCommit = false,
  32.             };
  33.  
  34.             var c1 = new ConsumerBuilder<Ignore, string>(config)
  35.             .SetPartitionsRevokedHandler((a, b) =>
  36.             {
  37.                 var m = b.Select(p => new
  38.                 {
  39.                     Partition = p.Partition,
  40.                     o = a.GetWatermarkOffsets(new TopicPartition(p.Topic, p.Partition))
  41.                 })
  42.                 .Select(e => new OffsetModel() { Partition = e.Partition.Value, Offset = e.o.High.Value })
  43.                 .ToList();
  44.  
  45.                 var r = JsonSerializer.Serialize(m);
  46.  
  47.                 File.WriteAllText(@"C:\Kafka\Offsets.txt", r);
  48.             })
  49.             .SetPartitionsAssignedHandler((a, b) =>
  50.             {
  51.                 var store = JsonSerializer.Deserialize<List<OffsetModel>>(File.ReadAllText(@"C:\Kafka\Offsets.txt"));
  52.                 return b
  53.                     .Select(tp => store.First(tp1 => tp.Partition == tp1.Partition))
  54.                     .Select(e => new TopicPartitionOffset(new TopicPartition("Topic-3", e.Partition), e.Offset));
  55.             });
  56.  
  57.             //using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
  58.             using (var consumer = c1.Build())
  59.             {
  60.                 consumer.Subscribe(topics);
  61.                 //consumer.Assign(new TopicPartition("Topic-3", new Partition(0)));
  62.                 //consumer.Assign(new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(0)), new Offset(3)));
  63.  
  64.                 //consumer
  65.                 //    .Assign(
  66.                 //        new[]
  67.                 //        {
  68.                 //            new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(0)), new Offset(1)),
  69.                 //            new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(1)), new Offset(1))
  70.                 //        }
  71.                 //    );
  72.  
  73.  
  74.                 //Thread.Sleep(5000);
  75.                 //consumer.Seek(new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(0)), new Offset(2)));
  76.  
  77.                 //consumer
  78.                 //    .Assign(
  79.                 //        new[]
  80.                 //        {
  81.                 //            new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(0)), new Offset(1)),
  82.                 //            //new TopicPartitionOffset(new TopicPartition("Topic-3", new Partition(1)), new Offset(1))
  83.                 //        }
  84.                 //    );
  85.                 while (true)
  86.                 {
  87.                     var consumeResult = consumer.Consume();
  88.                     Console.WriteLine(consumeResult.Message.Value);
  89.                 }
  90.             }
  91.         }
  92.     }
  93. }
Advertisement
Comments
Add Comment
Please, Sign In to add comment
Advertisement