Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- internal static void IndexIEnumerable<TDocument>(
- this ElasticClient client,
- string indexName,
- IEnumerable<TDocument> source,
- int? batchSize = null,
- int? maxDegreeOfParallelism = null,
- TimeSpan? backOffTime = null,
- int? backOffRetries = null,
- Action<int> onNextBatchExecuted = null,
- CancellationToken cancellationToken = default)
- where TDocument : class
- {
- cancellationToken.ThrowIfCancellationRequested();
- if (source == null)
- throw new ArgumentNullException(nameof(source),
- "Требуется указать перечисление, содержащее индексируемые объекты");
- var actualBatchSize = batchSize ?? 1000;
- var actualMaxDegreeOfParallelism = maxDegreeOfParallelism ?? GetMaxDegreeOfParallelism();
- var actualBackOffTime = backOffTime ?? TimeSpan.FromSeconds(30);
- var actualBackOffRetries = backOffRetries ?? 10;
- var observable = client.BulkAll(
- source,
- bulkAllDescriptor => bulkAllDescriptor
- .Index(indexName)
- .MaxDegreeOfParallelism(actualMaxDegreeOfParallelism)
- .BackOffTime(new Time(actualBackOffTime))
- .BackOffRetries(actualBackOffRetries)
- .Size(actualBatchSize)
- .RefreshOnCompleted(),
- cancellationToken);
- var totalBatchesIndexed = 0;
- Action<IBulkAllResponse> bulkResponseAction = response => { };
- if (onNextBatchExecuted != null)
- bulkResponseAction = response =>
- {
- var currentIndexedDocs = Interlocked.Increment(ref totalBatchesIndexed) * actualBatchSize;
- onNextBatchExecuted(currentIndexedDocs);
- };
- var waitHandle = new ManualResetEvent(false);
- Exception exception = null;
- var observer = new BulkAllObserver(
- onError: e =>
- {
- exception = e;
- waitHandle.Set();
- },
- onNext: bulkResponseAction,
- onCompleted: () => { waitHandle.Set(); });
- observable.Subscribe(observer);
- waitHandle.WaitOne();
- if (exception != null)
- throw exception;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement