Advertisement
Guest User

C# async doc extraction

a guest
Dec 31st, 2020
126
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.79 KB | None | 0 0
  1.     public enum DocState { Processing, Completed, Error }
  2.    
  3.     private readonly BlockingCollection<IDocument> queue = new BlockingCollection<IDocument>();
  4.     private readonly Func<ResultDocument, Task> callback;
  5.     private CancellationTokenSource cancelToken = new CancellationTokenSource();
  6.     MockPulseService _pulseService;
  7.  
  8.     public DocumentProcessor(Func<ResultDocument, Task> callback)
  9.     {
  10.         this.callback = callback;
  11.         StartQueueProcessor().GetAwaiter();
  12.  
  13.         _pulseService = new MockPulseService();
  14.     }
  15.  
  16.     public void Dispose()
  17.     {
  18.         cancelToken.Cancel();
  19.     }
  20.  
  21.     public void Add(IDocument doc)
  22.     {
  23.         if (cancelToken.IsCancellationRequested)
  24.         {
  25.             throw new InvalidOperationException("Processor is disposed");
  26.         }
  27.         queue.Add(doc);
  28.     }
  29.  
  30.     private async void ProcessDocument(IDocument doc)
  31.     {
  32.         try
  33.         {
  34.             // do processing
  35.             DoCallback(new InProcessExtractedDocument(doc), DocState.Processing);
  36.             ResultDocument successExtractedDocument = await _pulseService.ProcessDocument(doc);
  37.             DoCallback(successExtractedDocument, DocState.Completed);
  38.         }
  39.         catch (Exception ex)
  40.         {
  41.             DoCallback(new ErrorExtractedDocument(doc, ex), DocState.Error);
  42.         }
  43.     }
  44.    
  45.     private void DoCallback(ResultDocument result, DocState docState)
  46.     {
  47.         if (callback != null)
  48.         {
  49.             // send callbacks in background
  50.             callback(result).GetAwaiter();
  51.         }
  52.     }
  53.  
  54.     private Task StartQueueProcessor()
  55.     {
  56.         while (!cancelToken.Token.IsCancellationRequested)
  57.         {
  58.             if (queue.TryTake(out IDocument doc, 1000, cancelToken.Token))
  59.             {
  60.                 // can chance to Task.Run(() => ProcessDocument(doc)).GetAwaiter() for parallel execution
  61.                 ProcessDocument(doc);
  62.             }
  63.         }
  64.         while (queue.TryTake(out IDocument doc))
  65.         {
  66.             callback(new ErrorExtractedDocument(new ObjectDisposedException("Processor was disposed")));
  67.         }
  68.     }
  69. }
  70.  
  71.     internal class InProcessExtractedDocument : ResultDocument
  72.     {
  73.         public InProcessExtractedDocument(IDocument doc)
  74.         {
  75.             Debug.WriteLine("InProcessExtractedDocument created : " + doc.File);
  76.         }
  77.     }
  78.  
  79.     internal class ErrorExtractedDocument : ResultDocument
  80.     {
  81.         public ErrorExtractedDocument(IDocument doc, Exception exception) : base()
  82.         {
  83.             Debug.WriteLine("ErrorExtractedDocument Document created : " + doc.File);
  84.  
  85.         }
  86.  
  87.         public ErrorExtractedDocument(ObjectDisposedException doc)
  88.         {
  89.             Debug.WriteLine("Disposable ErrorExtractedDocument Document created");
  90.         }
  91.     }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement