Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Stage of a document's processing, passed to <c>DoCallback</c> together with the result.
/// </summary>
public enum DocState { Processing, Completed, Error }

// Documents waiting to be processed; filled by Add and emptied by the queue loop.
private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();

// Consumer-supplied delegate invoked with each result document; may be null.
private readonly Func<ResultDocument, Task> callback;

// Cancelled by Dispose; also checked by Add to reject documents afterwards.
// Never reassigned, hence readonly.
private readonly CancellationTokenSource cancelToken = new CancellationTokenSource();

// Service that performs the actual document processing; assigned once in the constructor.
private readonly MockPulseService _pulseService;
/// <summary>
/// Creates the processor and starts the background queue loop.
/// </summary>
/// <param name="callback">
/// Delegate invoked with every result document. May be null, in which case
/// <c>DoCallback</c> skips the notification.
/// </param>
public DocumentProcessor(Func<ResultDocument, Task> callback)
{
    this.callback = callback;

    // The service must exist before the loop can pick up its first document.
    _pulseService = new MockPulseService();

    // Start the loop on the thread pool so the constructor returns immediately.
    // Calling StartQueueProcessor() directly would run its blocking loop on the
    // constructing thread; the previous GetAwaiter() call did not change that.
    Task.Run(() => StartQueueProcessor());
}
/// <summary>
/// Signals cancellation on the processor's token source. The queue loop and
/// <c>Add</c> both observe this token.
/// </summary>
public void Dispose() => cancelToken.Cancel();
/// <summary>
/// Queues a document for processing.
/// </summary>
/// <param name="doc">The document to enqueue; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="doc"/> is null.</exception>
/// <exception cref="InvalidOperationException">The processor has already been disposed.</exception>
public void Add(IDocument doc)
{
    // Reject null here: the processing path dereferences doc.File, so a null
    // entry would only fail later, far away from the caller that supplied it.
    if (doc == null)
    {
        throw new ArgumentNullException(nameof(doc));
    }

    if (cancelToken.IsCancellationRequested)
    {
        throw new InvalidOperationException("Processor is disposed");
    }

    queue.Add(doc);
}
/// <summary>
/// Processes one document and reports its progress through <c>DoCallback</c>:
/// first a Processing notification, then either Completed with the service
/// result or Error with the exception that was thrown.
/// </summary>
/// <param name="doc">The document taken from the queue.</param>
/// <returns>
/// A task that completes when processing has finished. Returning Task instead
/// of void means a failure in the error path is captured on the task rather
/// than being rethrown where nothing can catch it.
/// </returns>
private async Task ProcessDocument(IDocument doc)
{
    try
    {
        // Announce that work on this document has started.
        DoCallback(new InProcessExtractedDocument(doc), DocState.Processing);

        ResultDocument successExtractedDocument = await _pulseService.ProcessDocument(doc);
        DoCallback(successExtractedDocument, DocState.Completed);
    }
    catch (Exception ex)
    {
        // Any failure above (including in a callback invoked synchronously) is reported as an error result.
        DoCallback(new ErrorExtractedDocument(doc, ex), DocState.Error);
    }
}
/// <summary>
/// Invokes the consumer callback with <paramref name="result"/> without waiting
/// for it to finish. Does nothing when no callback was supplied.
/// </summary>
/// <param name="result">The result document handed to the callback.</param>
/// <param name="docState">The stage being reported; currently not forwarded to the callback.</param>
private void DoCallback(ResultDocument result, DocState docState)
{
    if (callback == null)
    {
        return;
    }

    // Fire-and-forget: the task is deliberately not awaited. Exceptions thrown
    // synchronously by the callback still propagate to the caller as before.
    Task pending = callback(result);

    // Observe asynchronous failures so they are logged instead of silently lost
    // (the previous GetAwaiter() call did not observe anything). A callback
    // that returns null is tolerated.
    pending?.ContinueWith(
        faulted => Debug.WriteLine("Document callback failed : " + faulted.Exception),
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default);
}
/// <summary>
/// Starts the queue loop in the background. The loop hands each queued document
/// to <c>ProcessDocument</c> until cancellation is requested, then reports every
/// document still in the queue as an error via <c>DoCallback</c>.
/// </summary>
/// <returns>A task that completes once the loop has stopped and the queue has been drained.</returns>
private Task StartQueueProcessor()
{
    // TryTake blocks for up to a second at a time, so run the loop as a
    // long-running task instead of occupying the caller's thread.
    return Task.Factory.StartNew(
        RunLoop,
        CancellationToken.None,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);

    void RunLoop()
    {
        CancellationToken token = cancelToken.Token;

        try
        {
            while (!token.IsCancellationRequested)
            {
                if (queue.TryTake(out IDocument next, 1000, token))
                {
                    // can change to Task.Run(() => ProcessDocument(next)) for parallel execution
                    ProcessDocument(next);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // TryTake throws when the token is cancelled while it is waiting.
            // That is the normal shutdown path: fall through and drain the queue.
        }

        // Report every document that never got processed. DoCallback performs
        // the null check on the callback.
        while (queue.TryTake(out IDocument _))
        {
            DoCallback(
                new ErrorExtractedDocument(new ObjectDisposedException("Processor was disposed")),
                DocState.Error);
        }
    }
}
- }
/// <summary>
/// Result document created when processing of a document starts.
/// Construction writes a trace line containing the document's <c>File</c> value.
/// </summary>
internal class InProcessExtractedDocument : ResultDocument
{
    /// <param name="doc">The document being processed; its <c>File</c> value is written to the debug output.</param>
    public InProcessExtractedDocument(IDocument doc)
        => Debug.WriteLine($"InProcessExtractedDocument created : {doc.File}");
}
/// <summary>
/// Result document created when a document could not be processed, either because
/// processing threw or because the processor was disposed before the document was
/// handled. The causing exception is kept in <see cref="Error"/>.
/// </summary>
internal class ErrorExtractedDocument : ResultDocument
{
    /// <summary>Creates an error result for a document whose processing failed.</summary>
    /// <param name="doc">The document that failed; its <c>File</c> value is written to the debug output.</param>
    /// <param name="exception">The exception that caused the failure.</param>
    public ErrorExtractedDocument(IDocument doc, Exception exception)
    {
        // Keep the cause; it was previously dropped, leaving consumers unable to tell why processing failed.
        Error = exception;
        Debug.WriteLine("ErrorExtractedDocument Document created : " + doc.File);
    }

    /// <summary>Creates an error result for a document that was discarded after disposal.</summary>
    /// <param name="doc">Despite its name, this is the disposal exception describing why the document was dropped.</param>
    public ErrorExtractedDocument(ObjectDisposedException doc)
    {
        Error = doc;
        Debug.WriteLine("Disposable ErrorExtractedDocument Document created");
    }

    /// <summary>The exception that produced this error result.</summary>
    public Exception Error { get; }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement