Advertisement
Guest User

EventStoreAtomFeedPoller

a guest
Jul 13th, 2015
204
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.83 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net;
  5. using System.ServiceModel.Syndication;
  6. using System.Threading;
  7.  
  8. namespace CommonDomain.Messaging.Atom
  9. {
  10.     public class EventStoreAtomFeedPoller : IBoundedContextSync
  11.     {
  12.         private readonly IBus bus;
  13.  
  14.         public EventStoreAtomFeedPoller(IBus bus)
  15.         {
  16.             this.bus = bus;
  17.         }
  18.  
  19.         private readonly Uri streamHeadUri;
  20.         private readonly NetworkCredential credentials;
  21.         private readonly IEventStoreAtomPollerStore positionDataStore;
  22.         private readonly int pollInterval;
  23.  
  24.         public EventStoreAtomFeedPoller(Uri streamHeadUri, NetworkCredential credentials, IEventStoreAtomPollerStore positionDataStore, int pollInterval)
  25.         {
  26.             this.streamHeadUri = streamHeadUri;
  27.             this.credentials = credentials;
  28.             this.positionDataStore = positionDataStore;
  29.             this.pollInterval = pollInterval;
  30.         }
  31.  
  32.         private Thread thread;
  33.         private readonly object Lock = new object();
  34.  
  35.         public void Start()
  36.         {
  37.             lock(this.Lock)
  38.             {
  39.                 if (this.thread != null)
  40.                     return;
  41.  
  42.                 this.thread = new Thread(() =>
  43.                 {
  44.                     Thread.CurrentThread.IsBackground = true;
  45.                     while (true)
  46.                     {
  47.                         PlayEventsForward();
  48.                         Thread.Sleep(pollInterval);
  49.                     }
  50.                 });
  51.  
  52.                 this.thread.Start();
  53.             }
  54.         }
  55.  
  56.         public void Stop()
  57.         {
  58.             lock (this.Lock)
  59.             {
  60.                 if (this.thread == null)
  61.                     return;
  62.  
  63.                 this.thread.Start();
  64.                 this.thread = null;
  65.             }
  66.         }
  67.  
  68.         PageRequest NewPageRequest()
  69.         {
  70.             return new PageRequest(this.credentials);
  71.         }
  72.  
  73.         void ProcessItems(IEnumerable<SyndicationItem> items)
  74.         {
  75.             Console.ForegroundColor = ConsoleColor.DarkRed;
  76.  
  77.             foreach (var item in items)
  78.             {
  79.                 Console.WriteLine("Dispatching event " + item.Links.Alternate());
  80.  
  81.                 var request = NewPageRequest();
  82.                 var alternate = item.Links.Alternate();
  83.  
  84.                 var json = request.GetItem(alternate);
  85.  
  86.                 //var envelope = new EventEnvelope()
  87.                 //bus.PublishEvent(); // process event
  88.  
  89.                 positionDataStore.WriteLastReadEventUri(item.Links.Alternate()); // store last processed event in poller data store
  90.             }
  91.         }
  92.  
  93.         List<SyndicationItem> SearchFor(List<SyndicationItem> items, Uri eventUri)
  94.         {
  95.             Console.ResetColor();
  96.  
  97.             foreach (var item in items)
  98.             {
  99.                 // Console.WriteLine("Searching " + item.Links.Alternate());
  100.  
  101.                 if (item.Links.Alternate() == eventUri)
  102.                 {
  103.                     var index = items.IndexOf(item);
  104.                     return items.Take(index).Reverse().ToList();  
  105.                 }
  106.             }
  107.             return null;
  108.         }
  109.  
  110.         private void PlayEventsForward()
  111.         {
  112.             var pageRequest = NewPageRequest();
  113.             var lastEventUri = positionDataStore.GetLastReadEventUri(); // get last event from data store
  114.             var processing = lastEventUri == null; // go straight to processing if no previous event uri
  115.  
  116.             // if we have a last event (was not null), then get the first page, otherwise skip straight to the last
  117.             var page = lastEventUri != null ? pageRequest.GetFeed(streamHeadUri).First : pageRequest.GetFeed(streamHeadUri).Last;
  118.  
  119.             while (page != null)
  120.             {
  121.                 var reply = pageRequest.GetFeed(page); // read a page
  122.                 var items = processing ? reply.Items.Reverse().ToList() : reply.Items.ToList();
  123.  
  124.                 if(!processing) // we are searching...
  125.                 {
  126.                     var remaining = SearchFor(items, lastEventUri);
  127.  
  128.                     if (remaining != null)
  129.                     {
  130.                         processing = true; // set processing to true
  131.                         items = remaining; // set items to reverse of the remaining list :D
  132.                     }
  133.                 }
  134.  
  135.                 if (processing) // we are processing...
  136.                 {
  137.                     ProcessItems(items);
  138.                 }
  139.  
  140.                 page = processing ? reply.Previous : reply.Next;
  141.             }
  142.  
  143.             // if we reached this point but did not switch to processing mode, then throw an error
  144.             if (!processing)
  145.                 throw new ArgumentException("Last known event was not found.");
  146.         }
  147.     }
  148. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement