Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
namespace MonitoringSystem.Hubs.Engines
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using Idnc.Cache;
    using Idnc.Cache.Contracts;
    using Idnc.Logger;
    using MonitoringSystem.Kafka.Loaders;
    using MonitoringSystem.SelfHost.Data;

    /// <summary>
    /// Static feed that subscribes to <c>CacheObject&lt;string, TSnapshot&gt;.CacheUpdated</c>
    /// and remembers the most recent snapshot it has seen in <see cref="LastUpdateItem"/>.
    /// </summary>
    /// <typeparam name="TCache">Cache type whose loader is resolved from <c>KafkaLoadersRegistrar</c>.</typeparam>
    /// <typeparam name="TSnapshot">Snapshot type carried by the cache update notifications.</typeparam>
    public class KafkaFeed<TCache, TSnapshot>
        where TSnapshot : BetsSnapshot, new()
        where TCache : CacheObject<string, TSnapshot>, new()
    {
        // NOTE(review): never read in this class; kept because constructing TCache
        // may have side effects that cannot be seen from this file — confirm.
        private static readonly TCache _cache;

        // Guards the one-time transition of _isRunning in Run().
        private static readonly object _instanceLock;

        // volatile: read outside the lock in Run() as a fast-path check.
        private static volatile bool _isRunning;

        /// <summary>Loader obtained from <c>KafkaLoadersRegistrar</c> and initialised in the static constructor.</summary>
        public static ICacheLoader<TCache> AggregatedBetsSnapshotCacheLoader { get; private set; }

        /// <summary>
        /// Latest accepted snapshot. <c>null</c> until the first non-empty update notification arrives;
        /// after that it is either a placeholder (EndTime = five minutes before that notification)
        /// or the newest snapshot that passed the filters in the update handler.
        /// </summary>
        public static TSnapshot LastUpdateItem { get; private set; }

        /// <summary>
        /// Creates the cache instance, resolves and initialises the loader, and subscribes to cache updates.
        /// </summary>
        static KafkaFeed()
        {
            _cache = new TCache();
            _instanceLock = new object();
            AggregatedBetsSnapshotCacheLoader = (ICacheLoader<TCache>)KafkaLoadersRegistrar.Instance.GetLoader<TCache>();
            AggregatedBetsSnapshotCacheLoader.Init();

            // NOTE(review): the event is declared on CacheObject<string, TSnapshot>, so it looks like
            // it is shared by every cache using the same TSnapshot — confirm whether arg1 should be filtered.
            CacheObject<string, TSnapshot>.CacheUpdated += AggregatedBetsSnapshotCache_CacheUpdated;
        }

        /// <summary>
        /// Handles a cache update: takes <c>Item1</c> of the first tuple in <paramref name="arg2"/> and stores it
        /// in <see cref="LastUpdateItem"/> when it has data, is not older than 15 minutes, and is newer than the
        /// currently stored snapshot. Any exception is logged and swallowed.
        /// </summary>
        /// <param name="arg1">Cache that raised the event (not used).</param>
        /// <param name="arg2">Changed entries; only the first one is inspected. May be null or empty.</param>
        private static void AggregatedBetsSnapshotCache_CacheUpdated(ICacheObject arg1, List<Tuple<TSnapshot, TSnapshot>> arg2)
        {
            try
            {
                //TODO Figure out why this is called every second
                var firstChange = arg2?.FirstOrDefault();
                if (firstChange == null)
                {
                    // Empty notification: nothing to inspect. Previously this path threw a
                    // NullReferenceException that was logged as an error on every call.
                    return;
                }

                var item = firstChange.Item1;

                // Skip all elements from the topic that are older than 15 minutes.
                var offsetTime = DateTime.UtcNow.AddMinutes(-15);

                // Read the shared property once so the comparison below uses a stable value.
                var lastUpdate = LastUpdateItem;
                if (lastUpdate == null)
                {
                    // Seed with a placeholder so the first real snapshot has something to be compared against.
                    lastUpdate = new TSnapshot
                    {
                        EndTime = DateTime.UtcNow.AddMinutes(-5)
                    };
                    LastUpdateItem = lastUpdate;
                }

                if (item != null && item.Data.Count > 0
                    && item.EndTime >= offsetTime
                    && item.EndTime > lastUpdate.EndTime)
                {
                    LastUpdateItem = item;
                }
            }
            catch (Exception ex)
            {
                // NOTE(review): logs under KafkaFeedEngine rather than this type — confirm this is intentional.
                LogManager.Current.GetLogger($"{typeof(KafkaFeedEngine)}").Error("Cache Updated Ex:", ex);
            }
        }

        /// <summary>
        /// Marks the feed as running. Safe to call repeatedly; only the first call changes state.
        /// </summary>
        public static void Run()
        {
            if (_isRunning)
            {
                return;
            }

            lock (_instanceLock)
            {
                // Re-check under the lock so concurrent callers cannot both perform the transition.
                if (_isRunning)
                {
                    return;
                }

                _isRunning = true;
            }
        }
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement