Guest User

Untitled

a guest
Sep 21st, 2018
72
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.03 KB | None | 0 0
  public static class ObservableExtensions
  {
      /// <summary>
      /// Groups an observable sequence into buffers separated by periods of calm.
      /// A buffer is closed by whichever of the following signals fires first:
      /// no new element for <paramref name="calmDuration"/>, the buffer reaching
      /// <paramref name="maxCount"/> elements, or <paramref name="maxDuration"/>
      /// elapsing since the first element of the buffer.
      /// </summary>
      /// <typeparam name="T">Type of the elements in the source sequence</typeparam>
      /// <param name="source">Observable to buffer</param>
      /// <param name="calmDuration">Duration of calm after which to close buffer</param>
      /// <param name="maxCount">
      /// Max size to buffer before returning; <c>null</c> disables the limit.
      /// Values below 1 behave like 1 (every element closes the buffer).
      /// </param>
      /// <param name="maxDuration">
      /// Max duration to buffer before returning, measured from the first element
      /// received after the buffer was opened; <c>null</c> disables the limit.
      /// </param>
      /// <returns>An observable sequence of buffers, each delivered as a list</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null</exception>
      public static IObservable<IList<T>> BufferUntilCalm<T>(this IObservable<T> source, TimeSpan calmDuration, Int32? maxCount = null, TimeSpan? maxDuration = null)
      {
          if (source == null)
          {
              throw new ArgumentNullException("source");
          }

          // Share ONE subscription to the source between the windowing operator and
          // every closing signal. Without this, each operator (and every re-evaluation
          // of the closing selector) subscribes to the source separately; for a cold
          // source that re-runs the producer and leaves the closing signals out of
          // step with the elements actually being buffered.
          return source.Publish(shared =>
          {
              // Fires once the source has been quiet for calmDuration.
              var closes = shared.Throttle(calmDuration);

              if (maxCount != null)
              {
                  var limit = maxCount.Value;

                  // The closing selector is re-subscribed for every window, so the
                  // index restarts at zero for each new buffer.
                  var overflows = shared.Where((x, index) => index + 1 >= limit);
                  closes = closes.Amb(overflows);
              }

              if (maxDuration != null)
              {
                  // First element seen by this window, replayed maxDuration later.
                  var ages = shared.Delay(maxDuration.Value);
                  closes = closes.Amb(ages);
              }

              // Amb lets whichever closing signal reacts first end the current window.
              return shared.Window(() => closes).SelectMany(window => window.ToList());
          });
      }
  }
Add Comment
Please, Sign In to add comment