Advertisement
Guest User

Untitled

a guest
Jun 22nd, 2018
352
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.86 KB | None | 0 0
  /// Base class for operator sinks: wraps a downstream observer and stops
  /// forwarding events to it once `dispose()` has been called.
  class Sink<O: ObserverType>: Disposable {
      /// The downstream observer that receives forwarded events.
      private let _observer: O
      /// Set by `dispose()`; while `true`, `forwardOn(_:)` drops every event.
      private var _disposed: Bool = false

      /// Creates a sink that forwards to `observer`.
      init(observer: O) {
          _observer = observer
      }

      /// Delivers `event` to the wrapped observer unless this sink has been disposed.
      public final func forwardOn(_ event: Event<O.E>) {
          guard !_disposed else { return }
          _observer.on(event)
      }

      /// `true` once `dispose()` has been called.
      public final var disposed: Bool {
          return _disposed
      }

      /// Marks the sink as disposed so later events are no longer forwarded.
      public func dispose() {
          _disposed = true
      }
  }
  27.  
  /// Sink for `WindowInterval`: receives elements from the source sequence and
  /// regroups them into inner `PublishSubject` windows that are forwarded to the
  /// downstream observer.
  ///
  /// Windowing rule (interpretation of `_lastItemSendTime` / `_windowInterval`;
  /// TODO confirm with the author): an element joins the currently open window
  /// when it arrives no later than `_windowInterval` after the previous element;
  /// otherwise the open window is completed and a new one is started.
  private final class WindowIntervalSink<Element, O: ObserverType>: Sink<O>, ObserverType where O.E == Observable<Element> {
      typealias E = Element

      private let _scheduler: SchedulerType
      private let _lock = NSRecursiveLock()
      private let _windowInterval: RxTimeInterval
      private let _parent: WindowInterval<Element>

      /// Scheduler time at which the previous element was received.
      private var _lastItemSendTime: RxTime = RxTime.distantPast
      /// The window currently receiving elements, or `nil` when none is open.
      private var _currentSequence: PublishSubject<Element>?

      init(parent: WindowInterval<Element>, interval: RxTimeInterval, scheduler: SchedulerType, observer: O) {
          _parent = parent
          _scheduler = scheduler
          _windowInterval = interval
          super.init(observer: observer)
      }

      /// Subscribes this sink to the parent's source sequence.
      ///
      /// - Returns: The disposable of the source subscription.
      func run() -> Disposable {
          // Previously this returned an empty disposable, so the source was never
          // subscribed and `on(_:)` was never called.
          return _parent._source.subscribe(self)
      }

      /// Entry point for source events; all handling happens under `_lock`.
      func on(_ event: Event<Element>) {
          _syncOn(event)
      }

      private func _syncOn(_ event: Event<Element>) {
          _lock.lock()
          defer { _lock.unlock() }

          switch event {
          case .next(let element):
              let now = _scheduler.now
              let gap = now.timeIntervalSince(_lastItemSendTime)
              // Record the arrival time so the next element is measured against
              // this one (it was never updated before, so the gap was always
              // measured from `distantPast`).
              _lastItemSendTime = now

              // Gap exceeded: close the open window so a new one starts below.
              if gap > _windowInterval, let expired = _currentSequence {
                  expired.onCompleted()
                  _currentSequence = nil
              }

              let window: PublishSubject<Element>
              if let open = _currentSequence {
                  window = open
              } else {
                  window = PublishSubject<Element>()
                  // Store the window BEFORE emitting into it; the original never
                  // assigned it, so every element was dropped.
                  _currentSequence = window
                  forwardOn(.next(window))
              }

              // Emit synchronously, right after the window has been handed
              // downstream, instead of via a scheduled action that read
              // `_currentSequence` outside the lock.
              window.onNext(element)

          case .error(let error):
              _currentSequence?.onError(error)
              _currentSequence = nil
              forwardOn(.error(error))
              // Nothing may follow a terminal event.
              dispose()

          case .completed:
              _currentSequence?.onCompleted()
              _currentSequence = nil
              forwardOn(.completed)
              // Nothing may follow a terminal event.
              dispose()
          }
      }
  }

  /// Observable that re-emits the elements of `source` grouped into inner
  /// observables ("windows"); the grouping is performed by `WindowIntervalSink`.
  public class WindowInterval<Element>: ObservableType {
      public typealias E = Observable<Element>

      private let _timeInterval: RxTimeInterval
      private let _scheduler: SchedulerType
      // `fileprivate` so `WindowIntervalSink.run()` can subscribe to it.
      fileprivate let _source: Observable<Element>

      /// - Parameters:
      ///   - source: Sequence whose elements are split into windows.
      ///   - timeInterval: Interval handed to the sink as its window interval.
      ///   - scheduler: Scheduler whose `now` the sink uses as its clock.
      public init(source: Observable<Element>, timeInterval: RxTimeInterval, scheduler: SchedulerType) {
          _timeInterval = timeInterval
          _scheduler = scheduler
          _source = source
      }

      /// Creates a sink for `observer`, subscribes it to the source and returns a
      /// `SinkDisposer` owning both. The work is scheduled on
      /// `CurrentThreadScheduler.instance`.
      public func subscribe<O>(_ observer: O) -> Disposable where O : ObserverType, WindowInterval.E == O.E {
          return CurrentThreadScheduler.instance.schedule(()) { _ in
              let disposer = SinkDisposer()
              let sinkAndSubscription = self._run(observer, disposer: disposer)
              disposer.setSinkAndSubscription(sinkAndSubscription)
              return disposer
          }
      }

      /// Builds the sink and starts it.
      ///
      /// The tuple label `subsription` (sic) is kept because it must match the
      /// parameter type of `SinkDisposer.setSinkAndSubscription(_:)`.
      private func _run<O: ObserverType>(_ observer: O, disposer: Cancelable) -> (sink: Disposable, subsription: Disposable) where O.E == Observable<Element> {
          let sink = WindowIntervalSink(parent: self, interval: _timeInterval, scheduler: _scheduler, observer: observer)
          let subscription = sink.run()
          return (sink: sink, subsription: subscription)
      }
  }
  136.  
  /// Owns a sink and its source subscription and disposes both exactly once.
  ///
  /// `dispose()` may be called before or after `setSinkAndSubscription(_:)`:
  /// if disposal comes first, the pair is disposed as soon as it is handed over.
  ///
  /// State is guarded by an `NSRecursiveLock` rather than the `AtomicOr` /
  /// `DisposeState` helpers the earlier draft referenced, because those names
  /// are not defined in this file.
  public class SinkDisposer: Cancelable {

      private let _lock = NSRecursiveLock()
      /// `true` once `dispose()` has been called.
      private var _isDisposed = false
      /// `true` once `setSinkAndSubscription(_:)` has been called.
      private var _isSet = false
      private var _sink: Disposable?
      private var _subscription: Disposable?

      /// `true` once `dispose()` has been called.
      public var isDisposed: Bool {
          _lock.lock()
          defer { _lock.unlock() }
          return _isDisposed
      }

      /// Hands over the sink and its subscription. Must be called at most once;
      /// a second call is a programmer error and traps.
      ///
      /// - Parameter sink: Tuple of the sink and the source subscription. The
      ///   label `subsription` (sic) is kept so existing callers still type-check.
      public func setSinkAndSubscription(_ sink: (sink: Disposable, subsription: Disposable)) {
          _lock.lock()
          if _isSet {
              _lock.unlock()
              fatalError("Sink and subscription were already set")
          }
          _isSet = true
          let alreadyDisposed = _isDisposed
          if !alreadyDisposed {
              _sink = sink.sink
              _subscription = sink.subsription
          }
          _lock.unlock()

          // Disposed before the pair arrived: tear it down immediately, outside
          // the lock so user dispose code never runs while the lock is held.
          if alreadyDisposed {
              sink.sink.dispose()
              sink.subsription.dispose()
          }
      }

      /// Disposes the stored sink and subscription (if already set) and marks
      /// this disposer as disposed. Subsequent calls do nothing.
      public func dispose() {
          _lock.lock()
          if _isDisposed {
              _lock.unlock()
              return
          }
          _isDisposed = true
          let sink = _sink
          let subscription = _subscription
          _sink = nil
          _subscription = nil
          _lock.unlock()

          sink?.dispose()
          subscription?.dispose()
      }
  }
  173.  
  public extension ObservableType {
      /// Wraps this sequence in a `WindowInterval` operator and exposes it as an
      /// `Observable` of inner observables. How elements are grouped is defined
      /// by `WindowInterval`.
      ///
      /// - Parameters:
      ///   - interval: Passed to `WindowInterval` as its `timeInterval`.
      ///   - scheduler: Passed to `WindowInterval` as its scheduler.
      /// - Returns: The `WindowInterval` operator as an `Observable`.
      public func window(interval: RxTimeInterval, scheduler: SchedulerType) -> Observable<Observable<E>> {
          let source = asObservable()
          let windowed = WindowInterval(source: source, timeInterval: interval, scheduler: scheduler)
          return windowed.asObservable()
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement