Advertisement
Guest User

Untitled

a guest
Dec 7th, 2016
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.41 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using RdKafka;
  6.  
  namespace AdvancedConsumer
  {
      /// <summary>
      /// Console sample built on RdKafka's <c>EventConsumer</c>: it subscribes to the given
      /// topics, writes every message and consumer event to the console, and commits
      /// offsets manually because auto-commit is switched off in <see cref="Run"/>.
      /// </summary>
      public class Program
      {
          // Offset passed for every partition when an assignment arrives.
          // NOTE(review): the value 2 is hard-coded in the original code; confirm that starting
          // each assigned partition at this fixed offset is intentional and not debugging residue.
          private const int AssignedStartOffset = 2;

          /// <summary>
          /// Creates a consumer for <paramref name="brokerList"/>, wires up console logging for
          /// all consumer events, subscribes to <paramref name="topics"/> and keeps consuming
          /// until a line is read from standard input.
          /// </summary>
          /// <param name="brokerList">Broker list handed to the <c>EventConsumer</c> constructor.</param>
          /// <param name="topics">Topics passed to <c>consumer.Subscribe</c>.</param>
          public static void Run(string brokerList, List<string> topics)
          {
              bool enableAutoCommit = false;

              var config = new Config()
              {
                  GroupId = "advanced-csharp-consumer",
                  EnableAutoCommit = enableAutoCommit,
                  StatisticsInterval = TimeSpan.FromSeconds(60)
              };

              using (var consumer = new EventConsumer(config, brokerList))
              {
                  consumer.OnMessage += (obj, msg) =>
                  {
                      // Guard against a missing payload so that a single such message does not
                      // throw a NullReferenceException inside the handler.
                      // NOTE(review): assumes Payload can be null for value-less messages - confirm.
                      string text = msg.Payload == null
                          ? string.Empty
                          : Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
                      Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");

                      // With auto-commit off, commit manually on every message whose offset
                      // is a multiple of ten.
                      if (!enableAutoCommit && msg.Offset % 10 == 0)
                      {
                          Console.WriteLine("Committing offset");
                          // Block until the commit task finishes so the two log lines bracket it.
                          consumer.Commit(msg).Wait();
                          Console.WriteLine("Committed offset");
                      }
                  };

                  consumer.OnConsumerError += (obj, errorCode) =>
                  {
                      Console.WriteLine($"Consumer Error: {errorCode}");
                  };

                  consumer.OnEndReached += (obj, end) =>
                  {
                      Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
                  };

                  consumer.OnError += (obj, error) =>
                  {
                      Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
                  };

                  // The commit-report handler is only registered when auto-commit is enabled.
                  if (enableAutoCommit)
                  {
                      consumer.OnOffsetCommit += (obj, commit) =>
                      {
                          // Report exactly one outcome: previously the success line was printed
                          // even right after a failure had been reported.
                          if (commit.Error != ErrorCode.NO_ERROR)
                          {
                              Console.WriteLine($"Failed to commit offsets: {commit.Error}");
                          }
                          else
                          {
                              Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
                          }
                      };
                  }

                  consumer.OnPartitionsAssigned += (obj, partitions) =>
                  {
                      Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");

                      // Re-create each assigned partition with the fixed start offset before
                      // handing the list to Assign. (The original referenced an undefined
                      // identifier 'P' here instead of the lambda parameter 'p'.)
                      var startPositions = partitions
                          .Select(p => new TopicPartitionOffset(p.Topic, p.Partition, AssignedStartOffset))
                          .ToList();
                      consumer.Assign(startPositions);
                  };

                  consumer.OnPartitionsRevoked += (obj, partitions) =>
                  {
                      Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
                      consumer.Unassign();
                  };

                  consumer.OnStatistics += (obj, json) =>
                  {
                      Console.WriteLine($"Statistics: {json}");
                  };

                  consumer.Subscribe(topics);
                  consumer.Start();

                  Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
                  Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

                  Console.WriteLine("Started consumer, press enter to stop consuming");
                  Console.ReadLine();
              }
          }

          /// <summary>
          /// Entry point. The first argument is used as the broker list and all remaining
          /// arguments as topic names.
          /// </summary>
          /// <param name="args">Command-line arguments: broker list followed by topics.</param>
          public static void Main(string[] args)
          {
              // Without this guard args[0] throws IndexOutOfRangeException when the program
              // is started with no arguments.
              if (args == null || args.Length == 0)
              {
                  Console.Error.WriteLine("Usage: AdvancedConsumer <broker-list> [<topic> ...]");
                  Environment.ExitCode = 1;
                  return;
              }

              Run(args[0], args.Skip(1).ToList());
          }
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement