Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Reactive;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- /// <summary>
- /// Provides a timeout mechanism that will not timeout if it is signalled often enough
- /// </summary>
- internal class TrafficTimeout
- {
- private readonly Action onTimeout;
- private object signalLock = new object();
- private IObserver<Unit> signals;
- /// <summary>
- /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
- /// </summary>
- /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
- /// <param name="onTimeout">The <see cref="Action"/> to perform when the the timeout duration expires.</param>
- public TrafficTimeout(TimeSpan timeout, Action onTimeout)
- {
- // Subscribe to a throttled observable to trigger the expirey
- var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
- IDisposable subscription = null;
- subscription = messageQueue.Throttle(timeout).Subscribe(
- p =>
- {
- messageQueue.OnCompleted();
- messageQueue.Dispose();
- });
- this.signals = messageQueue.AsObserver();
- this.onTimeout = onTimeout;
- }
- /// <summary>
- /// Signals that traffic has been received.
- /// </summary>
- public void Signal()
- {
- lock (this.signalLock)
- {
- this.signals.OnNext(Unit.Default);
- }
- }
- }
Add Comment
Please, Sign In to add comment