Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class Sink<O : ObserverType> : Disposable {
- private let _observer: O
- private var _disposed: Bool
- init(observer: O) {
- _observer = observer
- _disposed = false
- }
- public final func forwardOn(_ event: Event<O.E>) {
- if self._disposed {
- return
- }
- self._observer.on(event)
- }
- public final var disposed: Bool {
- get {
- return self._disposed
- }
- }
- public func dispose() {
- self._disposed = true
- }
- }
- private class WindowIntervalSink<Element, O: ObserverType> : Sink<O> where O.E == Observable<Element> {
- private var _scheduler: SchedulerType
- private var _lock: NSRecursiveLock = NSRecursiveLock()
- private var _lastItemSendTime: RxTime = RxTime.distantPast
- private var _windowInterval: RxTimeInterval
- private var _currentSequence: PublishSubject<Element>?
- private var _windowId: UInt32 = 0
- private let _parent: WindowInterval<Element>
- public init(parent: WindowInterval<Element>, interval: RxTimeInterval, scheduler: SchedulerType, observer: O) {
- self._parent = parent
- self._scheduler = scheduler
- self._windowInterval = interval
- super.init(observer: observer)
- }
- public func run() -> Disposable {
- return Disposables.create {}
- }
- public func on(_ event: Event<Element>) {
- self._syncOn(event)
- }
- private func _syncOn(_ event: Event<Element>) {
- self._lock.lock()
- defer {
- self._lock.unlock()
- }
- switch event {
- case let .next(element):
- let now = self._scheduler.now
- let timeIntervalSinceLastSend = now.timeIntervalSince(self._lastItemSendTime)
- if timeIntervalSinceLastSend > self._windowInterval {
- if let currentObservable = self._currentSequence {
- currentObservable.onCompleted()
- self._currentSequence = nil
- }
- } else {
- self._windowId += 1
- let newSequence = PublishSubject<Element>()
- self.forwardOn(.next(newSequence))
- self._scheduler.schedule(element) { element in
- let disposable = Disposables.create { }
- guard let sequence = self._currentSequence else {
- return disposable
- }
- sequence.onNext(element)
- return disposable
- }
- }
- case let .error(error):
- if let currentSequence = self._currentSequence {
- currentSequence.onError(error)
- }
- self.forwardOn(.error(error))
- case .completed:
- if let currentSequence = self._currentSequence {
- currentSequence.onCompleted()
- }
- self.forwardOn(.completed)
- }
- }
- }
- public class WindowInterval<Element>: ObservableType {
- public typealias E = Observable<Element>
- private let _timeInterval: RxTimeInterval
- private let _scheduler: SchedulerType
- private let _source: Observable<Element>
- public init(source: Observable<Element>, timeInterval: RxTimeInterval, scheduler: SchedulerType) {
- self._timeInterval = timeInterval
- self._scheduler = scheduler
- self._source = source
- }
- public func subscribe<O>(_ observer: O) -> Disposable where O : ObserverType, WindowInterval.E == O.E {
- return CurrentThreadScheduler.instance.schedule(()) { _ in
- let disposer = SinkDisposer()
- let sinkAndSubscription = self._run(observer, disposer: disposer)
- disposer.setSinkAndSubscription(sinkAndSubscription)
- return disposer
- }
- }
- private func _run<O: ObserverType>(_ observer: O, disposer: Cancelable) -> (sink: Disposable, subsription: Disposable) where O.E == Observable<Element> {
- let sink = WindowIntervalSink(parent: self, interval: self._timeInterval, scheduler: self._scheduler, observer: observer)
- let subscription = sink.run()
- return (sink: sink, subsription: subscription)
- }
- }
- public class SinkDisposer: Cancelable {
- private static let DisposedState: Int32 = 2
- private static let SetState: Int32 = 1
- private var _state: Int32 = 0
- private var _sink: Disposable
- private var _subscription: Disposable
- public var isDisposed: Bool {
- get {
- return AtomicFla`
- }
- }
- public func setSinkAndSubscription(_ sink: ((sink: Disposable, subsription: Disposable))) {
- _sink = sink
- _subscription = subscription
- let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
- if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
- rxFatalError("Sink and subscription were already set")
- }
- if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
- sink.dispose()
- subscription.dispose()
- _sink = nil
- _subscription = nil
- }
- }
- public func dispose() {
- }
- }
- public extension ObservableType {
- public func window(interval: RxTimeInterval, scheduler: SchedulerType) -> Observable<Observable<E>> {
- return WindowInterval(source: self.asObservable(), timeInterval: interval, scheduler: scheduler).asObservable()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement