Guest User

Untitled

a guest
Mar 25th, 2018
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.88 KB | None | 0 0
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using Raven.Client;
  7.  
  8. /// <summary>
  9. /// This class allows performing high throughput document store operations, using a batching mechanism
  10. /// That mechanism collects up to 128 documents into a single batch and stores them in a single operation, reducing
  11. /// network io compared to an atomic operation. The mechanism is also capable of treating single store operation failure
  12. /// by retrying a batch, executing it item by item
  13. /// </summary>
  14. internal class RavenBatchOptimizer : IDisposable
  15. {
  16. private readonly BlockingCollection<Work> _pending = new BlockingCollection<Work>();
  17. private readonly IDocumentStore _store;
  18. private readonly CancellationToken _ct;
  19. private readonly ConcurrentQueue<ManualResetEventSlim> _mresQueue = new ConcurrentQueue<ManualResetEventSlim>();
  20. private readonly List<Task> _processingTasks;
  21. private readonly CancellationTokenSource _cts;
  22.  
  23. public RavenBatchOptimizer(IDocumentStore store, CancellationToken ct, int parallelism = 2)
  24. {
  25. _store = store;
  26. _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
  27. _ct = _cts.Token;
  28.  
  29. _processingTasks = new List<Task>(parallelism);
  30. for (var i = 0; i < parallelism; i++)
  31. {
  32. _processingTasks.Add(Task.Factory.StartNew(DoWork, TaskCreationOptions.LongRunning));
  33. }
  34. }
  35.  
  36. /// <summary>
  37. /// Enqueue the entity to be stored and waits until it will be proccessed
  38. /// </summary>
  39. /// <param name="entity"></param>
  40. public void Write(object entity)
  41. {
  42. _cts.Token.ThrowIfCancellationRequested();
  43. if (_mresQueue.TryDequeue(out var mres) == false)
  44. {
  45. mres = new ManualResetEventSlim();
  46. }
  47.  
  48. var work = new Work
  49. {
  50. Entity = entity,
  51. Event = mres
  52. };
  53.  
  54. work.Event.Reset();
  55. _pending.Add(work, CancellationToken.None);
  56.  
  57. // wait for entity to be stored in the next batch
  58. work.Event.Wait(CancellationToken.None);
  59. _mresQueue.Enqueue(mres);
  60.  
  61. if (work.Exception != null)
  62. throw new InvalidOperationException("Failed to write", work.Exception);
  63. }
  64.  
  65. public void Dispose()
  66. {
  67. if (_cts.IsCancellationRequested == false)
  68. {
  69. var proccesses = _processingTasks.ToArray();
  70. _processingTasks.Clear();
  71. _cts.Cancel();
  72. _pending.CompleteAdding();
  73. try
  74. {
  75. Task.WaitAll(proccesses);
  76. }
  77. catch (OperationCanceledException)
  78. {
  79. // do nothing
  80. }
  81. }
  82. }
  83.  
  84. /// <summary>
  85. /// Long running method that performs the batch grouping and execution operations
  86. /// </summary>
  87. /// <returns></returns>
  88. async Task DoWork()
  89. {
  90. var list = new List<Work>();
  91.  
  92. while (_cts.IsCancellationRequested == false || _pending.IsCompleted == false)
  93. {
  94. list.Clear();
  95. var work = _pending.Take(_ct);
  96. list.Add(work);
  97.  
  98. while (list.Count < 128 && _pending.TryTake(out var item)) list.Add(item);
  99.  
  100. using (var session = _store.OpenAsyncSession())
  101. {
  102. var failed = false;
  103.  
  104. try
  105. {
  106. foreach (var p in list) session.Advanced.Store(p.Entity);
  107.  
  108. await session.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
  109. }
  110. catch (Exception)
  111. {
  112. failed = true;
  113. }
  114.  
  115. // if we've failed to store the batch (which was entirely rolled back), we try executing each of the
  116. // items seperately, in case we've failed because of a single item
  117. if (failed) await HandleSaveFailure(list).ConfigureAwait(false);
  118. }
  119.  
  120. foreach (var p in list) p.Event.Set();
  121. }
  122. }
  123.  
  124. /// <summary>
  125. /// Store entities list one by one, setting the errorous message state to the particular message
  126. /// </summary>
  127. /// <param name="list"></param>
  128. /// <returns></returns>
  129. async Task HandleSaveFailure(IEnumerable<Work> list)
  130. {
  131. foreach (var p in list)
  132. using (var recoverySession = _store.OpenAsyncSession())
  133. {
  134. await recoverySession.StoreAsync(p.Entity, CancellationToken.None).ConfigureAwait(false);
  135.  
  136. try
  137. {
  138. await recoverySession.SaveChangesAsync(CancellationToken.None).ConfigureAwait(false);
  139. }
  140. catch (Exception e)
  141. {
  142. p.Exception = e;
  143. }
  144. }
  145. }
  146.  
  147. class Work
  148. {
  149. public object Entity;
  150. public ManualResetEventSlim Event;
  151. public Exception Exception;
  152. }
  153. }
Add Comment
Please, Sign In to add comment