Guest User

Untitled

a guest
Nov 17th, 2017
76
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.41 KB | None | 0 0
  1. using System;
  2. using System.Reactive;
  3. using System.Reactive.Linq;
  4. using System.Reactive.Subjects;
  5.  
  6. /// <summary>
  7. /// Provides a timeout mechanism that will not timeout if it is signalled often enough
  8. /// </summary>
  9. internal class TrafficTimeout
  10. {
  11. private readonly Action onTimeout;
  12. private object signalLock = new object();
  13. private IObserver<Unit> signals;
  14.  
  15. /// <summary>
  16. /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
  17. /// </summary>
  18. /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
  19. /// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
  20. public TrafficTimeout(TimeSpan timeout, Action onTimeout)
  21. {
  22. // Subscribe to a throttled observable to trigger the expirey
  23. var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
  24. IDisposable subscription = null;
  25. subscription = messageQueue.Throttle(timeout).Subscribe(
  26. p =>
  27. {
  28. messageQueue.OnCompleted();
  29. messageQueue.Dispose();
  30. });
  31.  
  32. this.signals = messageQueue.AsObserver();
  33. this.onTimeout = onTimeout;
  34. }
  35.  
  36. /// <summary>
  37. /// Signals that traffic has been received.
  38. /// </summary>
  39. public void Signal()
  40. {
  41. lock (this.signalLock)
  42. {
  43. this.signals.OnNext(Unit.Default);
  44. }
  45. }
  46. }
Add Comment
Please, Sign In to add comment