Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
extension ObservableType {

    /// Collects the elements of this sequence and emits them as one array every
    /// time `selector` produces an element.
    ///
    /// Each subscription owns its own buffer, so independent subscribers never
    /// see (or clear) each other's pending elements. The buffer is guarded by a
    /// lock because the source and the selector may deliver on different threads.
    ///
    /// An error or completion from either the source or the selector is
    /// forwarded to the observer; elements still pending at that point are
    /// discarded.
    ///
    /// - Parameter selector: Sequence whose elements mark the buffer boundaries.
    ///   The element values themselves are ignored.
    /// - Returns: A sequence of arrays holding the elements collected since the
    ///   previous boundary (possibly empty).
    func buffer<R>(_ selector: Observable<R>) -> Observable<[E]> {
        return Observable<[E]>.create { observer in
            // State is created inside `create` so that it is per-subscription.
            let lock = NSLock()
            var pending: [E] = []

            // Atomically takes everything collected so far and resets the buffer.
            let drain: () -> [E] = {
                lock.lock(); defer { lock.unlock() }
                let snapshot = pending
                pending = []
                return snapshot
            }

            let selectorSubscription = selector.subscribe(onNext: { _ in
                // Snapshot under the lock, emit outside of it.
                let emitValues = drain()
                print("Selector emitting values")
                observer.on(.next(emitValues))
            }, onError: { error in
                print("selector error: \(error)")
                observer.on(.error(error))
            }, onCompleted: {
                print("selector completed")
                observer.on(.completed)
            }, onDisposed: {
                print("selector disposed")
            })

            let subscription = self.subscribe(onNext: { value in
                print("buffer collecting value: \(value)")
                lock.lock()
                pending.append(value)
                lock.unlock()
            }, onError: { error in
                print("buffer error: \(error)")
                observer.on(.error(error))
            }, onCompleted: {
                print("buffer completed")
                observer.on(.completed)
            }, onDisposed: {
                // Disposal is not a terminal event, so nothing is sent to the
                // observer here.
                print("buffer disposed")
            })

            // Tie both subscriptions to the lifetime of the resulting one so a
            // terminal event or an explicit dispose tears down both.
            return Disposables.create(subscription, selectorSubscription)
        }
    }

    /// Collects the elements of this sequence and emits them as one array once
    /// the source has been silent for `dueTime`.
    ///
    /// The buffer is created per subscription (via `deferred`) and guarded by a
    /// lock, since elements are appended on the source's thread while the array
    /// is drained on `scheduler`. The buffer is not cleared on completion, so
    /// whatever `debounce` emits at that point carries the elements still
    /// pending rather than an empty array.
    ///
    /// - Parameters:
    ///   - dueTime: Quiet period that must elapse before a batch is emitted.
    ///   - scheduler: Scheduler used by the underlying `debounce`.
    /// - Returns: A sequence of arrays with the elements received since the
    ///   previous emission.
    func debouncedBuffer(_ dueTime: RxTimeInterval, scheduler: SchedulerType) -> Observable<[E]> {
        return Observable<[E]>.deferred { () -> Observable<[E]> in
            let lock = NSLock()
            var pending: [E] = []

            return self
                .do(onNext: { value in
                    lock.lock()
                    pending.append(value)
                    lock.unlock()
                })
                .debounce(dueTime, scheduler: scheduler)
                .map { _ -> [E] in
                    // The debounced element is only a trigger; the payload is
                    // everything collected since the last emission.
                    lock.lock(); defer { lock.unlock() }
                    let emitValues = pending
                    pending = []
                    return emitValues
                }
        }
    }
}
/// Manual, console-driven demos for the buffering operators. Each demo runs
/// for five seconds and logs every batch it receives.
class MyRxTests {

    /// Demo of `debouncedBuffer`: batches random values once the source has
    /// been quiet for one second.
    func testDebouncedBuffer() {
        let source = randomValuesEmitter()
        let startTime = CACurrentMediaTime()
        let debounceScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .utility)
        let batches = source.debouncedBuffer(1.0, scheduler: debounceScheduler)
        logBatchesForFiveSeconds(batches, since: startTime)
    }

    /// Demo of `buffer(_:)`: the shared source is buffered and the buffer is
    /// closed by a debounced view of that same source.
    func testClosingBuffer() {
        let source = randomValuesEmitter()
        let startTime = CACurrentMediaTime()
        let debounceScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .utility)
        // Share one upstream subscription between the buffer and its selector.
        let shared = source.publish().refCount()
        let closings = shared.debounce(1.0, scheduler: debounceScheduler)
        let batches = shared.buffer(closings)
        logBatchesForFiveSeconds(batches, since: startTime)
    }

    /// Emits the tick index of a 0.1 s interval for roughly one tick in five,
    /// and only while the index is below 200. Logs each emitted value together
    /// with the time elapsed since the previous one.
    func randomValuesEmitter() -> Observable<Int> {
        let tickScheduler = ConcurrentDispatchQueueScheduler(globalConcurrentQueueQOS: .background)
        var previousEmission = CACurrentMediaTime()
        return Observable<Int>
            .interval(0.1, scheduler: tickScheduler)
            .flatMap { (value) -> Observable<Int> in
                guard arc4random() % 5 == 0 && value < 200 else {
                    return Observable<Int>.empty()
                }
                let elapsedTime = CACurrentMediaTime() - previousEmission
                previousEmission = CACurrentMediaTime()
                print("emitting \(value) (since last emission: \(elapsedTime))")
                return Observable<Int>.just(value)
            }
    }

    /// Subscribes to `batches` on the main scheduler, logs every event with the
    /// time since the previous batch, and disposes the subscription after five
    /// seconds.
    private func logBatchesForFiveSeconds(_ batches: Observable<[Int]>, since startTime: Double) {
        var previousBatch = startTime
        let subscription = batches
            .observeOn(MainScheduler.instance)
            .subscribe(onNext: { (value) in
                let bufferTime = CACurrentMediaTime() - previousBatch
                previousBatch = CACurrentMediaTime()
                print("onNext: \(value) (buffer time: \(bufferTime))")
            }, onError: { (error) in
                print("onError: \(error)")
            }, onCompleted: {
                print("onCompleted")
            }, onDisposed: {
                print("onDisposed")
            })

        DispatchQueue.main.asyncAfter(deadline: .now() + 5.0) {
            subscription.dispose()
        }
    }
}
// Entry point: run the closing-selector buffer demo.
// Swap the commented call in to run the debounced variant instead.
let tests = MyRxTests()
tests.testClosingBuffer()
// tests.testDebouncedBuffer()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement