Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // there are a couple of hidden, nasty error conditions
- // there's an assumption that the observable/driver returned from the
- // accumulator returns exactly 1 event (no more, no less)
- // really, it should return a Promise instead of an Observable
- extension SharedSequenceConvertibleType where SharingStrategy == DriverSharingStrategy {
- func scanDriver<A>(_ seed: A,
- accumulator: @escaping (A, Self.E) -> Driver<A>) -> Driver<A> {
- let scanner: Driver<Driver<A>> = self
- .scan(Driver.just(seed),
- accumulator: { (previousDriver: Driver<A>, current: Self.E) -> Driver<A> in
- return previousDriver
- .flatMap { previous in accumulator(previous, current) }
- // this seems a bit awkward -- should shareReplay be
- // available on Driver?
- .asObservable()
- .shareReplay(1)
- .asDriver(onErrorJustReturn: seed)
- })
- return scanner.flatMap { driver in driver }
- }
- func scanDriverSubject<A>(_ seed: A,
- accumulator: @escaping (A, Self.E) -> Driver<A>) -> Driver<A> {
- let outVariable: Variable<A> = Variable(seed)
- let out: Driver<Driver<A>> = self.withLatestFrom(
- outVariable.asDriver(),
- resultSelector: { (next: Self.E, previous: A) -> Driver<A> in
- accumulator(previous, next)
- .do(onNext: { value in outVariable.value = value })
- })
- return out.flatMap { (dr: Driver<A>) -> Driver<A> in dr }
- }
- }
- extension Observable {
- func scanObservable<A>(_ seed: A,
- accumulator: @escaping (A, Element) -> Observable<A>) -> Observable<A> {
- let scanner: Observable<Observable<A>> = self
- .scan(Observable<A>.just(seed),
- accumulator: { (previousObservable: Observable<A>, current: Element) -> Observable<A> in
- return previousObservable
- .flatMap { previous in accumulator(previous, current) }
- .shareReplay(1)
- })
- return scanner.flatMap { obs in obs }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement