Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
- using Raven.Client;
- /// <summary>
- /// This class allows performing high throughput document store operations, using a batching mechanism
- /// That mechanism collects up to 128 documents into a single batch and stores them in a single operation, reducing
- /// network io compared to an atomic operation. The mechanism is also capable of treating single store operation failure
- /// by retrying a batch, executing it item by item
- /// </summary>
- internal class RavenBatchOptimizer : IDisposable
- {
- private readonly BlockingCollection<Work> _pending = new BlockingCollection<Work>();
- private readonly IDocumentStore _store;
- private readonly CancellationToken _ct;
- private readonly ConcurrentQueue<ManualResetEventSlim> _mresQueue = new ConcurrentQueue<ManualResetEventSlim>();
- private readonly List<Task> _processingTasks;
- private readonly CancellationTokenSource _cts;
- public RavenBatchOptimizer(IDocumentStore store, CancellationToken ct, int parallelism = 2)
- {
- _store = store;
- _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
- _ct = _cts.Token;
- _processingTasks = new List<Task>(parallelism);
- for (var i = 0; i < parallelism; i++)
- {
- _processingTasks.Add(Task.Factory.StartNew(DoWork, TaskCreationOptions.LongRunning));
- }
- }
- /// <summary>
- /// Enqueue the entity to be stored and waits until it will be proccessed
- /// </summary>
- /// <param name="entity"></param>
- public void Write(object entity)
- {
- _cts.Token.ThrowIfCancellationRequested();
- if (_mresQueue.TryDequeue(out var mres) == false)
- {
- mres = new ManualResetEventSlim();
- }
- var work = new Work
- {
- Entity = entity,
- Event = mres
- };
- work.Event.Reset();
- _pending.Add(work, CancellationToken.None);
- // wait for entity to be stored in the next batch
- work.Event.Wait(CancellationToken.None);
- _mresQueue.Enqueue(mres);
- if (work.Exception != null)
- throw new InvalidOperationException("Failed to write", work.Exception);
- }
- public void Dispose()
- {
- if (_cts.IsCancellationRequested == false)
- {
- var proccesses = _processingTasks.ToArray();
- _processingTasks.Clear();
- _cts.Cancel();
- _pending.CompleteAdding();
- try
- {
- Task.WaitAll(proccesses);
- }
- catch (OperationCanceledException)
- {
- // do nothing
- }
- }
- }
- /// <summary>
- /// Long running method that performs the batch grouping and execution operations
- /// </summary>
- /// <returns></returns>
- async Task DoWork()
- {
- var list = new List<Work>();
- while (_cts.IsCancellationRequested == false || _pending.IsCompleted == false)
- {
- list.Clear();
- var work = _pending.Take(_ct);
- list.Add(work);
- while (list.Count < 128 && _pending.TryTake(out var item)) list.Add(item);
- using (var session = _store.OpenAsyncSession())
- {
- var failed = false;
- try
- {
- foreach (var p in list) session.Advanced.Store(p.Entity);
- await session.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception)
- {
- failed = true;
- }
- // if we've failed to store the batch (which was entirely rolled back), we try executing each of the
- // items seperately, in case we've failed because of a single item
- if (failed) await HandleSaveFailure(list).ConfigureAwait(false);
- }
- foreach (var p in list) p.Event.Set();
- }
- }
- /// <summary>
- /// Store entities list one by one, setting the errorous message state to the particular message
- /// </summary>
- /// <param name="list"></param>
- /// <returns></returns>
- async Task HandleSaveFailure(IEnumerable<Work> list)
- {
- foreach (var p in list)
- using (var recoverySession = _store.OpenAsyncSession())
- {
- await recoverySession.StoreAsync(p.Entity, CancellationToken.None).ConfigureAwait(false);
- try
- {
- await recoverySession.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception e)
- {
- p.Exception = e;
- }
- }
- }
- class Work
- {
- public object Entity;
- public ManualResetEventSlim Event;
- public Exception Exception;
- }
- }
Add Comment
Please, Sign In to add comment