Advertisement
Guest User

Untitled

a guest
Jun 21st, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.12 KB | None | 0 0
  1. internal static void IndexIEnumerable<TDocument>(
  2.     this ElasticClient client,
  3.     string indexName,
  4.     IEnumerable<TDocument> source,
  5.     int? batchSize = null,
  6.     int? maxDegreeOfParallelism = null,
  7.     TimeSpan? backOffTime = null,
  8.     int? backOffRetries = null,
  9.     Action<int> onNextBatchExecuted = null,
  10.     CancellationToken cancellationToken = default)
  11.     where TDocument : class
  12. {
  13.     cancellationToken.ThrowIfCancellationRequested();
  14.     if (source == null)
  15.         throw new ArgumentNullException(nameof(source),
  16.             "Требуется указать перечисление, содержащее индексируемые объекты");
  17.  
  18.     var actualBatchSize = batchSize ?? 1000;
  19.  
  20.     var actualMaxDegreeOfParallelism = maxDegreeOfParallelism ?? GetMaxDegreeOfParallelism();
  21.     var actualBackOffTime = backOffTime ?? TimeSpan.FromSeconds(30);
  22.     var actualBackOffRetries = backOffRetries ?? 10;
  23.     var observable = client.BulkAll(
  24.         source,
  25.         bulkAllDescriptor => bulkAllDescriptor
  26.             .Index(indexName)
  27.             .MaxDegreeOfParallelism(actualMaxDegreeOfParallelism)
  28.             .BackOffTime(new Time(actualBackOffTime))
  29.             .BackOffRetries(actualBackOffRetries)
  30.             .Size(actualBatchSize)
  31.             .RefreshOnCompleted(),
  32.         cancellationToken);
  33.  
  34.     var totalBatchesIndexed = 0;
  35.     Action<IBulkAllResponse> bulkResponseAction = response => { };
  36.     if (onNextBatchExecuted != null)
  37.         bulkResponseAction = response =>
  38.         {
  39.             var currentIndexedDocs = Interlocked.Increment(ref totalBatchesIndexed) * actualBatchSize;
  40.             onNextBatchExecuted(currentIndexedDocs);
  41.         };
  42.  
  43.     var waitHandle = new ManualResetEvent(false);
  44.     Exception exception = null;
  45.     var observer = new BulkAllObserver(
  46.         onError: e =>
  47.         {
  48.             exception = e;
  49.             waitHandle.Set();
  50.         },
  51.         onNext: bulkResponseAction,
  52.         onCompleted: () => { waitHandle.Set(); });
  53.     observable.Subscribe(observer);
  54.     waitHandle.WaitOne();
  55.     if (exception != null)
  56.         throw exception;
  57. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement