Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public static class ObservableExtensions
- {
- /// <summary>
- /// Group observable sequence into buffers separated by periods of calm
- /// </summary>
- /// <param name="source">Observable to buffer</param>
- /// <param name="calmDuration">Duration of calm after which to close buffer</param>
- /// <param name="maxCount">Max size to buffer before returning</param>
- /// <param name="maxDuration">Max duration to buffer before returning</param>
- public static IObservable<IList<T>> BufferUntilCalm<T>(this IObservable<T> source, TimeSpan calmDuration, Int32? maxCount=null, TimeSpan? maxDuration = null)
- {
- if (source == null)
- {
- throw new ArgumentNullException("source");
- }
- var closes = source.Throttle(calmDuration);
- if (maxCount != null)
- {
- var overflows = source.Where((x,index) => index+1 >= maxCount);
- closes = closes.Amb(overflows);
- }
- if (maxDuration != null)
- {
- var ages = source.Delay(maxDuration.Value);
- closes = closes.Amb(ages);
- }
- return source.Window(() => closes).SelectMany(window => window.ToList());
- }
- }
Add Comment
Please, Sign In to add comment