Advertisement
simonradev

FeedEngineWithInterface

Jun 6th, 2018
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.91 KB | None | 0 0
  1. namespace MonitoringSystem.Hubs.Engines
  2. {
  3.     using System;
  4.     using System.Collections.Generic;
  5.     using System.Linq;
  6.     using Idnc.Cache;
  7.     using Idnc.Cache.Contracts;
  8.     using Idnc.Logger;
  9.     using MonitoringSystem.Kafka.Loaders;
  10.     using MonitoringSystem.SelfHost.Data;
  11.  
  12.     public class KafkaFeed<TCache, TSnapshot> : IKafkaFeed<TCache, TSnapshot>
  13.         where TSnapshot : BetsSnapshot, new()
  14.         where TCache : CacheObject<string, TSnapshot>, new()
  15.     {
  16.         private static readonly Type LoggerName = typeof(KafkaFeed<TCache, TSnapshot>);
  17.  
  18.         private static readonly TCache _cache;
  19.  
  20.         private static bool _isRunning;
  21.         private static object _instanceLock;
  22.  
  23.         public static ICacheLoader<TCache> AggregatedBetsSnapshotCacheLoader { get; private set; }
  24.  
  25.         public static TSnapshot LastUpdateItem { get; private set; }
  26.  
  27.         static KafkaFeed()
  28.         {
  29.             _cache = new TCache();
  30.             _instanceLock = new object();
  31.  
  32.             AggregatedBetsSnapshotCacheLoader = (ICacheLoader<TCache>)KafkaLoadersRegistrar.Instance.GetLoader<TCache>();
  33.             AggregatedBetsSnapshotCacheLoader.Init();
  34.  
  35.             CacheObject<string, TSnapshot>.CacheUpdated += AggregatedBetsSnapshotCache_CacheUpdated;
  36.         }
  37.  
  38.         public TSnapshot NewestItem => LastUpdateItem;
  39.  
  40.         private static void AggregatedBetsSnapshotCache_CacheUpdated(ICacheObject arg1, List<Tuple<TSnapshot, TSnapshot>> arg2)
  41.         {
  42.             try
  43.             {
  44.                 //TODO Figure out why this is called every second
  45.                 var item = arg2.FirstOrDefault().Item1;
  46.                 var offsetTime = DateTime.UtcNow.AddMinutes(-15); //Skip all elements from topic with older than 15min
  47.                 if (LastUpdateItem == null)
  48.                 {
  49.                     LastUpdateItem = new TSnapshot
  50.                     {
  51.                         EndTime = DateTime.UtcNow.AddMinutes(-5)
  52.                     };
  53.                 }
  54.  
  55.                 if (item != null && item.Data.Count > 0
  56.                     && item.EndTime >= offsetTime
  57.                     && item.EndTime > LastUpdateItem?.EndTime)
  58.                 {
  59.                     LastUpdateItem = arg2.FirstOrDefault().Item1;
  60.                 }
  61.             }
  62.             catch (Exception ex)
  63.             {
  64.                 LogManager.Current.GetLogger(LoggerName).Error("Cache Updated Ex:", ex);
  65.             }
  66.         }
  67.  
  68.         public static void Run()
  69.         {
  70.             if (!_isRunning)
  71.             {
  72.                 lock (_instanceLock)
  73.                 {
  74.                     try
  75.                     {
  76.                         _isRunning = true;
  77.                     }
  78.                     catch (Exception ex)
  79.                     {
  80.                         LogManager.Current.GetLogger(LoggerName).Error("RUN Ex:", ex);
  81.                     }
  82.                 }
  83.             }
  84.         }
  85.     }
  86. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement