Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.ServiceModel.Syndication;
- using System.Threading;
- namespace CommonDomain.Messaging.Atom
- {
- public class EventStoreAtomFeedPoller : IBoundedContextSync
- {
- private readonly IBus bus;
- public EventStoreAtomFeedPoller(IBus bus)
- {
- this.bus = bus;
- }
- private readonly Uri streamHeadUri;
- private readonly NetworkCredential credentials;
- private readonly IEventStoreAtomPollerStore positionDataStore;
- private readonly int pollInterval;
- public EventStoreAtomFeedPoller(Uri streamHeadUri, NetworkCredential credentials, IEventStoreAtomPollerStore positionDataStore, int pollInterval)
- {
- this.streamHeadUri = streamHeadUri;
- this.credentials = credentials;
- this.positionDataStore = positionDataStore;
- this.pollInterval = pollInterval;
- }
- private Thread thread;
- private readonly object Lock = new object();
- public void Start()
- {
- lock(this.Lock)
- {
- if (this.thread != null)
- return;
- this.thread = new Thread(() =>
- {
- Thread.CurrentThread.IsBackground = true;
- while (true)
- {
- PlayEventsForward();
- Thread.Sleep(pollInterval);
- }
- });
- this.thread.Start();
- }
- }
- public void Stop()
- {
- lock (this.Lock)
- {
- if (this.thread == null)
- return;
- this.thread.Start();
- this.thread = null;
- }
- }
- PageRequest NewPageRequest()
- {
- return new PageRequest(this.credentials);
- }
- void ProcessItems(IEnumerable<SyndicationItem> items)
- {
- Console.ForegroundColor = ConsoleColor.DarkRed;
- foreach (var item in items)
- {
- Console.WriteLine("Dispatching event " + item.Links.Alternate());
- var request = NewPageRequest();
- var alternate = item.Links.Alternate();
- var json = request.GetItem(alternate);
- //var envelope = new EventEnvelope()
- //bus.PublishEvent(); // process event
- positionDataStore.WriteLastReadEventUri(item.Links.Alternate()); // store last processed event in poller data store
- }
- }
- List<SyndicationItem> SearchFor(List<SyndicationItem> items, Uri eventUri)
- {
- Console.ResetColor();
- foreach (var item in items)
- {
- // Console.WriteLine("Searching " + item.Links.Alternate());
- if (item.Links.Alternate() == eventUri)
- {
- var index = items.IndexOf(item);
- return items.Take(index).Reverse().ToList();
- }
- }
- return null;
- }
- private void PlayEventsForward()
- {
- var pageRequest = NewPageRequest();
- var lastEventUri = positionDataStore.GetLastReadEventUri(); // get last event from data store
- var processing = lastEventUri == null; // go straight to processing if no previous event uri
- // if we have a last event (was not null), then get the first page, otherwise skip straight to the last
- var page = lastEventUri != null ? pageRequest.GetFeed(streamHeadUri).First : pageRequest.GetFeed(streamHeadUri).Last;
- while (page != null)
- {
- var reply = pageRequest.GetFeed(page); // read a page
- var items = processing ? reply.Items.Reverse().ToList() : reply.Items.ToList();
- if(!processing) // we are searching...
- {
- var remaining = SearchFor(items, lastEventUri);
- if (remaining != null)
- {
- processing = true; // set processing to true
- items = remaining; // set items to reverse of the remaining list :D
- }
- }
- if (processing) // we are processing...
- {
- ProcessItems(items);
- }
- page = processing ? reply.Previous : reply.Next;
- }
- // if we reached this point but did not switch to processing mode, then throw an error
- if (!processing)
- throw new ArgumentException("Last known event was not found.");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement