using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client;
/// <summary>
/// Allows performing high-throughput document store operations using a batching mechanism:
/// up to 128 documents are collected into a single batch and stored in one operation, reducing
/// network I/O compared to storing each document individually. If a batch fails as a whole,
/// it is retried item by item, in case the failure was caused by a single document.
/// A usage sketch appears after the class.
/// </summary>
internal class RavenBatchOptimizer : IDisposable
{
    private const int MaxBatchSize = 128;
    private readonly BlockingCollection<Work> _pending = new BlockingCollection<Work>();
    private readonly IDocumentStore _store;
    private readonly CancellationToken _ct;
    private readonly ConcurrentQueue<ManualResetEventSlim> _mresQueue = new ConcurrentQueue<ManualResetEventSlim>();
    private readonly List<Task> _processingTasks;
    private readonly CancellationTokenSource _cts;

    public RavenBatchOptimizer(IDocumentStore store, CancellationToken ct, int parallelism = 2)
    {
        _store = store;
        _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _ct = _cts.Token;
        _processingTasks = new List<Task>(parallelism);
        for (var i = 0; i < parallelism; i++)
        {
            // StartNew with an async delegate returns Task<Task>; Unwrap yields the inner task,
            // which completes only when the worker loop finishes, so Dispose can wait for it
            _processingTasks.Add(Task.Factory.StartNew(DoWork, TaskCreationOptions.LongRunning).Unwrap());
        }
    }

    /// <summary>
    /// Enqueues the entity to be stored and blocks until the batch containing it has been processed
    /// </summary>
    /// <param name="entity">The entity to store</param>
    public void Write(object entity)
    {
        _cts.Token.ThrowIfCancellationRequested();
        // reuse a pooled event when available, to avoid allocating one per write
        if (_mresQueue.TryDequeue(out var mres) == false)
        {
            mres = new ManualResetEventSlim();
        }
        var work = new Work
        {
            Entity = entity,
            Event = mres
        };
        work.Event.Reset();
        _pending.Add(work, CancellationToken.None);
        // wait for the entity to be stored in the next batch
        work.Event.Wait(CancellationToken.None);
        // return the event to the pool for reuse by subsequent writes
        _mresQueue.Enqueue(mres);
        if (work.Exception != null)
            throw new InvalidOperationException("Failed to write", work.Exception);
    }

    public void Dispose()
    {
        if (_cts.IsCancellationRequested == false)
        {
            var processes = _processingTasks.ToArray();
            _processingTasks.Clear();
            _cts.Cancel();
            _pending.CompleteAdding();
            try
            {
                Task.WaitAll(processes);
            }
            catch (AggregateException e) when (e.InnerExceptions.All(i => i is OperationCanceledException))
            {
                // the workers exit by observing the canceled token; nothing to do
            }
            _pending.Dispose();
            _cts.Dispose();
        }
    }

    /// <summary>
    /// Long-running worker loop that groups pending writes into batches and executes them
    /// </summary>
    private async Task DoWork()
    {
        var list = new List<Work>();
        while (_cts.IsCancellationRequested == false || _pending.IsCompleted == false)
        {
            list.Clear();
            // block until at least one item is available, then opportunistically
            // drain more pending work into the same batch, up to the cap
            var work = _pending.Take(_ct);
            list.Add(work);
            while (list.Count < MaxBatchSize && _pending.TryTake(out var item))
                list.Add(item);
            using (var session = _store.OpenAsyncSession())
            {
                var failed = false;
                try
                {
                    foreach (var p in list)
                        await session.StoreAsync(p.Entity, CancellationToken.None).ConfigureAwait(false);
                    await session.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    failed = true;
                }
                // if we've failed to store the batch (which was entirely rolled back), we try executing
                // each of the items separately, in case we've failed because of a single item
                if (failed)
                    await HandleSaveFailure(list).ConfigureAwait(false);
            }
            // wake every writer whose entity was part of this batch; items that failed
            // individually carry their exception, which Write() rethrows
            foreach (var p in list)
                p.Event.Set();
        }
    }

    /// <summary>
    /// Stores the entities one by one, recording the error on each particular item that fails
    /// </summary>
    /// <param name="list">The items of the failed batch</param>
    private async Task HandleSaveFailure(IEnumerable<Work> list)
    {
        foreach (var p in list)
        {
            using (var recoverySession = _store.OpenAsyncSession())
            {
                await recoverySession.StoreAsync(p.Entity, CancellationToken.None).ConfigureAwait(false);
                try
                {
                    await recoverySession.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    p.Exception = e;
                }
            }
        }
    }

    private class Work
    {
        public object Entity;
        public ManualResetEventSlim Event;
        public Exception Exception;
    }
}
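
// A minimal usage sketch (illustrative only, not part of the original class): multiple producer
// threads call Write() concurrently; each call blocks until its entity has been persisted as part
// of some batch. The User type, the Run method, and the pre-initialized IDocumentStore passed in
// are all assumptions made for the example.
internal static class RavenBatchOptimizerExample
{
    private class User
    {
        public string Name;
    }

    public static void Run(IDocumentStore store)
    {
        using (var optimizer = new RavenBatchOptimizer(store, CancellationToken.None, parallelism: 4))
        {
            Parallel.For(0, 10000, i =>
            {
                try
                {
                    // each Write() returns only after the document is actually saved, but many
                    // concurrent writers share the cost of a single SaveChanges round trip
                    optimizer.Write(new User { Name = "user-" + i });
                }
                catch (InvalidOperationException)
                {
                    // a document that fails even in the item-by-item retry surfaces here
                }
            });
        }
    }
}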