Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Collections.ObjectModel;
- using System.Threading;
- using System.Threading.Tasks;
- using Dac.Caravaggio.Services.Provider.Domain;
- using Microsoft.Extensions.Logging;
- using Microsoft.Extensions.Options;
- namespace Dac.Caravaggio.Services.Provider.Core
- {
/// <summary>
/// Debounces actions per identifier: every call to <c>BufferAction</c> adds its items to the
/// buffer kept for that id and restarts a delay of <c>settings.BufferingTimespan</c> seconds.
/// When the delay elapses without being restarted, the continuation supplied by the most
/// recent call is invoked once with all items accumulated for that id.
/// </summary>
/// <typeparam name="T">Type of the buffered items.</typeparam>
internal class ActionsBuffer<T> : IActionsBuffer<T>
{
    private readonly ILogger logger;
    private readonly NotificationsBufferSettings settings;

    // Guards 'buffers' and every Buffer<T> stored in it. All critical sections are short and
    // fully synchronous, so a plain monitor is sufficient (no await ever happens under it).
    private readonly object gate = new object();
    private readonly Dictionary<string, Buffer<T>> buffers = new Dictionary<string, Buffer<T>>();

    public ActionsBuffer(ILogger logger, IOptions<NotificationsBufferSettings> settings)
    {
        this.logger = logger;
        this.settings = settings.Value;
    }

    /// <summary>
    /// Buffers a single item for <paramref name="id"/> and restarts the delay for that id.
    /// </summary>
    /// <param name="id">Identifier of the buffer the item belongs to.</param>
    /// <param name="item">Item to add to the buffer.</param>
    /// <param name="continuation">Action invoked with the buffered items once the delay elapses.</param>
    public void BufferAction(string id, T item, BufferContinuation<T> continuation) => BufferAction(id, new[] { item }, continuation);

    /// <summary>
    /// Buffers <paramref name="items"/> for <paramref name="id"/> and restarts the delay for that id.
    /// The items are stored before this method returns; the continuation runs later, on the
    /// default task scheduler, and its failures are logged rather than thrown.
    /// </summary>
    /// <param name="id">Identifier of the buffer the items belong to.</param>
    /// <param name="items">Items to add to the buffer.</param>
    /// <param name="continuation">Action invoked with the buffered items once the delay elapses.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public void BufferAction(string id, IEnumerable<T> items, BufferContinuation<T> continuation)
    {
        if (id == null)
        {
            throw new ArgumentNullException(nameof(id));
        }

        if (items == null)
        {
            throw new ArgumentNullException(nameof(items));
        }

        if (continuation == null)
        {
            throw new ArgumentNullException(nameof(continuation));
        }

        // Enumerate caller-supplied items before touching shared state: a lazy sequence that
        // throws must not leave a buffer with a cancelled timer and no pending flush.
        var batch = new List<T>(items);

        lock (gate)
        {
            var (waitingTask, cts) = GetWaitingTask();

            if (buffers.TryGetValue(id, out var buffer))
            {
                buffer.CancelAndRenew(cts, waitingTask);
            }
            else
            {
                buffer = new Buffer<T>(id, cts, waitingTask);
                buffers.Add(id, buffer);
            }

            foreach (var item in batch)
            {
                buffer.AddBufferItem(item);
            }

            // NotOnCanceled: a timer cancelled by a later call must not flush.
            // TaskScheduler.Default: never run inline on the caller or on an ambient scheduler.
            waitingTask.ContinueWith(
                completed => FlushAsync(buffer, waitingTask, continuation),
                CancellationToken.None,
                TaskContinuationOptions.NotOnCanceled,
                TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Runs when a delay elapsed: detaches the buffer and hands its items to the continuation.
    /// </summary>
    private async Task FlushAsync(Buffer<T> buffer, Task waitingTask, BufferContinuation<T> continuation)
    {
        IReadOnlyCollection<T> items;

        lock (gate)
        {
            // The delay may have elapsed just before a newer call renewed the timer; in that
            // case the newer timer owns the flush and this one must do nothing.
            if (!buffer.IsWaitingOn(waitingTask))
            {
                return;
            }

            // Detach before invoking the continuation so that items arriving while it runs
            // start a fresh buffer instead of being mixed with already delivered ones.
            buffers.Remove(buffer.Id);
            items = buffer.GetBufferItems();
            buffer.Dispose();
        }

        try
        {
            await continuation(items).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Nobody awaits this task, so logging is the only way the failure becomes visible.
            logger.LogError(LoggingEvents.BufferContinuationException, ex, "Continuation action failed for notification {Id}!", buffer.Id);
        }
    }

    /// <summary>
    /// Creates a cancellable delay of <c>settings.BufferingTimespan</c> seconds.
    /// </summary>
    private (Task Action, CancellationTokenSource Cts) GetWaitingTask()
    {
        var cts = new CancellationTokenSource();
        var task = Task.Delay(TimeSpan.FromSeconds(settings.BufferingTimespan.Value), cts.Token);
        return (task, cts);
    }

    /// <summary>
    /// Items accumulated for one id together with the timer currently guarding them.
    /// Instances are only mutated while the owner's <c>gate</c> is held.
    /// </summary>
    private class Buffer<TItem> : IDisposable
    {
        private readonly ConcurrentBag<TItem> bufferedItems = new ConcurrentBag<TItem>();
        private Task task;
        private CancellationTokenSource cts;

        public Buffer(string id, CancellationTokenSource cts, Task task)
        {
            Id = id;
            this.task = task;
            this.cts = cts;
        }

        public string Id { get; }

        public void AddBufferItem(TItem item) => bufferedItems.Add(item);

        /// <summary>Returns a read-only snapshot of the items buffered so far.</summary>
        public IReadOnlyCollection<TItem> GetBufferItems() => new ReadOnlyCollection<TItem>(bufferedItems.ToArray());

        /// <summary>Tells whether <paramref name="candidate"/> is the timer currently guarding this buffer.</summary>
        public bool IsWaitingOn(Task candidate) => ReferenceEquals(task, candidate);

        /// <summary>
        /// Cancels the current timer and replaces it with the supplied one.
        /// </summary>
        public Buffer<TItem> CancelAndRenew(CancellationTokenSource newCts, Task newWaitingTask)
        {
            cts.Cancel();
            cts.Dispose();
            cts = newCts;

            // Tasks are deliberately not disposed: Task.Dispose throws for a task that has not
            // completed and brings no benefit for Task.Delay.
            task = newWaitingTask;
            return this;
        }

        public void Dispose() => cts.Dispose();
    }
}
/// <summary>
/// Payload-free variant of <see cref="ActionsBuffer{T}"/>: callers only supply an id and a
/// parameterless continuation; the buffered items themselves carry no information.
/// </summary>
internal class ActionsBuffer : ActionsBuffer<object>, IActionsBuffer
{
    /// <inheritdoc />
    public ActionsBuffer(ILogger logger, IOptions<NotificationsBufferSettings> settings)
        : base(logger, settings)
    {
    }

    /// <inheritdoc />
    public void BufferAction(string id, BufferContinuation continuation)
    {
        // The generic base needs an item to buffer; a throwaway object stands in for it and
        // the collected items are ignored when the continuation finally runs.
        var placeholder = new object();
        base.BufferAction(id, placeholder, ignoredItems => continuation());
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement