Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Net.Http.Headers;
- using System.Text.Json;
- using System.Text.Json.Serialization;
- using System.Threading;
- using Confluent.Kafka;
- namespace Kafka.Consumer_1
- {
/// <summary>
/// One entry of the offsets file: an offset value recorded for a single partition.
/// Instances are serialized to and deserialized from JSON by <c>Program</c>.
/// </summary>
public class OffsetModel
{
    /// <summary>Numeric id of the partition this entry belongs to.</summary>
    public int Partition
    {
        get;
        set;
    }

    /// <summary>Offset value recorded for <see cref="Partition"/>.</summary>
    public long Offset
    {
        get;
        set;
    }
}
/// <summary>
/// Console consumer that subscribes to "Topic-3" and prints every message value.
/// Auto-commit and auto offset store are disabled; instead, per-partition offsets are written
/// to a local JSON file when partitions are revoked and read back when partitions are assigned.
/// </summary>
class Program
{
    /// <summary>File the offsets are saved to on revocation and restored from on assignment.</summary>
    private const string OffsetsFilePath = @"C:\Kafka\Offsets.txt";

    /// <summary>The single topic this consumer subscribes to.</summary>
    private const string TopicName = "Topic-3";

    /// <summary>
    /// Builds the consumer, subscribes, and prints messages until Ctrl+C is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var topics = new[] { TopicName };
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",
            GroupId = "Group-6",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoOffsetStore = false,
            EnableAutoCommit = false,
        };

        var builder = new ConsumerBuilder<Ignore, string>(config)
            .SetPartitionsRevokedHandler((c, revoked) =>
            {
                SaveOffsets(c, revoked);
            })
            .SetPartitionsAssignedHandler((c, assigned) =>
            {
                return LoadOffsets(assigned);
            });

        using (var cts = new CancellationTokenSource())
        using (var consumer = builder.Build())
        {
            // Turn Ctrl+C into a cooperative cancellation so the loop below can exit and the
            // consumer can be closed, instead of the process being killed mid-consume.
            Console.CancelKeyPress += (sender, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            consumer.Subscribe(topics);

            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cts.Token);
                        Console.WriteLine(consumeResult.Message.Value);
                    }
                    catch (ConsumeException ex)
                    {
                        // Report the failed consume and keep polling rather than crashing.
                        Console.Error.WriteLine($"Consume error: {ex.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Expected when Ctrl+C cancels the token; fall through to Close().
            }
            finally
            {
                // Leave the group cleanly. Without this the original never closed the consumer,
                // so the revoked handler that persists offsets was not run on shutdown.
                consumer.Close();
            }
        }
    }

    /// <summary>
    /// Writes one <see cref="OffsetModel"/> per revoked partition to <see cref="OffsetsFilePath"/>,
    /// replacing the previous file content.
    /// </summary>
    /// <param name="consumer">Consumer used to look up the watermark offsets.</param>
    /// <param name="revoked">Partitions being revoked from this consumer.</param>
    private static void SaveOffsets(IConsumer<Ignore, string> consumer, List<TopicPartitionOffset> revoked)
    {
        // NOTE(review): this stores the partition's HIGH watermark, not the consumer's own
        // position (tpo.Offset). Messages between the two would be skipped after the next
        // assignment - confirm that this is intended before relying on it.
        var offsets = revoked
            .Select(tpo => new OffsetModel
            {
                Partition = tpo.Partition.Value,
                Offset = consumer.GetWatermarkOffsets(new TopicPartition(tpo.Topic, tpo.Partition)).High.Value,
            })
            .ToList();

        // WriteAllText does not create missing directories.
        var directory = Path.GetDirectoryName(OffsetsFilePath);
        if (!string.IsNullOrEmpty(directory))
        {
            Directory.CreateDirectory(directory);
        }

        File.WriteAllText(OffsetsFilePath, JsonSerializer.Serialize(offsets));
    }

    /// <summary>
    /// Maps each newly assigned partition to the offset stored for it in
    /// <see cref="OffsetsFilePath"/>.
    /// </summary>
    /// <param name="assigned">Partitions that were just assigned to this consumer.</param>
    /// <returns>
    /// One start position per assigned partition. Partitions with no stored entry get
    /// <c>Offset.Unset</c> so the client falls back to its default start position
    /// instead of the handler throwing.
    /// </returns>
    private static List<TopicPartitionOffset> LoadOffsets(List<TopicPartition> assigned)
    {
        var stored = ReadStoredOffsets();

        return assigned
            .Select(tp =>
            {
                // The file is keyed by partition number only; first matching entry wins.
                var saved = stored.FirstOrDefault(s => s != null && s.Partition == tp.Partition.Value);
                var start = saved != null ? new Offset(saved.Offset) : Offset.Unset;
                return new TopicPartitionOffset(tp, start);
            })
            .ToList();
    }

    /// <summary>
    /// Reads the offsets file.
    /// </summary>
    /// <returns>
    /// The stored entries, or an empty list when the file is missing (e.g. first run),
    /// contains JSON <c>null</c>, or cannot be parsed.
    /// </returns>
    private static List<OffsetModel> ReadStoredOffsets()
    {
        if (!File.Exists(OffsetsFilePath))
        {
            return new List<OffsetModel>();
        }

        try
        {
            var parsed = JsonSerializer.Deserialize<List<OffsetModel>>(File.ReadAllText(OffsetsFilePath));
            return parsed ?? new List<OffsetModel>();
        }
        catch (JsonException ex)
        {
            Console.Error.WriteLine($"Ignoring unreadable offsets file '{OffsetsFilePath}': {ex.Message}");
            return new List<OffsetModel>();
        }
    }
}
- }
Advertisement
Comments
-
- I thought this was about kafka from hi:sr but nope
Add Comment
Please, Sign In to add comment
Advertisement