Advertisement
Guest User

Debounced Buffer in Swift

a guest
Oct 8th, 2016
245
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Swift 5.34 KB | None | 0 0
  1. extension ObservableType {
  2.     func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
  3.         var valueBuffer: [E] = []
  4.  
  5.         return Observable.create { observer in
  6.             let selectorSubscription = selector.subscribe(onNext: { (value) in
  7.                 let emitValues = valueBuffer
  8.                 valueBuffer = []
  9.                 print("Selector emitting values")
  10.                 observer.on(.next(emitValues))
  11.             }, onError: { (error) in
  12.                 print("selector error: \(error)")
  13.                 valueBuffer = []
  14.                 observer.on(.error(error))
  15.             }, onCompleted: {
  16.                 print("selector completed")
  17.                 valueBuffer = []
  18.                 observer.on(.completed)
  19.             }, onDisposed: {
  20.                 print("selector disposed")
  21.                 valueBuffer = []
  22.             })
  23.  
  24.             let subscription = self.subscribe(onNext: { (value) in
  25.                 print("buffer collecting value: \(value)")
  26.                 valueBuffer.append(value)
  27.             }, onError: { (error) in
  28.                 observer.on(.error(error))
  29.                 print("buffer error: \(error)")
  30.                 selectorSubscription.dispose()
  31.             }, onCompleted: {
  32.                 print("buffer completed")
  33.                 observer.on(.completed)
  34.                 selectorSubscription.dispose()
  35.             }, onDisposed: {
  36.                 print("buffer disposed")
  37.                 observer.on(.completed)
  38.                 selectorSubscription.dispose()
  39.             })
  40.             return subscription
  41.         }
  42.     }
  43.  
  44.     func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
  45.         var valueBuffer: [E] = []
  46.  
  47.         let observable = self.do(onNext: { (value) in
  48.             valueBuffer.append(value)
  49.         }, onError: { (error) in
  50.             valueBuffer = []
  51.         }, onCompleted: {
  52.             valueBuffer = []
  53.         }, onSubscribe: {
  54.             valueBuffer = []
  55.         }, onDispose: {
  56.             valueBuffer = []
  57.         }).debounce(dueTime, scheduler: scheduler).flatMap { (value) -> Observable<[E]> in
  58.             let emitValues = valueBuffer
  59.             valueBuffer = []
  60.             return Observable<[E]>.just(emitValues)
  61.         }
  62.  
  63.         return observable
  64.     }
  65. }
  66.  
  67. class MyRxTests {
  68.  
  69.     func testDebouncedBuffer() {
  70.         let values = randomValuesEmitter()
  71.  
  72.         var lastEmissionTime = CACurrentMediaTime()
  73.         let backgroundScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .utility)
  74.  
  75.         let delayedValues = values.debouncedBuffer(1.0, scheduler: backgroundScheduler)
  76.         let bufferSubscription = delayedValues.observeOn(MainScheduler.instance).subscribe(onNext: { (value) in
  77.                 let bufferTime = CACurrentMediaTime() - lastEmissionTime
  78.                 lastEmissionTime = CACurrentMediaTime()
  79.                 print("onNext: \(value) (buffer time: \(bufferTime))")
  80.             }, onError: { (error) in
  81.                 print("onError: \(error)")
  82.             }, onCompleted: {
  83.                 print("onCompleted")
  84.             }, onDisposed: {
  85.                 print("onDisposed")
  86.         })
  87.  
  88.         DispatchQueue.main.asyncAfter(deadline: .now() + 5.0) {
  89.             bufferSubscription.dispose()
  90.         }
  91.     }
  92.  
  93.     func testClosingBuffer() {
  94.         let values = randomValuesEmitter()
  95.  
  96.         var lastEmissionTime = CACurrentMediaTime()
  97.         let backgroundScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .utility)
  98.  
  99.         let valuesMulticast = values.publish().refCount()
  100.         let valuesDebounced = valuesMulticast.debounce(1.0, scheduler: backgroundScheduler)
  101.         let delayedValues = valuesMulticast.buffer(valuesDebounced)
  102.  
  103.         let bufferSubscription = delayedValues.observeOn(MainScheduler.instance).subscribe(onNext: { (value) in
  104.             let bufferTime = CACurrentMediaTime() - lastEmissionTime
  105.             lastEmissionTime = CACurrentMediaTime()
  106.             print("onNext: \(value) (buffer time: \(bufferTime))")
  107.             }, onError: { (error) in
  108.                 print("onError: \(error)")
  109.             }, onCompleted: {
  110.                 print("onCompleted")
  111.             }, onDisposed: {
  112.                 print("onDisposed")
  113.         })
  114.  
  115.         DispatchQueue.main.asyncAfter(deadline: .now() + 5.0) {
  116.             bufferSubscription.dispose()
  117.         }
  118.     }
  119.  
  120.     func randomValuesEmitter() -> Observable<Int> {
  121.         let backgroundScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .background)
  122.         var lastEmissionTime = CACurrentMediaTime()
  123.         let randomValues = Observable<Int>.interval(0.1, scheduler: backgroundScheduler)
  124.             .flatMap { (value) -> Observable<Int> in
  125.                 let shouldPost = arc4random() % 5 == 0 && value < 200
  126.                 if shouldPost {
  127.                     let elapsedTime = CACurrentMediaTime() - lastEmissionTime
  128.                     lastEmissionTime = CACurrentMediaTime()
  129.                     print("emitting \(value)  (since last emission: \(elapsedTime))")
  130.                 }
  131.                 return shouldPost == true ? Observable<Int>.just(value) : Observable<Int>.empty()
  132.         }
  133.         return randomValues
  134.     }
  135.  
  136. }
  137.  
  138. let tests = MyRxTests()
  139.         tests.testClosingBuffer()
  140. //        tests.testDebouncedBuffer()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement