brunobozic

Untitled

Apr 16th, 2021
360
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.19 KB | None | 0 0
  1. using Confluent.Kafka;
  2. using HT.Framework.Kafka.Core.Contracts;
  3. using HT.Framework.Kafka.Core.DTOs.KafkaConsumer;
  4. using Serilog;
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8.  
  9. namespace GADM.Adapter.Infrastructure.KafkaImplementions.Decorated
  10. {
  11.     public class KafkaConsumerStopwatchDecorator : IKafkaScheduledConsumer
  12.     {
  13.         #region Private Props
  14.  
  15.         private readonly IKafkaScheduledConsumer _decorated;
  16.  
  17.         #endregion Private Props
  18.  
  19.         #region ctor
  20.  
  21.         public KafkaConsumerStopwatchDecorator(IKafkaScheduledConsumer consumer)
  22.         {
  23.             _decorated = consumer;
  24.         }
  25.  
  26.         #endregion ctor
  27.  
  28.         public void Continue()
  29.         {
  30.             _decorated.Continue();
  31.         }
  32.  
  33.         public void Dispose()
  34.         {
  35.             _decorated.Dispose();
  36.         }
  37.  
  38.         public string GetBootstrapServers()
  39.         {
  40.             return _decorated.GetBootstrapServers();
  41.         }
  42.  
  43.         public string GetCurrentOffset()
  44.         {
  45.             return _decorated.GetCurrentOffset();
  46.         }
  47.  
  48.         public string GetKafkaConsumerMaxOffset()
  49.         {
  50.             return _decorated.GetKafkaConsumerMaxOffset();
  51.         }
  52.  
  53.         public string GetTopic()
  54.         {
  55.             return _decorated.GetTopic();
  56.         }
  57.  
  58.         public IConsumer<string, string> Instance()
  59.         {
  60.             return _decorated.Instance();
  61.         }
  62.  
  63.         public void Pause()
  64.         {
  65.             _decorated.Pause();
  66.         }
  67.  
  68.         /// <summary>
  69.         /// Reads a single message from Kafka topic/partition the consumer is subscribed to.
  70.         /// Crucial information here is the _lastOffset variable, we store the consumed message offset into it (if available).
  71.         /// </summary>
  72.         /// <returns></returns>
  73.         public ConsumeMessageResult Consume()
  74.         {
  75.             Log.Information("Reading message from kafka start [ {0} ]", DateTimeOffset.UtcNow);
  76.             Stopwatch stopwatch = new Stopwatch();
  77.             stopwatch.Start();
  78.             var retVal = _decorated.Consume();
  79.             stopwatch.Stop();
  80.             Log.Information("Reading message from kafka took: [ {0} ] Milliseconds", stopwatch.Elapsed.TotalMilliseconds);
  81.             return retVal;
  82.         }
  83.  
  84.         public void Seek(TopicPartition topicPartition, long recordOffset)
  85.         {
  86.             _decorated.Seek(topicPartition, recordOffset);
  87.         }
  88.  
  89.         public bool SkipPoisonPill(ConsumeResult<string, string> consumedMessage)
  90.         {
  91.             return _decorated.SkipPoisonPill(consumedMessage);
  92.         }
  93.  
  94.         public bool SkipPoisonPill(TopicPartition topicPartition, long recordOffset)
  95.         {
  96.             return _decorated.SkipPoisonPill(topicPartition, recordOffset);
  97.         }
  98.  
  99.         public void StoreOffsetFor(ConsumeResult<string, string> msg)
  100.         {
  101.             _decorated.StoreOffsetFor(msg);
  102.         }
  103.  
  104.         public Handle UnderlyingHandle()
  105.         {
  106.             return _decorated.UnderlyingHandle();
  107.         }
  108.  
  109.         public List<string> UnderlyingSubscriptions()
  110.         {
  111.             return _decorated.UnderlyingSubscriptions();
  112.         }
  113.     }
  114. }
  115.  
Advertisement
Add Comment
Please, Sign In to add comment