using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Raven.Client;

/// <summary>
/// Allows performing high-throughput document store operations using a batching mechanism.
/// The mechanism collects up to 128 documents into a single batch and stores them in a single
/// operation, reducing network I/O compared to storing each document individually. It can also
/// recover from a failed batch by retrying it, executing the items one by one, in case the
/// failure was caused by a single item.
/// </summary>
internal class RavenBatchOptimizer : IDisposable
{
    private readonly BlockingCollection<Work> _pending = new BlockingCollection<Work>();
    private readonly IDocumentStore _store;
    private readonly CancellationToken _ct;
    private readonly ConcurrentQueue<ManualResetEventSlim> _mresQueue = new ConcurrentQueue<ManualResetEventSlim>();
    private readonly List<Task> _processingTasks;
    private readonly CancellationTokenSource _cts;

    public RavenBatchOptimizer(IDocumentStore store, CancellationToken ct, int parallelism = 2)
    {
        _store = store;
        _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        _ct = _cts.Token;
        _processingTasks = new List<Task>(parallelism);
        for (var i = 0; i < parallelism; i++)
        {
            // Unwrap() is required here: StartNew with an async delegate returns Task<Task>,
            // and the outer task completes at the first await, not when DoWork finishes.
            _processingTasks.Add(Task.Factory.StartNew(DoWork, TaskCreationOptions.LongRunning).Unwrap());
        }
    }

    /// <summary>
    /// Enqueues the entity to be stored and waits until it has been processed.
    /// </summary>
    /// <param name="entity">The entity to store.</param>
    public void Write(object entity)
    {
        _cts.Token.ThrowIfCancellationRequested();
        // reuse a pooled event if one is available, to avoid allocating one per call
        if (_mresQueue.TryDequeue(out var mres) == false)
        {
            mres = new ManualResetEventSlim();
        }
        var work = new Work
        {
            Entity = entity,
            Event = mres
        };
        work.Event.Reset();
        _pending.Add(work, CancellationToken.None);
        // wait for the entity to be stored as part of the next batch
        work.Event.Wait(CancellationToken.None);
        _mresQueue.Enqueue(mres);
        if (work.Exception != null)
            throw new InvalidOperationException("Failed to write", work.Exception);
    }

    public void Dispose()
    {
        if (_cts.IsCancellationRequested == false)
        {
            var processes = _processingTasks.ToArray();
            _processingTasks.Clear();
            _cts.Cancel();
            _pending.CompleteAdding();
            try
            {
                Task.WaitAll(processes);
            }
            catch (AggregateException)
            {
                // the workers were cancelled while waiting for pending work; nothing to do
                // (Task.WaitAll wraps the workers' OperationCanceledException in an AggregateException)
            }
        }
    }

    /// <summary>
    /// Long-running worker that groups pending writes into batches and executes them.
    /// </summary>
    private async Task DoWork()
    {
        var list = new List<Work>();
        while (_cts.IsCancellationRequested == false || _pending.IsCompleted == false)
        {
            list.Clear();
            // block until at least one item is available, then drain up to a full batch
            var work = _pending.Take(_ct);
            list.Add(work);
            while (list.Count < 128 && _pending.TryTake(out var item))
                list.Add(item);

            using (var session = _store.OpenAsyncSession())
            {
                var failed = false;
                try
                {
                    foreach (var p in list)
                        session.Advanced.Store(p.Entity);
                    await session.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception)
                {
                    failed = true;
                }
                // if we've failed to store the batch (which was rolled back in its entirety), we
                // retry each item separately, in case we failed because of a single item
                if (failed)
                    await HandleSaveFailure(list).ConfigureAwait(false);
            }
            foreach (var p in list)
                p.Event.Set();
        }
    }

    /// <summary>
    /// Stores the entities one by one, recording the exception on each item that fails
    /// so the corresponding Write() call can surface it.
    /// </summary>
    /// <param name="list">The items from the failed batch.</param>
    private async Task HandleSaveFailure(IEnumerable<Work> list)
    {
        foreach (var p in list)
        {
            using (var recoverySession = _store.OpenAsyncSession())
            {
                await recoverySession.StoreAsync(p.Entity, CancellationToken.None).ConfigureAwait(false);
                try
                {
                    await recoverySession.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    p.Exception = e;
                }
            }
        }
    }

    private class Work
    {
        public object Entity;
        public ManualResetEventSlim Event;
        public Exception Exception;
    }
}
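
// Usage sketch (not part of the original class): a minimal example of how the optimizer might be
// driven from multiple producer threads. The User entity and the producer/document counts are
// hypothetical; any IDocumentStore and entity type would do. Each Write() call blocks until its
// entity has been flushed as part of a batch, so throughput comes from many concurrent callers,
// whose writes the workers coalesce into batches of up to 128 documents.
internal class User
{
    public string Id;
    public string Name;
}

internal static class RavenBatchOptimizerExample
{
    public static void Run(IDocumentStore store)
    {
        using (var cts = new CancellationTokenSource())
        using (var optimizer = new RavenBatchOptimizer(store, cts.Token, parallelism: 4))
        {
            // 8 producers, each writing 1,000 documents; concurrent Write() calls are
            // grouped into shared batches by the optimizer's worker tasks
            Parallel.For(0, 8, producer =>
            {
                for (var i = 0; i < 1000; i++)
                    optimizer.Write(new User { Name = "user-" + producer + "-" + i });
            });
        } // Dispose() waits for the workers to finish before returning
    }
}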