Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Usage example (assumes `timeoutManager` is a TimeoutManager<string> created elsewhere).
// When sending out a message: start tracking it.
timeoutManager.Add("Message001");
// When receiving a reply: mark the oldest matching message as finished.
// The result is false when no unfinished matching message was being tracked.
bool replyWasExpected = timeoutManager.MarkOldestAsFinished(x => x == "Message001");
// When no reply is received: OnTimeout is raised for the message.
timeoutManager.OnTimeout += (evt, args) => { Console.WriteLine("HELP!"); };
- using System;
- using System.Collections.Concurrent;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace McatXml
- {
/// <summary>
/// Tracks items (for example sent messages that await a reply) and raises
/// <see cref="OnTimeout"/> for every item that has not been marked as finished
/// within the timeout passed to the constructor.
/// </summary>
/// <remarks>
/// A single long-running background task processes the items in the order they were added.
/// Because every item uses the same timeout, the oldest item is always the next one to expire.
/// <see cref="OnTimeout"/> handlers run on that background task; an exception thrown by a
/// handler is not caught here and therefore ends the loop.
/// </remarks>
/// <typeparam name="T_Item">type of the tracked items</typeparam>
public class TimeoutManager<T_Item>
{
    /// <summary> time an item may stay unfinished before <see cref="OnTimeout"/> is raised </summary>
    private readonly TimeSpan timeout;

    /// <summary> feeds the background loop; items are consumed in the order they were added </summary>
    private readonly BlockingCollection<ItemWithTimeout> itemsWaitingForTimeout = new BlockingCollection<ItemWithTimeout>();

    /// <summary> guards <see cref="pendingItems"/> </summary>
    private readonly object pendingGate = new object();

    /// <summary>
    /// every item the background loop has not dealt with yet (oldest first), including the one
    /// it is currently waiting on; this is what the Mark*AsFinished methods search
    /// </summary>
    private readonly LinkedList<ItemWithTimeout> pendingItems = new LinkedList<ItemWithTimeout>();

    /// <summary> event data carrying the item that timed out </summary>
    public class TimeoutEventArgs : EventArgs
    {
        /// <summary> the item that was not finished in time </summary>
        public T_Item Item { get; private set; }

        public TimeoutEventArgs(T_Item item) { this.Item = item; }
    }

    /// <summary> called whenever an item is not finished before the timeout </summary>
    public event EventHandler<TimeoutEventArgs> OnTimeout;

    /// <summary> private wrapper class to decorate an item with a timeout </summary>
    private class ItemWithTimeout
    {
        internal readonly T_Item Item;

        // Fully qualified because the file does not import System.Diagnostics.
        internal readonly System.Diagnostics.Stopwatch Watch;

        private readonly object gate = new object();
        private volatile bool finishedWaiting;

        internal ItemWithTimeout(T_Item item)
        {
            this.Item = item;
            this.Watch = System.Diagnostics.Stopwatch.StartNew();
            this.finishedWaiting = false;
        }

        /// <summary> true once the item was either marked as finished or has timed out </summary>
        internal bool IsFinished
        {
            get { return this.finishedWaiting; }
        }

        /// <summary>
        /// flags the item as finished; returns true only for the one caller that actually
        /// performed the transition, so "finished by caller" and "timed out" are mutually exclusive
        /// </summary>
        internal bool TryMarkFinished()
        {
            lock (this.gate)
            {
                if (this.finishedWaiting) return false;
                this.finishedWaiting = true;
                return true;
            }
        }
    }

    /// <summary> creates the manager and starts its background timeout loop </summary>
    /// <param name="timeout">time after which an unfinished item raises <see cref="OnTimeout"/></param>
    public TimeoutManager(TimeSpan timeout)
    {
        this.timeout = timeout;
        Task loop = new Task(this.timeoutLoop, TaskCreationOptions.LongRunning);
        loop.Start();
    }

    /// <summary> starts tracking an item; its timeout starts running immediately </summary>
    /// <param name="item">the item to track</param>
    public void Add(T_Item item)
    {
        var itemExt = new ItemWithTimeout(item);
        lock (this.pendingGate)
        {
            // Register the item as pending before the loop can see it, and enqueue under the
            // same lock so both collections keep the same (oldest first) order.
            this.pendingItems.AddLast(itemExt);
            this.itemsWaitingForTimeout.Add(itemExt);
        }
    }

    /// <summary> mark all items as finished, that fit the given condition </summary>
    /// <param name="isMatch">condition an item has to fulfil; null matches every item</param>
    /// <returns>true if at least one item was marked as finished</returns>
    public bool MarkAllAsFinished(Func<T_Item, bool> isMatch = null)
    {
        return markAsFinished(stopAfterFirstHit: false, reverseOrder: false, isMatch: isMatch);
    }

    /// <summary> mark the most recent item as finished, that fits the given condition </summary>
    /// <param name="isMatch">condition an item has to fulfil; null matches every item</param>
    /// <returns>true if an item was marked as finished</returns>
    public bool MarkNewestAsFinished(Func<T_Item, bool> isMatch = null)
    {
        return markAsFinished(stopAfterFirstHit: true, reverseOrder: true, isMatch: isMatch);
    }

    /// <summary> mark the oldest item as finished, that fits the given condition </summary>
    /// <param name="isMatch">condition an item has to fulfil; null matches every item</param>
    /// <returns>true if an item was marked as finished</returns>
    public bool MarkOldestAsFinished(Func<T_Item, bool> isMatch = null)
    {
        return markAsFinished(stopAfterFirstHit: true, reverseOrder: false, isMatch: isMatch);
    }

    /// <summary> mark items as finished, that fit the given condition </summary>
    /// <param name="stopAfterFirstHit">stop after the first item that could be marked</param>
    /// <param name="reverseOrder">search from the newest to the oldest item instead of oldest first</param>
    /// <param name="isMatch">condition an item has to fulfil; null matches every item</param>
    /// <returns>true if at least one item was marked as finished</returns>
    private bool markAsFinished(bool stopAfterFirstHit, bool reverseOrder, Func<T_Item, bool> isMatch = null)
    {
        // Work on a snapshot so the caller supplied predicate never runs while the lock is held.
        ItemWithTimeout[] items;
        lock (this.pendingGate)
        {
            items = new ItemWithTimeout[this.pendingItems.Count];
            this.pendingItems.CopyTo(items, 0);
        }
        if (reverseOrder) Array.Reverse(items);

        bool success = false;
        foreach (var item in items)
        {
            if (item.IsFinished) continue;
            if (isMatch != null && !isMatch(item.Item)) continue;
            if (!item.TryMarkFinished()) continue; // finished or timed out in the meantime
            success = true;
            if (stopAfterFirstHit) break;
        }
        return success;
    }

    /// <summary> for all items that are not finished, check whether their time is up </summary>
    private void timeoutLoop()
    {
        foreach (var item in itemsWaitingForTimeout.GetConsumingEnumerable())
        {
            while (!item.IsFinished && item.Watch.Elapsed < this.timeout)
            {
                // wait until the timeout has passed or the item is finished
                double remainingMs = this.timeout.TotalMilliseconds - item.Watch.ElapsedMilliseconds;
                Thread.Sleep(TimeSpan.FromMilliseconds(Math.Max(1, remainingMs)));
            }

            // Succeeds only if nobody marked the item as finished before its time was up.
            bool timedOut = item.TryMarkFinished();

            // The item is dealt with either way, so it no longer needs to be searchable.
            lock (this.pendingGate)
            {
                this.pendingItems.Remove(item);
            }

            if (!timedOut) continue;

            // item has not been finished in time!
            var handler = this.OnTimeout;
            if (handler != null) handler(this, new TimeoutEventArgs(item.Item));
        }
    }
}
/// <summary>
/// Small console demo: tracks the numbers 0..99 with a five second timeout, marks 0..89 as
/// finished from parallel workers and prints every number that times out.
/// </summary>
static class Program
{
    static void Main()
    {
        var manager = new TimeoutManager<int>(TimeSpan.FromSeconds(5));
        manager.OnTimeout += (sender, args) => Console.WriteLine("Timeout: {0}", args.Item);

        // Ordering by a fresh Guid shuffles the sequence, so items arrive in random order.
        var numbersToTrack = Enumerable.Range(0, 100).OrderBy(number => Guid.NewGuid());
        Parallel.ForEach(numbersToTrack, number => manager.Add(number));

        // Only 0..89 are finished here; the remaining numbers are left to time out.
        var numbersToFinish = Enumerable.Range(0, 90).OrderBy(number => Guid.NewGuid());
        Parallel.ForEach(numbersToFinish, number =>
        {
            bool marked = manager.MarkAllAsFinished(candidate => candidate == number);
            if (!marked)
            {
                Console.WriteLine("could not mark as finished: {0}", number);
            }
        });

        // Keep the process alive so the pending timeouts can fire.
        Console.ReadLine();
    }
}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement