Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
122
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.39 KB | None | 0 0
  /// <summary>
  /// Background service that subscribes to the configured Kafka topics, hands every consumed
  /// <c>Request</c> to <c>Helper.SendDataAsync</c> (retrying failed deliveries) and commits the
  /// message offset once delivery succeeds or the attempts are exhausted.
  /// </summary>
  public class ConsumerService : BackgroundService
  {
      private readonly IConfiguration _config;
      private readonly IElasticLogger _logger;
      private readonly ConsumerConfig _consumerConfig;
      private readonly string[] _topics;
      private readonly double _maxNumAttempts;
      private readonly double _retryIntervalInSec;

      /// <summary>
      /// Reads the Kafka consumer settings and the retry policy from configuration.
      /// </summary>
      /// <param name="config">Configuration providing the <c>Kafka:*</c> and <c>App:*</c> keys.</param>
      /// <param name="logger">Logger passed through to <c>Helper.SendDataAsync</c>.</param>
      /// <exception cref="InvalidOperationException">
      /// <c>Kafka:Consumer:Topics</c> is missing or contains no topic name.
      /// </exception>
      public ConsumerService(IConfiguration config, IElasticLogger logger)
      {
          _config = config;
          _logger = logger;
          _consumerConfig = new ConsumerConfig
          {
              BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
              GroupId = _config.GetValue<string>("Kafka:GroupId"),
              EnableAutoCommit = _config.GetValue<bool>("Kafka:Consumer:EnableAutoCommit"),
              // The setting is stored as the enum's numeric value.
              AutoOffsetReset = (AutoOffsetReset)_config.GetValue<int>("Kafka:Consumer:AutoOffsetReset")
          };
          _topics = ParseTopics(_config.GetValue<string>("Kafka:Consumer:Topics"));
          _maxNumAttempts = _config.GetValue<double>("App:MaxNumAttempts");
          _retryIntervalInSec = _config.GetValue<double>("App:RetryIntervalInSec");
      }

      /// <summary>
      /// Starts the consume loop on a separate task and returns that task to the host.
      /// </summary>
      /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
      /// <returns>The task running <see cref="ProcessQueue"/>.</returns>
      protected override Task ExecuteAsync(CancellationToken stoppingToken)
      {
          Console.WriteLine("!!! CONSUMER STARTED !!!\n");

          // ProcessQueue is a synchronous loop, so it is moved off the caller's thread.
          return Task.Run(() => ProcessQueue(stoppingToken), stoppingToken);
      }

      /// <summary>
      /// Consumes messages until <paramref name="stoppingToken"/> is cancelled, starting one
      /// delivery task per message.
      /// </summary>
      /// <param name="stoppingToken">Cancels both the consume loop and pending retry delays.</param>
      private void ProcessQueue(CancellationToken stoppingToken)
      {
          using (var consumer = new ConsumerBuilder<Ignore, Request>(_consumerConfig).SetValueDeserializer(new MessageDeserializer()).Build())
          {
              consumer.Subscribe(_topics);

              try
              {
                  while (!stoppingToken.IsCancellationRequested)
                  {
                      try
                      {
                          var consumeResult = consumer.Consume(stoppingToken);

                          // Don't want to block the consume loop, so each message gets its own task.
                          // NOTE(review): the tasks run concurrently and each commits its own result,
                          // so commits can be issued in a different order than the messages were
                          // consumed - confirm this is acceptable for the topics involved.
                          Task.Run(async () =>
                          {
                              try
                              {
                                  var attempts = 0;
                                  var committed = false;

                                  while (attempts < _maxNumAttempts)
                                  {
                                      attempts++;

                                      // SendDataAsync sends an HTTP request to some end-points.
                                      var response = await Helper.SendDataAsync(consumeResult.Value, _config, _logger);

                                      if (response != null && response.Code >= 0)
                                      {
                                          try
                                          {
                                              consumer.Commit(consumeResult);
                                              committed = true;

                                              break;
                                          }
                                          catch (KafkaException ex)
                                          {
                                              LogError($"Commit failed on attempt {attempts} of {_maxNumAttempts}", ex);
                                          }
                                      }
                                      else
                                      {
                                          Console.Error.WriteLine($"Delivery attempt {attempts} of {_maxNumAttempts} was rejected or returned no response.");
                                      }

                                      if (attempts < _maxNumAttempts)
                                      {
                                          // Delay between tries; cancelled on shutdown so the task does
                                          // not keep using the consumer after the loop has ended.
                                          await Task.Delay(TimeSpan.FromSeconds(_retryIntervalInSec), stoppingToken);
                                      }
                                  }

                                  if (!committed)
                                  {
                                      // All attempts are used up: commit anyway so the message is given up on.
                                      try
                                      {
                                          consumer.Commit(consumeResult);
                                      }
                                      catch (KafkaException ex)
                                      {
                                          LogError("Commit failed after the last delivery attempt", ex);
                                      }
                                  }
                              }
                              catch (OperationCanceledException)
                              {
                                  // Shutdown was requested during a retry delay: stop without committing.
                              }
                              catch (Exception ex)
                              {
                                  // Nobody awaits this task, so a failure that escaped here would go unnoticed.
                                  LogError("Unhandled failure while processing a message", ex);
                              }
                          }, stoppingToken);
                      }
                      catch (ConsumeException ex)
                      {
                          // A failed Consume call does not end the loop.
                          LogError("Consume failed", ex);
                      }
                  }
              }
              catch (OperationCanceledException)
              {
                  // Expected: Consume was interrupted by stoppingToken.
              }
              finally
              {
                  // Runs on every exit path, not only when Consume is cancelled.
                  consumer.Close();
              }
          }
      }

      /// <summary>
      /// Splits the comma-separated topic list, trimming whitespace and dropping empty entries.
      /// </summary>
      /// <param name="rawTopics">Raw value of <c>Kafka:Consumer:Topics</c>; may be null.</param>
      /// <returns>At least one non-empty topic name.</returns>
      /// <exception cref="InvalidOperationException">No topic name could be extracted.</exception>
      private static string[] ParseTopics(string rawTopics)
      {
          var parts = (rawTopics ?? string.Empty).Split(',');
          var topics = Array.FindAll(Array.ConvertAll(parts, part => part.Trim()), topic => topic.Length > 0);

          if (topics.Length == 0)
          {
              throw new InvalidOperationException("Configuration value 'Kafka:Consumer:Topics' is missing or contains no topic names.");
          }

          return topics;
      }

      /// <summary>
      /// Writes a failure and its exception to standard error.
      /// </summary>
      /// <param name="message">Short description of what failed.</param>
      /// <param name="ex">The exception that was caught.</param>
      private static void LogError(string message, Exception ex)
      {
          // NOTE(review): IElasticLogger's members are not visible from this file, so failures go
          // to stderr for now - route them through _logger once its API is confirmed.
          Console.Error.WriteLine($"{message}: {ex}");
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement