Advertisement
Guest User

Untitled

a guest
May 24th, 2015
240
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.25 KB | None | 0 0
  /// <summary>
  /// Rx operators that project every source element into a snapshot of the
  /// most recent elements (a "trailing window"), bounded by count and/or age.
  /// </summary>
  public static class RxExtensions
  {
      /// <summary>
      /// For each source element, emits an array holding that element and the
      /// elements before it, limited to the last <paramref name="count"/> items
      /// (oldest first).
      /// </summary>
      /// <typeparam name="T">Element type of the source sequence.</typeparam>
      /// <param name="observable">Source sequence.</param>
      /// <param name="count">
      /// Maximum number of elements per snapshot. A value of zero or less
      /// yields empty snapshots.
      /// </param>
      /// <returns>A sequence with one snapshot per source element.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
      public static IObservable<IEnumerable<T>> TrailWithCount<T>(this IObservable<T> observable, int count)
      {
          if (observable == null)
          {
              throw new ArgumentNullException("observable");
          }

          // Defer so that every subscription gets its own buffer; a queue captured
          // at call time would be shared (and corrupted) across subscribers.
          return Observable.Defer(() =>
          {
              Queue<T> queue = new Queue<T>();

              return observable.Select(o =>
              {
                  queue.Enqueue(o);

                  // The Count > 0 guard keeps a negative count from dequeuing an empty queue.
                  while (queue.Count > 0 && queue.Count > count)
                  {
                      queue.Dequeue();
                  }

                  IEnumerable<T> snapshot = queue.ToArray();
                  return snapshot;
              });
          });
      }

      /// <summary>
      /// For each source element, emits an array holding that element and the
      /// preceding elements that arrived no longer than
      /// <paramref name="timeSpan"/> before it (oldest first).
      /// </summary>
      /// <typeparam name="T">Element type of the source sequence.</typeparam>
      /// <param name="observable">Source sequence.</param>
      /// <param name="timeSpan">
      /// Maximum age of an element, measured against the arrival time of the
      /// newest element. A negative value yields empty snapshots.
      /// </param>
      /// <returns>A sequence with one snapshot per source element.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
      public static IObservable<IEnumerable<T>> TrailWithTime<T>(this IObservable<T> observable, TimeSpan timeSpan)
      {
          // A Queue can never hold more than int.MaxValue items, so the count
          // limit is effectively disabled and only the age limit applies.
          return observable.TrailWithTimeOrCount(timeSpan, int.MaxValue);
      }

      /// <summary>
      /// For each source element, emits an array holding that element and the
      /// preceding elements, dropping the oldest ones while they are older than
      /// <paramref name="timeSpan"/> or while more than
      /// <paramref name="count"/> elements are buffered (oldest first).
      /// </summary>
      /// <typeparam name="T">Element type of the source sequence.</typeparam>
      /// <param name="observable">Source sequence.</param>
      /// <param name="timeSpan">
      /// Maximum age of an element, measured against the arrival time of the
      /// newest element. A negative value yields empty snapshots.
      /// </param>
      /// <param name="count">
      /// Maximum number of elements per snapshot. A value of zero or less
      /// yields empty snapshots.
      /// </param>
      /// <returns>A sequence with one snapshot per source element.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
      public static IObservable<IEnumerable<T>> TrailWithTimeOrCount<T>(this IObservable<T> observable, TimeSpan timeSpan, int count)
      {
          if (observable == null)
          {
              throw new ArgumentNullException("observable");
          }

          // Defer so that every subscription gets its own buffer; a queue captured
          // at call time would be shared (and corrupted) across subscribers.
          return Observable.Defer(() =>
          {
              Queue<Timestamped<T>> queue = new Queue<Timestamped<T>>();

              return observable.Select(o =>
              {
                  // UTC avoids local-time artefacts (e.g. DST changes) in the age arithmetic.
                  DateTimeOffset now = DateTimeOffset.UtcNow;

                  queue.Enqueue(new Timestamped<T>(o, now));

                  // The Count > 0 guard prevents Peek() from throwing once the queue
                  // has been drained (count <= 0 or a negative timeSpan).
                  while (queue.Count > 0
                         && (now - queue.Peek().Timestamp > timeSpan || queue.Count > count))
                  {
                      queue.Dequeue();
                  }

                  IEnumerable<T> snapshot = queue.Select(v => v.Value).ToArray();
                  return snapshot;
              });
          });
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement