Advertisement
Guest User

Untitled

a guest
Dec 14th, 2018
65
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.91 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Collections.ObjectModel;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using Dac.Caravaggio.Services.Provider.Domain;
  8. using Microsoft.Extensions.Logging;
  9. using Microsoft.Extensions.Options;
  10.  
  11. namespace Dac.Caravaggio.Services.Provider.Core
  12. {
  /// <summary>
  /// Collects items per id and hands them to a continuation once no further items
  /// have been buffered for that id during the configured buffering timespan.
  /// Every call for an id cancels that id's pending timer and starts a new one.
  /// </summary>
  /// <typeparam name="T">Type of the buffered items.</typeparam>
  internal class ActionsBuffer<T> : IActionsBuffer<T>
  {
      private readonly ILogger logger;
      private readonly NotificationsBufferSettings settings;

      // Guards every read and write of `buffers` and of the Buffer instances stored in it.
      private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
      private readonly ConcurrentDictionary<string, Buffer<T>> buffers = new ConcurrentDictionary<string, Buffer<T>>();

      /// <summary>Creates the buffer with the given logger and buffering settings.</summary>
      /// <param name="logger">Receives an error entry when a continuation fails.</param>
      /// <param name="settings">Options wrapper; its <c>Value</c> is read once here.</param>
      public ActionsBuffer(ILogger logger, IOptions<NotificationsBufferSettings> settings)
      {
          this.logger = logger;
          this.settings = settings.Value;
      }

      /// <summary>Buffers a single item; see the overload taking a sequence.</summary>
      /// <param name="id">Key under which the item is buffered.</param>
      /// <param name="item">Item to add.</param>
      /// <param name="continuation">Invoked with all buffered items once the timer for <paramref name="id"/> elapses.</param>
      public void BufferAction(string id, T item, BufferContinuation<T> continuation) => BufferAction(id, new[] { item }, continuation);

      /// <summary>
      /// Adds <paramref name="items"/> to the buffer for <paramref name="id"/>, cancels the timer
      /// currently pending for that id (if any) and starts a new one. When the new timer elapses
      /// without having been cancelled, <paramref name="continuation"/> receives everything buffered so far.
      /// </summary>
      /// <param name="id">Key under which the items are buffered.</param>
      /// <param name="items">Items to add; enumerated once, before any state is changed.</param>
      /// <param name="continuation">Invoked with the buffered items after the timer elapses.</param>
      /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
      public void BufferAction(string id, IEnumerable<T> items, BufferContinuation<T> continuation)
      {
          if (id == null)
          {
              throw new ArgumentNullException(nameof(id));
          }

          if (items == null)
          {
              throw new ArgumentNullException(nameof(items));
          }

          if (continuation == null)
          {
              throw new ArgumentNullException(nameof(continuation));
          }

          // Enumerate the caller's sequence before touching shared state, so a throwing
          // enumerator cannot leave the id with a cancelled timer and no replacement.
          var newItems = new List<T>(items);

          WrapSemaphore(() =>
          {
              var (waitingTask, buffer) = CancelCurrentExecution(id);

              foreach (var item in newItems)
              {
                  buffer.AddBufferItem(item);
              }

              buffers.AddOrUpdate(buffer.Id, buffer, (_, __) => buffer);

              // Explicit scheduler: without it ContinueWith would use TaskScheduler.Current,
              // i.e. whatever scheduler the caller happens to be running on.
              waitingTask
                  .ContinueWith(
                      _ => FlushAsync(buffer, waitingTask, continuation),
                      CancellationToken.None,
                      TaskContinuationOptions.NotOnCanceled,
                      TaskScheduler.Default)
                  .Unwrap()
                  .ContinueWith(
                      flushTask => LogIfFaulted(flushTask, id),
                      CancellationToken.None,
                      TaskContinuationOptions.NotOnCanceled,
                      TaskScheduler.Default);
          });
      }

      /// <summary>
      /// Runs <paramref name="action"/> while holding the semaphore. The wait is synchronous and
      /// taken before the <c>try</c>, so the semaphore is released only if it was acquired and
      /// exceptions thrown by <paramref name="action"/> reach the caller.
      /// </summary>
      private void WrapSemaphore(Action action)
      {
          semaphore.Wait();
          try
          {
              action();
          }
          finally
          {
              semaphore.Release();
          }
      }

      /// <summary>
      /// Starts a new timer for <paramref name="id"/>. An existing buffer has its old timer
      /// cancelled and is re-pointed at the new one; otherwise a new, not yet registered,
      /// buffer is created. Must be called while holding the semaphore.
      /// </summary>
      private (Task waitingTask, Buffer<T> buffer) CancelCurrentExecution(string id)
      {
          var (waitingTask, cts) = GetWaitingTask();

          if (buffers.TryGetValue(id, out var existing))
          {
              existing.CancelAndRenew(cts, waitingTask);
              return (waitingTask, existing);
          }

          return (waitingTask, new Buffer<T>(id, cts, waitingTask));
      }

      /// <summary>
      /// Creates a delay task lasting <c>settings.BufferingTimespan.Value</c> seconds together
      /// with the token source that cancels it.
      /// </summary>
      private (Task Action, CancellationTokenSource Cts) GetWaitingTask()
      {
          var cts = new CancellationTokenSource();
          var task = Task.Delay(TimeSpan.FromSeconds(settings.BufferingTimespan.Value), cts.Token);
          return (task, cts);
      }

      /// <summary>
      /// Runs after <paramref name="waitingTask"/> elapsed. If the buffer is still waiting on that
      /// task, the buffer is detached from the map, its items are snapshotted and
      /// <paramref name="continuation"/> is invoked with them outside the semaphore.
      /// If the buffer was renewed in the meantime this call does nothing; the chain attached
      /// to the newer timer performs the flush instead.
      /// </summary>
      private async Task FlushAsync(Buffer<T> buffer, Task waitingTask, BufferContinuation<T> continuation)
      {
          IReadOnlyCollection<T> items;

          await semaphore.WaitAsync().ConfigureAwait(false);
          try
          {
              if (!buffer.IsWaitingOn(waitingTask))
              {
                  return;
              }

              // Detach before invoking the continuation: items arriving from now on go to a
              // fresh buffer, so nothing is delivered twice and a failing or cancelled
              // continuation cannot leave a stale entry behind.
              buffers.TryRemove(buffer.Id, out _);
              items = buffer.GetBufferItems();
              buffer.Dispose();
          }
          finally
          {
              semaphore.Release();
          }

          await continuation(items).ConfigureAwait(false);
      }

      /// <summary>Logs an error when the flush for <paramref name="id"/> ended in a faulted state.</summary>
      private void LogIfFaulted(Task finishedTask, string id)
      {
          if (finishedTask.IsFaulted)
          {
              logger.LogError(LoggingEvents.BufferContinuationException, $"Continuation action failed for notification {id}! {finishedTask.Exception.ToString()}");
          }
      }

      /// <summary>
      /// Items buffered for one id plus the timer currently pending for them.
      /// Not synchronized on its own; every member is called while the owning
      /// <see cref="ActionsBuffer{T}"/> holds its semaphore.
      /// </summary>
      private sealed class Buffer<TItem> : IDisposable
      {
          // A plain list keeps the items in the order they were added.
          private readonly List<TItem> bufferedItems = new List<TItem>();

          private Task task;
          private CancellationTokenSource cts;

          public Buffer(string id, CancellationTokenSource cts, Task task)
          {
              Id = id;
              this.task = task;
              this.cts = cts;
          }

          public string Id { get; }

          public void AddBufferItem(TItem item) => bufferedItems.Add(item);

          /// <summary>Returns a read-only snapshot of the items added so far.</summary>
          public IReadOnlyCollection<TItem> GetBufferItems() => new ReadOnlyCollection<TItem>(bufferedItems.ToArray());

          /// <summary>True when <paramref name="candidate"/> is the timer this buffer currently waits on.</summary>
          public bool IsWaitingOn(Task candidate) => ReferenceEquals(task, candidate);

          /// <summary>
          /// Cancels the pending timer and switches to the supplied one. The replaced task is
          /// deliberately not disposed: Task.Dispose throws for a task that has not completed.
          /// </summary>
          public Buffer<TItem> CancelAndRenew(CancellationTokenSource newCts, Task newWaitingTask)
          {
              cts.Cancel();
              cts.Dispose();
              cts = newCts;
              task = newWaitingTask;
              return this;
          }

          public void Dispose() => cts.Dispose();
      }
  }
  130.  
  /// <summary>
  /// Variant of <see cref="ActionsBuffer{T}"/> for continuations that take no arguments.
  /// It is closed over <see cref="object"/> and supplies a throwaway item on each call.
  /// </summary>
  internal class ActionsBuffer : ActionsBuffer<object>, IActionsBuffer
  {
      /// <summary>Passes both dependencies straight through to the generic base class.</summary>
      public ActionsBuffer(ILogger logger, IOptions<NotificationsBufferSettings> settings)
          : base(logger, settings)
      {
      }

      /// <summary>
      /// Forwards to the base single-item overload with a new <see cref="object"/> as the item
      /// and a wrapper that ignores the buffered items and calls <paramref name="continuation"/>.
      /// </summary>
      /// <param name="id">Key passed unchanged to the base overload.</param>
      /// <param name="continuation">Parameterless callback invoked by the wrapper.</param>
      public void BufferAction(string id, BufferContinuation continuation)
      {
          // The base overload requires an item; its value is never looked at.
          object placeholder = new object();
          BufferContinuation<object> ignoreItems = _ => continuation();

          base.BufferAction(id, placeholder, ignoreItems);
      }
  }
  141. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement