Advertisement
simonradev

KafkaFeed

Jun 6th, 2018
92
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.78 KB | None | 0 0
  1. namespace MonitoringSystem.Hubs.Engines
  2. {
  3.     using System;
  4.     using System.Collections.Generic;
  5.     using System.Linq;
  6.     using Idnc.Cache;
  7.     using Idnc.Cache.Contracts;
  8.     using Idnc.Logger;
  9.     using MonitoringSystem.Kafka.Loaders;
  10.     using MonitoringSystem.SelfHost.Data;
  11.  
  12.     public class KafkaFeed<TCache, TSnapshot>
  13.         where TSnapshot : BetsSnapshot, new()
  14.         where TCache : CacheObject<string, TSnapshot>, new()
  15.     {
  16.         private static readonly TCache _cache;
  17.  
  18.         private static bool _isRunning;
  19.         private static object _instanceLock;
  20.  
  21.         public static ICacheLoader<TCache> AggregatedBetsSnapshotCacheLoader { get; private set; }
  22.  
  23.         public static TSnapshot LastUpdateItem { get; private set; }
  24.  
  25.         static KafkaFeed()
  26.         {
  27.             _cache = new TCache();
  28.             _instanceLock = new object();
  29.  
  30.             AggregatedBetsSnapshotCacheLoader = (ICacheLoader<TCache>)KafkaLoadersRegistrar.Instance.GetLoader<TCache>();
  31.             AggregatedBetsSnapshotCacheLoader.Init();
  32.  
  33.  
  34.             CacheObject<string, TSnapshot>.CacheUpdated += AggregatedBetsSnapshotCache_CacheUpdated;
  35.         }
  36.  
  37.         private static async void AggregatedBetsSnapshotCache_CacheUpdated(ICacheObject arg1, List<Tuple<TSnapshot, TSnapshot>> arg2)
  38.         {
  39.             try
  40.             {
  41.                 //TODO Figure out why this is called every second
  42.                 var item = arg2.FirstOrDefault().Item1;
  43.                 var offsetTime = DateTime.UtcNow.AddMinutes(-15); //Skip all elements from topic with older than 15min
  44.                 if (LastUpdateItem == null)
  45.                 {
  46.                     LastUpdateItem = new TSnapshot
  47.                     {
  48.                         EndTime = DateTime.UtcNow.AddMinutes(-5)
  49.                     };
  50.                 }
  51.  
  52.                 if (item != null && item.Data.Count > 0
  53.                     && item.EndTime >= offsetTime
  54.                     && item.EndTime > LastUpdateItem?.EndTime)
  55.                 {
  56.                     LastUpdateItem = arg2.FirstOrDefault().Item1;
  57.                 }
  58.             }
  59.             catch (Exception ex)
  60.             {
  61.                 LogManager.Current.GetLogger($"{typeof(KafkaFeedEngine)}").Error("Cache Updated Ex:", ex);
  62.             }
  63.         }
  64.  
  65.         public static void Run()
  66.         {
  67.             if (!_isRunning)
  68.             {
  69.                 lock (_instanceLock)
  70.                 {
  71.                     try
  72.                     {
  73.                         _isRunning = true;
  74.                     }
  75.                     catch (Exception ex)
  76.                     {
  77.                         LogManager.Current.GetLogger($"{typeof(KafkaFeedEngine)}").Error("RUN Ex:", ex);
  78.                     }
  79.                 }
  80.             }
  81.         }
  82.     }
  83. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement