Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Listens on the "cache_invalidation" fanout exchange and, for every message received,
/// evicts the in-memory cache entries registered under the mapping named by that message.
/// </summary>
/// <remarks>
/// The worker starts consuming from its constructor. If the broker cannot be reached, or the
/// consumer is shut down later, it keeps retrying on a background task instead of failing.
/// NOTE(review): invalidation messages published while the worker is disconnected are not
/// replayed here, so cached entries may stay stale across an outage - confirm this is acceptable.
/// </remarks>
[RegisterSingletonToDi]
public class CacheInvalidationWorker
{
    private const string ExchangeName = "cache_invalidation";
    private const string ExchangeType = "fanout";

    /// <summary>Pause between two consecutive reconnect attempts.</summary>
    private static readonly TimeSpan ReconnectDelay = TimeSpan.FromSeconds(5);

    private static readonly CacheKeyMappingStorage CacheKeysMappingStorage = new CacheKeyMappingStorage();

    private readonly RabbitMQConnectionOptions _options;
    private readonly IMemoryCache _cache;

    // Serializes connect/teardown so the constructor path and the retry task never interleave.
    private readonly object _connectionGate = new object();

    private IConnection _conn;
    private IModel _channel;
    private EventingBasicConsumer _consumer;

    /// <summary>
    /// Creates the worker and immediately starts consuming invalidation messages.
    /// </summary>
    /// <param name="options">RabbitMQ connection settings (user, password, virtual host, host).</param>
    /// <param name="cache">The memory cache whose entries are evicted on invalidation.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="cache"/> is null.
    /// </exception>
    public CacheInvalidationWorker(
        IOptions<RabbitMQConnectionOptions> options,
        IMemoryCache cache)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        _options = options.Value;
        _cache = cache ?? throw new ArgumentNullException(nameof(cache));
        Consume();
    }

    /// <summary>
    /// Registers <paramref name="value"/> under the mapping <paramref name="key"/> in the shared
    /// mapping storage. When <paramref name="value"/> is null the key is mapped to itself.
    /// </summary>
    /// <param name="key">Mapping name; this is the string expected in invalidation messages.</param>
    /// <param name="value">Cache key to evict when the mapping is invalidated, or null to use <paramref name="key"/>.</param>
    public static void AddCacheKeyMapping(string key, string value = null)
    {
        CacheKeysMappingStorage.Add(key, value ?? key);
    }

    /// <summary>
    /// Drops any previous connection and opens a fresh connection and channel.
    /// </summary>
    /// <returns>True when both were created; false when the broker could not be reached.</returns>
    private bool ConnectToRabbitMq()
    {
        ReleaseConnection();
        try
        {
            var factory = new ConnectionFactory
            {
                UserName = _options.UserName,
                Password = _options.Passwd,
                VirtualHost = _options.VirtualHost,
                HostName = _options.Host
            };
            _conn = factory.CreateConnection();
            _channel = _conn.CreateModel();
            return true;
        }
        catch (Exception ex) when (ex is System.IO.EndOfStreamException || ex is BrokerUnreachableException)
        {
            Console.WriteLine(ex);
        }

        // The connection may have opened before the channel failed; do not leave it dangling.
        ReleaseConnection();
        return false;
    }

    /// <summary>
    /// Starts consuming; when the broker is unreachable, hands over to the background retry loop.
    /// </summary>
    private void Consume()
    {
        if (!TryConsume())
        {
            // The attempt just failed, so wait before the first retry.
            ScheduleReconnect(ReconnectDelay);
        }
    }

    /// <summary>
    /// Connects, declares the exchange, binds a server-named queue to it and attaches a consumer.
    /// </summary>
    /// <returns>False when the broker could not be reached; true once the consumer is attached.</returns>
    private bool TryConsume()
    {
        lock (_connectionGate)
        {
            if (!ConnectToRabbitMq())
            {
                return false;
            }

            _channel.ExchangeDeclare(ExchangeName, ExchangeType);
            var queueName = _channel.QueueDeclare().QueueName;
            _channel.QueueBind(queueName, ExchangeName, string.Empty);

            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (model, ea) =>
            {
                // The payload is an LZ4-compressed MessagePack string naming the mapping to invalidate.
                var body = ea.Body;
                var msg = LZ4MessagePackSerializer.Deserialize<string>(
                    body,
                    MessagePack.Resolvers.ContractlessStandardResolver.Instance);
                RemoveFromCache(msg);
            };
            consumer.Shutdown += OnConsumerShutdown;
            _consumer = consumer;

            _channel.BasicConsume(queueName, true, consumer);
            return true;
        }
    }

    /// <summary>Reacts to the consumer being shut down by starting the reconnect loop.</summary>
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
    {
        // Reconnect off the client's callback thread; try once right away, then back off.
        ScheduleReconnect(TimeSpan.Zero);
    }

    /// <summary>
    /// Retries <see cref="TryConsume"/> on a background task until it succeeds.
    /// </summary>
    /// <param name="initialDelay">How long to wait before the first attempt.</param>
    private void ScheduleReconnect(TimeSpan initialDelay)
    {
        System.Threading.Tasks.Task.Run(async () =>
        {
            var delay = initialDelay;
            while (true)
            {
                await System.Threading.Tasks.Task.Delay(delay).ConfigureAwait(false);
                delay = ReconnectDelay;
                try
                {
                    if (TryConsume())
                    {
                        return;
                    }
                }
                catch (Exception ex)
                {
                    // This loop is the only recovery path, so log and keep retrying.
                    Console.WriteLine(ex);
                }
            }
        });
    }

    /// <summary>
    /// Detaches the shutdown handler and disposes the current channel and connection, if any.
    /// </summary>
    private void ReleaseConnection()
    {
        var consumer = _consumer;
        var channel = _channel;
        var connection = _conn;
        _consumer = null;
        _channel = null;
        _conn = null;

        if (consumer != null)
        {
            // Our own teardown must not be mistaken for a broker-side shutdown.
            consumer.Shutdown -= OnConsumerShutdown;
        }

        // Channel first, then the connection that owns it.
        DisposeQuietly(channel);
        DisposeQuietly(connection);
    }

    /// <summary>Disposes <paramref name="resource"/>, logging instead of propagating any failure.</summary>
    private static void DisposeQuietly(IDisposable resource)
    {
        if (resource == null)
        {
            return;
        }

        try
        {
            resource.Dispose();
        }
        catch (Exception ex)
        {
            // A failed cleanup of a dead connection must not block the next connect attempt.
            Console.WriteLine(ex);
        }
    }

    /// <summary>
    /// Removes the mapping named <paramref name="keyMapping"/> from the shared storage and evicts
    /// every cache key it referenced. Does nothing when the mapping is unknown.
    /// </summary>
    private void RemoveFromCache(string keyMapping)
    {
        if (!CacheKeysMappingStorage.TryRemove(keyMapping, out var keys))
        {
            return;
        }

        foreach (var key in keys)
        {
            _cache.Remove(key.Key);
        }
    }
}
/// <summary>
/// Two-way index between mapping names and the keys registered under them.
/// </summary>
/// <remarks>
/// Both directions are stored in concurrent dictionaries whose inner dictionaries are used as
/// sets (the byte value is always 0 and carries no meaning). Each individual dictionary call is
/// safe to make concurrently, but <see cref="Add"/> and <see cref="TryRemove"/> as a whole are
/// not atomic with respect to each other.
/// </remarks>
public class CacheKeyMappingStorage
{
    // mapping name -> keys registered under it
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> _straightMapping =
        new ConcurrentDictionary<string, ConcurrentDictionary<string, byte>>();

    // key -> mapping names it is registered under
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> _reverseMapping =
        new ConcurrentDictionary<string, ConcurrentDictionary<string, byte>>();

    /// <summary>
    /// Registers <paramref name="value"/> under the mapping <paramref name="key"/> and records
    /// the reverse link. Adding the same pair again has no further effect.
    /// </summary>
    /// <param name="key">Mapping name.</param>
    /// <param name="value">Key to register under the mapping.</param>
    /// <exception cref="System.ArgumentNullException">
    /// <paramref name="key"/> or <paramref name="value"/> is null. Nothing is stored in that case.
    /// </exception>
    public void Add(string key, string value)
    {
        // Validate both arguments up front so a bad call cannot leave an empty set behind.
        if (key == null)
        {
            throw new System.ArgumentNullException(nameof(key));
        }

        if (value == null)
        {
            throw new System.ArgumentNullException(nameof(value));
        }

        Link(_straightMapping, key, value);
        Link(_reverseMapping, value, key);
    }

    /// <summary>Adds <paramref name="member"/> to the set stored under <paramref name="owner"/>, creating the set if needed.</summary>
    private static void Link(
        ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> index,
        string owner,
        string member)
    {
        index.GetOrAdd(owner, _ => new ConcurrentDictionary<string, byte>()).TryAdd(member, 0);
    }

    /// <summary>
    /// Removes the mapping <paramref name="map"/> and returns the keys that were registered under it.
    /// Each returned key is also removed from every other mapping that referenced it, and any
    /// mapping left empty by that is removed as well.
    /// </summary>
    /// <param name="map">Mapping name to remove.</param>
    /// <param name="keys">The removed mapping's keys, or null when the mapping does not exist.</param>
    /// <returns>True when the mapping existed and was removed; otherwise false.</returns>
    public bool TryRemove(string map, out ConcurrentDictionary<string, byte> keys)
    {
        if (!_straightMapping.TryRemove(map, out keys))
        {
            return false;
        }

        foreach (var key in keys)
        {
            if (!_reverseMapping.TryRemove(key.Key, out var owners))
            {
                continue;
            }

            foreach (var owner in owners)
            {
                // The mapping being removed is already gone; only its siblings need cleaning.
                if (owner.Key == map ||
                    !_straightMapping.TryGetValue(owner.Key, out var siblingKeys))
                {
                    continue;
                }

                siblingKeys.TryRemove(key.Key, out _);
                if (siblingKeys.Count == 0)
                {
                    _straightMapping.TryRemove(owner.Key, out _);
                }
            }
        }

        return true;
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement