Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/// <summary>
/// Extension methods that turn an observable sequence into a sequence of
/// "trailing window" snapshots: for every source element, the most recent
/// elements (bounded by count and/or age) are emitted as an array.
/// </summary>
public static class RxExtensions
{
    /// <summary>
    /// For each source element, emits a snapshot of the last
    /// <paramref name="count"/> elements (oldest first, newest last).
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="observable">Source sequence.</param>
    /// <param name="count">
    /// Maximum number of elements per snapshot. A value of zero or less
    /// produces empty snapshots.
    /// </param>
    /// <returns>
    /// A sequence that emits one freshly allocated array per source element.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> is null.
    /// </exception>
    public static IObservable<IEnumerable<T>> TrailWithCount<T>(this IObservable<T> observable, int count)
    {
        if (observable == null)
        {
            throw new ArgumentNullException("observable");
        }

        // Defer so that every subscription gets its own queue; a queue
        // created out here would be shared (and corrupted) by concurrent or
        // repeated subscriptions to the returned observable.
        return Observable.Defer<IEnumerable<T>>(() =>
        {
            Queue<T> window = new Queue<T>();
            return observable.Select<T, IEnumerable<T>>(item =>
            {
                window.Enqueue(item);

                // The Count > 0 guard keeps a negative count from calling
                // Dequeue on an empty queue.
                while (window.Count > 0 && window.Count > count)
                {
                    window.Dequeue();
                }

                // Copy so that subscribers never observe later mutations.
                return window.ToArray();
            });
        });
    }

    /// <summary>
    /// For each source element, emits a snapshot of the elements that arrived
    /// within the last <paramref name="timeSpan"/> (oldest first, newest last).
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="observable">Source sequence.</param>
    /// <param name="timeSpan">
    /// Maximum age of an element, measured at the moment the newest element
    /// arrives. A negative value produces empty snapshots.
    /// </param>
    /// <returns>
    /// A sequence that emits one freshly allocated array per source element.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> is null.
    /// </exception>
    public static IObservable<IEnumerable<T>> TrailWithTime<T>(this IObservable<T> observable, TimeSpan timeSpan)
    {
        // Queue<T>.Count can never exceed int.MaxValue, so the count limit
        // is effectively disabled and only the age limit applies.
        return observable.TrailWithTimeOrCount(timeSpan, int.MaxValue);
    }

    /// <summary>
    /// For each source element, emits a snapshot of the most recent elements,
    /// dropping any element that is older than <paramref name="timeSpan"/> or
    /// that would make the snapshot longer than <paramref name="count"/>.
    /// </summary>
    /// <typeparam name="T">Element type of the source sequence.</typeparam>
    /// <param name="observable">Source sequence.</param>
    /// <param name="timeSpan">
    /// Maximum age of an element, measured at the moment the newest element
    /// arrives. A negative value produces empty snapshots.
    /// </param>
    /// <param name="count">
    /// Maximum number of elements per snapshot. A value of zero or less
    /// produces empty snapshots.
    /// </param>
    /// <returns>
    /// A sequence that emits one freshly allocated array per source element.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="observable"/> is null.
    /// </exception>
    public static IObservable<IEnumerable<T>> TrailWithTimeOrCount<T>(this IObservable<T> observable, TimeSpan timeSpan, int count)
    {
        if (observable == null)
        {
            throw new ArgumentNullException("observable");
        }

        // Defer so that every subscription gets its own queue (see
        // TrailWithCount for the rationale).
        return Observable.Defer<IEnumerable<T>>(() =>
        {
            Queue<Timestamped<T>> window = new Queue<Timestamped<T>>();
            return observable.Select<T, IEnumerable<T>>(item =>
            {
                // UTC avoids local-time artefacts (e.g. DST transitions) in
                // the age arithmetic below.
                DateTimeOffset now = DateTimeOffset.UtcNow;
                window.Enqueue(new Timestamped<T>(item, now));

                // Count > 0 must be checked first: with count <= 0 or a
                // negative timeSpan the queue is drained completely, and
                // Peek on an empty queue throws InvalidOperationException.
                while (window.Count > 0
                       && (now - window.Peek().Timestamp > timeSpan || window.Count > count))
                {
                    window.Dequeue();
                }

                return window.Select(entry => entry.Value).ToArray();
            });
        });
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement