Advertisement
Guest User

Untitled

a guest
May 23rd, 2019
3,485
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 278.91 KB | None | 0 0
  1. /**
  2. * Copyright 2013 Netflix, Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16.  
  17. package rx.lang.scala
  18.  
  19. import rx.annotations.{Beta, Experimental}
  20. import rx.functions.FuncN
  21. import rx.lang.scala.observables.{AsyncOnSubscribe, ConnectableObservable, ErrorDelayingObservable, SyncOnSubscribe}
  22.  
  23. import scala.concurrent.duration
  24.  
  25. import scala.collection.generic.CanBuildFrom
  26. import scala.annotation.unchecked.uncheckedVariance
  27. import scala.collection.{Iterable, Traversable, immutable}
  28. import scala.collection.mutable.ArrayBuffer
  29. import scala.language.higherKinds
  30. import scala.reflect.ClassTag
  31. import scala.util.Try
  32.  
  33.  
  34. /**
  35. * The Observable interface that implements the Reactive Pattern.
  36. *
  37. * @define subscribeObserverMain
  38. * Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
  39. * items and notifications from the Observable.
  40. *
  41. * A typical implementation of `subscribe` does the following:
  42. *
  43. * It stores a reference to the Observer in a collection object, such as a `List[T]` object.
  44. *
  45. * It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables Observers to
  46. * unsubscribe, that is, to stop receiving items and notifications before the Observable stops
  47. * sending them, which also invokes the Observer's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
  48. *
  49. * An `Observable[T]` instance is responsible for accepting all subscriptions
  50. * and notifying all Observers. Unless the documentation for a particular
  51. * `Observable[T]` implementation indicates otherwise, Observers should make no
  52. * assumptions about the order in which multiple Observers will receive their notifications.
  53. *
  54. * @define subscribeObserverParamObserver
  55. * the observer
  56. * @define subscribeObserverParamScheduler
  57. * the [[rx.lang.scala.Scheduler]] on which Observers subscribe to the Observable
  58. *
  59. * @define subscribeSubscriberMain
  60. * Call this method to subscribe an [[Subscriber]] for receiving items and notifications from the [[Observable]].
  61. *
  62. * A typical implementation of `subscribe` does the following:
  63. *
  64. * It stores a reference to the Observer in a collection object, such as a `List[T]` object.
  65. *
  66. * It returns a reference to the [[rx.lang.scala.Subscription]] interface. This enables [[Subscriber]]s to
  67. * unsubscribe, that is, to stop receiving items and notifications before the Observable stops
  68. * sending them, which also invokes the Subscriber's [[rx.lang.scala.Observer.onCompleted onCompleted]] method.
  69. *
  70. * An [[Observable]] instance is responsible for accepting all subscriptions
  71. * and notifying all [[Subscriber]]s. Unless the documentation for a particular
  72. * [[Observable]] implementation indicates otherwise, [[Subscriber]]s should make no
  73. * assumptions about the order in which multiple [[Subscriber]]s will receive their notifications.
  74. *
  75. * @define subscribeSubscriberParamObserver
  76. * the [[Subscriber]]
  77. * @define subscribeSubscriberParamScheduler
  78. * the [[rx.lang.scala.Scheduler]] on which [[Subscriber]]s subscribe to the Observable
  79. *
  80. * @define subscribeAllReturn
  81. * a [[rx.lang.scala.Subscription]] reference whose `unsubscribe` method can be called to stop receiving items
  82. * before the Observable has finished sending them
  83. *
  84. * @define subscribeCallbacksMainWithNotifications
  85. * Call this method to receive items and notifications from this observable.
  86. *
  87. * @define subscribeCallbacksMainNoNotifications
  88. * Call this method to receive items from this observable.
  89. *
  90. * @define subscribeCallbacksParamOnNext
  91. * this function will be called whenever the Observable emits an item
  92. * @define subscribeCallbacksParamOnError
  93. * this function will be called if an error occurs
  94. * @define subscribeCallbacksParamOnComplete
  95. * this function will be called when this Observable has finished emitting items
  96. * @define subscribeCallbacksParamScheduler
  97. * the scheduler to use
  98. *
  99. * @define noDefaultScheduler
  100. * ===Scheduler:===
  101. * This method does not operate by default on a particular [[Scheduler]].
  102. *
  103. * @define supportBackpressure
  104. * ===Backpressure:===
  105. * Fully supports backpressure.
  106. *
  107. * @define debounceVsThrottle
  108. * Information on debounce vs throttle:
  109. * - [[http://drupalmotion.com/article/debounce-and-throttle-visual-explanation]]
  110. * - [[http://unscriptable.com/2009/03/20/debouncing-javascript-methods/]]
  111. * - [[http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/]]
  112. *
  113. * @define experimental
  114. * <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
  115. *
  116. * @define beta
  117. * <span class="badge badge-red" style="float: right;">BETA</span>
  118. *
  119. */
  120. trait Observable[+T]
  121. {
  122. import scala.collection.JavaConverters._
  123. import scala.collection.Seq
  124. import scala.concurrent.duration.{Duration, TimeUnit, MILLISECONDS}
  125. import scala.collection.mutable
  126. import rx.functions._
  127. import rx.lang.scala.observables.BlockingObservable
  128. import ImplicitFunctionConversions._
  129. import JavaConversions._
  130.  
  131. private [scala] val asJavaObservable: rx.Observable[_ <: T]
  132.  
  133. /**
  134. * Subscribes to an [[Observable]] but ignore its emissions and notifications.
  135. *
  136. * $noDefaultScheduler
  137. *
  138. * @return $subscribeAllReturn
  139. * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
  140. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  141. */
  142. def subscribe(): Subscription = {
  143. asJavaObservable.subscribe()
  144. }
  145.  
  146. /**
  147. * $subscribeObserverMain
  148. *
  149. * $noDefaultScheduler
  150. *
  151. * @param observer $subscribeObserverParamObserver
  152. * @return $subscribeAllReturn
  153. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  154. */
  155. def subscribe(observer: Observer[T]): Subscription = {
  156. asJavaObservable.subscribe(observer.asJavaObserver)
  157. }
  158.  
  159. /**
  160. * $subscribeObserverMain
  161. *
  162. * $noDefaultScheduler
  163. *
  164. * @param observer $subscribeObserverParamObserver
  165. * @return $subscribeAllReturn
  166. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  167. */
  168. def apply(observer: Observer[T]): Subscription = subscribe(observer)
  169.  
  170. /**
  171. * $subscribeSubscriberMain
  172. *
  173. * $noDefaultScheduler
  174. *
  175. * @param subscriber $subscribeSubscriberParamObserver
  176. * @return $subscribeAllReturn
  177. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  178. */
  179. def subscribe(subscriber: Subscriber[T]): Subscription = {
  180. // Add the casting to avoid compile error "ambiguous reference to overloaded definition"
  181. val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
  182. thisJava.subscribe(subscriber.asJavaSubscriber)
  183. }
  184.  
  185. /**
  186. * Subscribe to Observable and invoke `OnSubscribe` function without any
  187. * contract protection, error handling, unsubscribe, or execution hooks.
  188. *
  189. * This should only be used for implementing an `Operator` that requires nested subscriptions.
  190. *
  191. * Normal use should use `Observable.subscribe` which ensures the Rx contract and other functionality.
  192. *
  193. * @param subscriber
  194. * @return [[Subscription]] which is the Subscriber passed in
  195. * @since 0.17
  196. */
  197. def unsafeSubscribe(subscriber: Subscriber[T]): Subscription = {
  198. asJavaObservable.unsafeSubscribe(subscriber.asJavaSubscriber)
  199. }
  200.  
  201. /**
  202. * $subscribeSubscriberMain
  203. *
  204. * $noDefaultScheduler
  205. *
  206. * @param subscriber $subscribeSubscriberParamObserver
  207. * @return $subscribeAllReturn
  208. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  209. */
  210. def apply(subscriber: Subscriber[T]): Subscription = subscribe(subscriber)
  211.  
  212. /**
  213. * $subscribeCallbacksMainNoNotifications
  214. *
  215. * $noDefaultScheduler
  216. *
  217. * @param onNext $subscribeCallbacksParamOnNext
  218. * @return $subscribeAllReturn
  219. * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
  220. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  221. */
  222. def subscribe(onNext: T => Unit): Subscription = {
  223. asJavaObservable.subscribe(scalaFunction1ProducingUnitToAction1(onNext))
  224. }
  225.  
  226. /**
  227. * $subscribeCallbacksMainWithNotifications
  228. *
  229. * $noDefaultScheduler
  230. *
  231. * @param onNext $subscribeCallbacksParamOnNext
  232. * @param onError $subscribeCallbacksParamOnError
  233. * @return $subscribeAllReturn
  234. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  235. */
  236. def subscribe(onNext: T => Unit, onError: Throwable => Unit): Subscription = {
  237. asJavaObservable.subscribe(
  238. scalaFunction1ProducingUnitToAction1(onNext),
  239. scalaFunction1ProducingUnitToAction1(onError)
  240. )
  241. }
  242.  
  243. /**
  244. * $subscribeCallbacksMainWithNotifications
  245. *
  246. * $noDefaultScheduler
  247. *
  248. * @param onNext $subscribeCallbacksParamOnNext
  249. * @param onError $subscribeCallbacksParamOnError
  250. * @param onCompleted $subscribeCallbacksParamOnComplete
  251. * @return $subscribeAllReturn
  252. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  253. */
  254. def subscribe(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscription = {
  255. asJavaObservable.subscribe(
  256. scalaFunction1ProducingUnitToAction1(onNext),
  257. scalaFunction1ProducingUnitToAction1(onError),
  258. scalaFunction0ProducingUnitToAction0(onCompleted)
  259. )
  260. }
  261.  
  262. /**
  263. * Returns an Observable that first emits the items emitted by `this`, and then `elem`.
  264. *
  265. * @param elem the item to be appended
  266. * @return an Observable that first emits the items emitted by `this`, and then `elem`.
  267. */
  268. def :+[U >: T](elem: U): Observable[U] = {
  269. this ++ Observable.just(elem)
  270. }
  271.  
  272. /**
  273. * Returns an Observable that first emits the items emitted by `this`, and then the items emitted
  274. * by `that`.
  275. *
  276. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="" />
  277. *
  278. * @param that
  279. * an Observable to be appended
  280. * @return an Observable that emits items that are the result of combining the items emitted by
  281. * this and that, one after the other
  282. */
  283. def ++[U >: T](that: Observable[U]): Observable[U] = {
  284. val o1: rx.Observable[_ <: U] = this.asJavaObservable
  285. val o2: rx.Observable[_ <: U] = that.asJavaObservable
  286. toScalaObservable(rx.Observable.concat(o1, o2))
  287. }
  288.  
  289. /**
  290. * Returns an Observable that emits a specified item before it begins to emit items emitted by the source Observable.
  291. * <p>
  292. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/startWith.png" alt="" />
  293. *
  294. * @param elem the item to emit
  295. * @return an Observable that emits the specified item before it begins to emit items emitted by the source Observable
  296. */
  297. def +:[U >: T](elem: U): Observable[U] = {
  298. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
  299. toScalaObservable(thisJava.startWith(elem))
  300. }
  301.  
  302. /**
  303. * Returns an Observable that emits the items emitted by several Observables, one after the
  304. * other.
  305. *
  306. * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
  307. * otherwise you'll get a compilation error.
  308. *
  309. * @usecase def concat[U]: Observable[U]
  310. */
  311. def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  312. val o2: Observable[Observable[U]] = this
  313. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  314. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  315. val o5 = rx.Observable.concat[U](o4)
  316. toScalaObservable[U](o5)
  317. }
  318.  
  319. /**
  320. * Returns a new Observable that emits items resulting from applying a function that you supply to each item
  321. * emitted by the source Observable, where that function returns an Observable, and then emitting the items
  322. * that result from concatinating those resulting Observables.
  323. *
  324. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="" />
  325. *
  326. * @param f a function that, when applied to an item emitted by the source Observable, returns an Observable
  327. * @return an Observable that emits the result of applying the transformation function to each item emitted
  328. * by the source Observable and concatinating the Observables obtained from this transformation
  329. */
  330. def concatMap[R](f: T => Observable[R]): Observable[R] = {
  331. toScalaObservable[R](asJavaObservable.concatMap[R](new Func1[T, rx.Observable[_ <: R]] {
  332. def call(t1: T): rx.Observable[_ <: R] = {
  333. f(t1).asJavaObservable
  334. }
  335. }))
  336. }
  337.  
  338. /**
  339. * $experimental Concatenates `this` and `that` source [[Observable]]s eagerly into a single stream of values.
  340. *
  341. * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
  342. * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them
  343. * in order, each one after the previous one completes.
  344. *
  345. * ===Backpressure:===
  346. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  347. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  348. *
  349. * $noDefaultScheduler
  350. *
  351. * @param that the source to concat with.
  352. * @return an [[Observable]] that emits items all of the items emitted by `this` and `that`, one after the other,
  353. * without interleaving them.
  354. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  355. */
  356. @Experimental
  357. def concatEager[U >: T](that: Observable[U]): Observable[U] = {
  358. val o1: rx.Observable[_ <: U] = this.asJavaObservable
  359. val o2: rx.Observable[_ <: U] = that.asJavaObservable
  360. toScalaObservable(rx.Observable.concatEager(o1, o2))
  361. }
  362.  
  363. /**
  364. * $experimental Concatenates an [[Observable]] sequence of [[Observable]]s eagerly into a single stream of values.
  365. *
  366. * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
  367. * emitted source [[Observable]]s as they are observed. The operator buffers the values emitted by these
  368. * [[Observable]]s and then drains them in order, each one after the previous one completes.
  369. *
  370. * ===Backpressure:===
  371. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  372. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  373. *
  374. * $noDefaultScheduler
  375. *
  376. * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s emitted by
  377. * `this`, one after the other, without interleaving them
  378. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  379. */
  380. @Experimental
  381. def concatEager[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  382. val o2: Observable[Observable[U]] = this
  383. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  384. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  385. val o5 = rx.Observable.concatEager[U](o4)
  386. toScalaObservable[U](o5)
  387. }
  388.  
  389. /**
  390. * $experimental Concatenates an [[Observable]] sequence of [[Observable]]s eagerly into a single stream of values.
  391. *
  392. * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
  393. * emitted source [[Observable]]s as they are observed. The operator buffers the values emitted by these
  394. * [[Observable]]s and then drains them in order, each one after the previous one completes.
  395. *
  396. * ===Backpressure:===
  397. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  398. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  399. *
  400. * $noDefaultScheduler
  401. *
  402. * @param capacityHint hints about the number of expected values in an [[Observable]]
  403. * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s emitted by
  404. * `this`, one after the other, without interleaving them
  405. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  406. */
  407. @Experimental
  408. def concatEager[U](capacityHint: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  409. val o2: Observable[Observable[U]] = this
  410. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  411. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  412. val o5 = rx.Observable.concatEager[U](o4, capacityHint)
  413. toScalaObservable[U](o5)
  414. }
  415.  
  416. /**
  417. * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single
  418. * Observable.
  419. *
  420. * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
  421. * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
  422. * order, each one after the previous one completes.
  423. *
  424. * ===Backpressure:===
  425. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  426. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  427. *
  428. * $noDefaultScheduler
  429. *
  430. * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be
  431. * eagerly concatenated
  432. * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
  433. * `f`, one after the other, without interleaving them
  434. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  435. */
  436. @Experimental
  437. def concatMapEager[R](f: T => Observable[R]): Observable[R] = {
  438. toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
  439. def call(t1: T): rx.Observable[_ <: R] = {
  440. f(t1).asJavaObservable
  441. }
  442. }))
  443. }
  444.  
  445. /**
  446. * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single
  447. * Observable.
  448. *
  449. * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
  450. * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
  451. * order, each one after the previous one completes.
  452. *
  453. * ===Backpressure:===
  454. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  455. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  456. *
  457. * $noDefaultScheduler
  458. *
  459. * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be
  460. * eagerly concatenated
  461. * @param capacityHint hints about the number of expected values in an [[Observable]]
  462. * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
  463. * `f`, one after the other, without interleaving them
  464. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  465. */
  466. @Experimental
  467. def concatMapEager[R](capacityHint: Int, f: T => Observable[R]): Observable[R] = {
  468. toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
  469. def call(t1: T): rx.Observable[_ <: R] = {
  470. f(t1).asJavaObservable
  471. }
  472. }, capacityHint))
  473. }
  474.  
  475. /**
  476. * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single [[Observable]].
  477. *
  478. * Eager concatenation means that once a [[Subscriber]] subscribes, this operator subscribes to all of the
  479. * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
  480. * order, each one after the previous one completes.
  481. *
  482. * ===Backpressure:===
  483. * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
  484. * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
  485. *
  486. * $noDefaultScheduler
  487. *
  488. * @param capacityHint hints about the number of expected source sequence values
  489. * @param maxConcurrent the maximum number of concurrent subscribed [[Observable]]s
  490. * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be eagerly concatenated
  491. * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
  492. * `f`, one after the other, without interleaving them
  493. */
  494. @Experimental
  495. def concatMapEager[R](capacityHint: Int, maxConcurrent: Int, f: T => Observable[R]): Observable[R] = {
  496. toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
  497. def call(t1: T): rx.Observable[_ <: R] = {
  498. f(t1).asJavaObservable
  499. }
  500. }, capacityHint, maxConcurrent))
  501. }
  502.  
  503. /**
  504. * Wraps this Observable in another Observable that ensures that the resulting
  505. * Observable is chronologically well-behaved.
  506. *
  507. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/synchronize.png" alt="" />
  508. *
  509. * A well-behaved Observable does not interleave its invocations of the [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onCompleted onCompleted]], and [[rx.lang.scala.Observer.onError onError]] methods of
  510. * its [[rx.lang.scala.Observer]]s; it invokes `onCompleted` or `onError` only once; and it never invokes `onNext` after invoking either `onCompleted` or `onError`.
  511. * [[Observable.serialize serialize]] enforces this, and the Observable it returns invokes `onNext` and `onCompleted` or `onError` synchronously.
  512. *
  513. * @return an Observable that is a chronologically well-behaved version of the source
  514. * Observable, and that synchronously notifies its [[rx.lang.scala.Observer]]s
  515. */
  516. def serialize: Observable[T] = {
  517. toScalaObservable[T](asJavaObservable.serialize)
  518. }
  519.  
  520. /**
  521. * Wraps each item emitted by a source Observable in a timestamped tuple.
  522. *
  523. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.png" alt="" />
  524. *
  525. * @return an Observable that emits timestamped items from the source Observable
  526. */
  527. def timestamp: Observable[(Long, T)] = {
  528. toScalaObservable[rx.schedulers.Timestamped[_ <: T]](asJavaObservable.timestamp())
  529. .map((t: rx.schedulers.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
  530. }
  531.  
  532. /**
  533. * Wraps each item emitted by a source Observable in a timestamped tuple
  534. * with timestamps provided by the given Scheduler.
  535. * <p>
  536. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timestamp.s.png" alt="" />
  537. *
  538. * @param scheduler [[rx.lang.scala.Scheduler]] to use as a time source.
  539. * @return an Observable that emits timestamped items from the source
  540. * Observable with timestamps provided by the given Scheduler
  541. */
  542. def timestamp(scheduler: Scheduler): Observable[(Long, T)] = {
  543. toScalaObservable[rx.schedulers.Timestamped[_ <: T]](asJavaObservable.timestamp(scheduler))
  544. .map((t: rx.schedulers.Timestamped[_ <: T]) => (t.getTimestampMillis, t.getValue))
  545. }
  546.  
  547. /**
  548. * Returns an Observable formed from this Observable and another Observable by combining
  549. * corresponding elements in pairs.
  550. * The number of `onNext` invocations of the resulting `Observable[(T, U)]`
  551. * is the minumum of the number of `onNext` invocations of `this` and `that`.
  552. *
  553. * @param that the Observable to zip with
  554. * @return an Observable that pairs up values from `this` and `that` Observables.
  555. */
  556. def zip[U](that: Observable[U]): Observable[(T, U)] = {
  557. zipWith(that)((_, _))
  558. }
  559.  
  560. /**
  561. * Returns an Observable formed from `this` Observable and `other` Iterable by combining
  562. * corresponding elements in pairs.
  563. * <p>
  564. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.i.png" alt="" />
  565. * <p>
  566. * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
  567. * not pre-consumed. This allows you to zip infinite streams on either side.
  568. *
  569. * @param that the Iterable sequence
  570. * @return an Observable that pairs up values from the source Observable and the `other` Iterable.
  571. */
  572. def zip[U](that: Iterable[U]): Observable[(T, U)] = {
  573. zipWith(that)((_, _))
  574. }
  575.  
  576. /**
  577. * Returns an Observable that emits items that are the result of applying a specified function to pairs of
  578. * values, one each from the source Observable and a specified Iterable sequence.
  579. * <p>
  580. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.i.png" alt="" />
  581. * <p>
  582. * Note that the `other` Iterable is evaluated as items are observed from the source Observable; it is
  583. * not pre-consumed. This allows you to zip infinite streams on either side.
  584. *
  585. * @param that the Iterable sequence
  586. * @param selector a function that combines the pairs of items from the Observable and the Iterable to generate
  587. * the items to be emitted by the resulting Observable
  588. * @return an Observable that pairs up values from the source Observable and the `other` Iterable
  589. * sequence and emits the results of `selector` applied to these pairs
  590. */
  591. def zipWith[U, R](that: Iterable[U])(selector: (T, U) => R): Observable[R] = {
  592. val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
  593. toScalaObservable[R](thisJava.zipWith(that.asJava, selector))
  594. }
  595.  
  596. /**
  597. * Returns an Observable formed from this Observable and another Observable by combining
  598. * corresponding elements using the selector function.
  599. * The number of `onNext` invocations of the resulting `Observable[(T, U)]`
  600. * is the minumum of the number of `onNext` invocations of `this` and `that`.
  601. *
  602. * @param that the Observable to zip with
  603. * @return an Observable that pairs up values from `this` and `that` Observables.
  604. */
  605. def zipWith[U, R](that: Observable[U])(selector: (T, U) => R): Observable[R] = {
  606. toScalaObservable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
  607. }
  608.  
  609. /**
  610. * Zips this Observable with its indices.
  611. *
  612. * @return An Observable emitting pairs consisting of all elements of this Observable paired with
  613. * their index. Indices start at 0.
  614. */
  615. def zipWithIndex: Observable[(T, Int)] = {
  616. zip(0 until Int.MaxValue)
  617. }
  618.  
  619. /**
  620. * Creates an Observable which produces buffers of collected values.
  621. *
  622. * This Observable produces buffers. Buffers are created when the specified `openings`
  623. * Observable produces an object. That object is used to construct an Observable to emit buffers, feeding it into `closings` function.
  624. * Buffers are emitted when the created Observable produces an object.
  625. *
  626. * @param openings
  627. * The [[rx.lang.scala.Observable]] which, when it produces an object, will cause
  628. * another buffer to be created.
  629. * @param closings
  630. * The function which is used to produce an [[rx.lang.scala.Observable]] for every buffer created.
  631. * When this [[rx.lang.scala.Observable]] produces an object, the associated buffer
  632. * is emitted.
  633. * @return
  634. * An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
  635. */
  636. def slidingBuffer[Opening](openings: Observable[Opening])(closings: Opening => Observable[Any]): Observable[Seq[T]] = {
  637. val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
  638. val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Any]] = (o: Opening) => closings(o).asJavaObservable
  639. val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Any](opening, closing)
  640. Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  641. }
  642.  
  643. /**
  644. * Creates an Observable which produces buffers of collected values.
  645. *
  646. * This Observable produces connected non-overlapping buffers, each containing `count`
  647. * elements. When the source Observable completes or encounters an error, the current
  648. * buffer is emitted, and the event is propagated.
  649. *
  650. * @param count
  651. * The maximum size of each buffer before it should be emitted.
  652. * @return
  653. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers containing at most
  654. * `count` produced values.
  655. */
  656. def tumblingBuffer(count: Int): Observable[Seq[T]] = {
  657. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(count)
  658. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  659. }
  660.  
  661. /**
  662. * Creates an Observable which produces buffers of collected values.
  663. *
  664. * This Observable produces buffers every `skip` values, each containing `count`
  665. * elements. When the source Observable completes or encounters an error, the current
  666. * buffer is emitted, and the event is propagated.
  667. *
  668. * @param count
  669. * The maximum size of each buffer before it should be emitted.
  670. * @param skip
  671. * How many produced values need to be skipped before starting a new buffer. Note that when `skip` and
  672. * `count` are equals that this is the same operation as `buffer(int)`.
  673. * @return
  674. * An [[rx.lang.scala.Observable]] which produces buffers every `skip` values containing at most
  675. * `count` produced values.
  676. */
  677. def slidingBuffer(count: Int, skip: Int): Observable[Seq[T]] = {
  678. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(count, skip)
  679. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  680. }
  681.  
  682. /**
  683. * Creates an Observable which produces buffers of collected values.
  684. *
  685. * This Observable produces connected non-overlapping buffers, each of a fixed duration
  686. * specified by the `timespan` argument. When the source Observable completes or encounters
  687. * an error, the current buffer is emitted and the event is propagated.
  688. *
  689. * @param timespan
  690. * The period of time each buffer is collecting values before it should be emitted, and
  691. * replaced with a new buffer.
  692. * @return
  693. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers with a fixed duration.
  694. */
  695. def tumblingBuffer(timespan: Duration): Observable[Seq[T]] = {
  696. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit)
  697. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  698. }
  699.  
  700. /**
  701. * Creates an Observable which produces buffers of collected values.
  702. *
  703. * This Observable produces connected non-overlapping buffers, each of a fixed duration
  704. * specified by the `timespan` argument. When the source Observable completes or encounters
  705. * an error, the current buffer is emitted and the event is propagated.
  706. *
  707. * @param timespan
  708. * The period of time each buffer is collecting values before it should be emitted, and
  709. * replaced with a new buffer.
  710. * @param scheduler
  711. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a buffer.
  712. * @return
  713. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers with a fixed duration.
  714. */
  715. def tumblingBuffer(timespan: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
  716. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, scheduler)
  717. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  718. }
  719.  
  720. /**
  721. * Creates an Observable which produces buffers of collected values. This Observable produces connected
  722. * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size
  723. * specified by the `count` argument (which ever is reached first). When the source Observable completes
  724. * or encounters an error, the current buffer is emitted and the event is propagated.
  725. *
  726. * @param timespan
  727. * The period of time each buffer is collecting values before it should be emitted, and
  728. * replaced with a new buffer.
  729. * @param count
  730. * The maximum size of each buffer before it should be emitted.
  731. * @return
  732. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after
  733. * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
  734. */
  735. def tumblingBuffer(timespan: Duration, count: Int): Observable[Seq[T]] = {
  736. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count)
  737. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  738. }
  739.  
  740. /**
  741. * Creates an Observable which produces buffers of collected values. This Observable produces connected
  742. * non-overlapping buffers, each of a fixed duration specified by the `timespan` argument or a maximum size
  743. * specified by the `count` argument (which ever is reached first). When the source Observable completes
  744. * or encounters an error, the current buffer is emitted and the event is propagated.
  745. *
  746. * @param timespan
  747. * The period of time each buffer is collecting values before it should be emitted, and
  748. * replaced with a new buffer.
  749. * @param count
  750. * The maximum size of each buffer before it should be emitted.
  751. * @param scheduler
  752. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a buffer.
  753. * @return
  754. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping buffers which are emitted after
  755. * a fixed duration or when the buffer has reached maximum capacity (which ever occurs first).
  756. */
  757. def tumblingBuffer(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Seq[T]] = {
  758. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(timespan.length, timespan.unit, count, scheduler)
  759. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  760. }
  761.  
  762. /**
  763. * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
  764. * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan
  765. * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
  766. * current buffer is emitted and the event is propagated.
  767. *
  768. * @param timespan
  769. * The period of time each buffer is collecting values before it should be emitted.
  770. * @param timeshift
  771. * The period of time after which a new buffer will be created.
  772. * @return
  773. * An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after
  774. * a fixed timespan has elapsed.
  775. */
  776. def slidingBuffer(timespan: Duration, timeshift: Duration): Observable[Seq[T]] = {
  777. val span: Long = timespan.toNanos
  778. val shift: Long = timespan.toNanos
  779. val unit: TimeUnit = duration.NANOSECONDS
  780. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(span, shift, unit)
  781. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  782. }
  783.  
  784. /**
  785. * Creates an Observable which produces buffers of collected values. This Observable starts a new buffer
  786. * periodically, which is determined by the `timeshift` argument. Each buffer is emitted after a fixed timespan
  787. * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
  788. * current buffer is emitted and the event is propagated.
  789. *
  790. * @param timespan
  791. * The period of time each buffer is collecting values before it should be emitted.
  792. * @param timeshift
  793. * The period of time after which a new buffer will be created.
  794. * @param scheduler
  795. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a buffer.
  796. * @return
  797. * An [[rx.lang.scala.Observable]] which produces new buffers periodically, and these are emitted after
  798. * a fixed timespan has elapsed.
  799. */
  800. def slidingBuffer(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Seq[T]] = {
  801. val span: Long = timespan.toNanos
  802. val shift: Long = timespan.toNanos
  803. val unit: TimeUnit = duration.NANOSECONDS
  804. val oJava: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer(span, shift, unit, scheduler)
  805. Observable.jObsOfListToScObsOfSeq(oJava.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
  806. }
  807.  
  808. /**
  809. * Returns an Observable that emits non-overlapping buffered items from the source Observable each time the
  810. * specified boundary Observable emits an item.
  811. * <p>
  812. * <img width="640" height="395" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer8.png" alt="" />
  813. * <p>
  814. * Completion of either the source or the boundary Observable causes the returned Observable to emit the
  815. * latest buffer and complete.
  816. *
  817. * @param boundary the boundary Observable. Note: This is a by-name parameter,
  818. * so it is only evaluated when someone subscribes to the returned Observable.
  819. * @return an Observable that emits buffered items from the source Observable when the boundary Observable
  820. * emits an item
  821. */
  822. def tumblingBuffer(boundary: => Observable[Any]): Observable[Seq[T]] = {
  823. val f = new Func0[rx.Observable[_ <: Any]]() {
  824. override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable
  825. }
  826. toScalaObservable(asJavaObservable.buffer[Any](f)).map(_.asScala)
  827. }
  828.  
  829. /**
  830. * Returns an Observable that emits non-overlapping buffered items from the source Observable each time the
  831. * specified boundary Observable emits an item.
  832. * <p>
  833. * <img width="640" height="395" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/buffer8.png" alt="" />
  834. * <p>
  835. * Completion of either the source or the boundary Observable causes the returned Observable to emit the
  836. * latest buffer and complete.
  837. *
  838. * @param boundary the boundary Observable
  839. * @param initialCapacity the initial capacity of each buffer chunk
  840. * @return an Observable that emits buffered items from the source Observable when the boundary Observable
  841. * emits an item
  842. */
  843. def tumblingBuffer(boundary: Observable[Any], initialCapacity: Int): Observable[Seq[T]] = {
  844. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  845. toScalaObservable(thisJava.buffer(boundary.asJavaObservable, initialCapacity)).map(_.asScala)
  846. }
  847.  
  848. /**
  849. * Creates an Observable which produces windows of collected values. This Observable produces connected
  850. * non-overlapping windows. The boundary of each window is determined by the items emitted from a specified
  851. * boundary-governing Observable.
  852. *
  853. * <img width="640" height="475" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/window8.png" alt="" />
  854. *
  855. * @param boundary an Observable whose emitted items close and open windows. Note: This is a by-name parameter,
  856. * so it is only evaluated when someone subscribes to the returned Observable.
  857. * @return An Observable which produces connected non-overlapping windows. The boundary of each window is
  858. * determined by the items emitted from a specified boundary-governing Observable.
  859. */
  860. def tumbling(boundary: => Observable[Any]): Observable[Observable[T]] = {
  861. val func = new Func0[rx.Observable[_ <: Any]]() {
  862. override def call(): rx.Observable[_ <: Any] = boundary.asJavaObservable
  863. }
  864. val jo: rx.Observable[_ <: rx.Observable[_ <: T]] = asJavaObservable.window[Any](func)
  865. toScalaObservable(jo).map(toScalaObservable[T](_))
  866. }
  867.  
  868. /**
  869. * Creates an Observable which produces windows of collected values. Chunks are created when the specified `openings`
  870. * Observable produces an object. That object is used to construct an Observable to emit windows, feeding it into `closings` function.
  871. * Windows are emitted when the created Observable produces an object.
  872. *
  873. * @param openings
  874. * The [[rx.lang.scala.Observable]] which when it produces an object, will cause
  875. * another window to be created.
  876. * @param closings
  877. * The function which is used to produce an [[rx.lang.scala.Observable]] for every window created.
  878. * When this [[rx.lang.scala.Observable]] produces an object, the associated window
  879. * is emitted.
  880. * @return
  881. * An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
  882. */
  883. def sliding[Opening](openings: Observable[Opening])(closings: Opening => Observable[Any]) = {
  884. Observable.jObsOfJObsToScObsOfScObs(
  885. asJavaObservable.window[Opening, Any](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
  886. : Observable[Observable[T]] // SI-7818
  887. }
  888.  
  889. /**
  890. * Creates an Observable which produces windows of collected values. This Observable produces connected
  891. * non-overlapping windows, each containing `count` elements. When the source Observable completes or
  892. * encounters an error, the current window is emitted, and the event is propagated.
  893. *
  894. * @param count
  895. * The maximum size of each window before it should be emitted.
  896. * @return
  897. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows containing at most
  898. * `count` produced values.
  899. */
  900. def tumbling(count: Int): Observable[Observable[T]] = {
  901. // this unnecessary ascription is needed because of this bug (without, compiler crashes):
  902. // https://issues.scala-lang.org/browse/SI-7818
  903. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(count)) : Observable[Observable[T]]
  904. }
  905.  
  906. /**
  907. * Creates an Observable which produces windows of collected values. This Observable produces windows every
  908. * `skip` values, each containing `count` elements. When the source Observable completes or encounters an error,
  909. * the current window is emitted and the event is propagated.
  910. *
  911. * @param count
  912. * The maximum size of each window before it should be emitted.
  913. * @param skip
  914. * How many produced values need to be skipped before starting a new window. Note that when `skip` and
  915. * `count` are equal that this is the same operation as `window(int)`.
  916. * @return
  917. * An [[rx.lang.scala.Observable]] which produces windows every `skip` values containing at most
  918. * `count` produced values.
  919. */
  920. def sliding(count: Int, skip: Int): Observable[Observable[T]] = {
  921. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(count, skip))
  922. : Observable[Observable[T]] // SI-7818
  923. }
  924.  
  925. /**
  926. * Creates an Observable which produces windows of collected values. This Observable produces connected
  927. * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source
  928. * Observable completes or encounters an error, the current window is emitted and the event is propagated.
  929. *
  930. * @param timespan
  931. * The period of time each window is collecting values before it should be emitted, and
  932. * replaced with a new window.
  933. * @return
  934. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration.
  935. */
  936. def tumbling(timespan: Duration): Observable[Observable[T]] = {
  937. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit))
  938. : Observable[Observable[T]] // SI-7818
  939. }
  940.  
  941. /**
  942. * Creates an Observable which produces windows of collected values. This Observable produces connected
  943. * non-overlapping windows, each of a fixed duration specified by the `timespan` argument. When the source
  944. * Observable completes or encounters an error, the current window is emitted and the event is propagated.
  945. *
  946. * @param timespan
  947. * The period of time each window is collecting values before it should be emitted, and
  948. * replaced with a new window.
  949. * @param scheduler
  950. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a window.
  951. * @return
  952. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows with a fixed duration.
  953. */
  954. def tumbling(timespan: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
  955. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, scheduler))
  956. : Observable[Observable[T]] // SI-7818
  957. }
  958.  
  959. /**
  960. * Creates an Observable which produces windows of collected values. This Observable produces connected
  961. * non-overlapping windows, each of a fixed duration specified by the `timespan` argument or a maximum size
  962. * specified by the `count` argument (which ever is reached first). When the source Observable completes
  963. * or encounters an error, the current window is emitted and the event is propagated.
  964. *
  965. * @param timespan
  966. * The period of time each window is collecting values before it should be emitted, and
  967. * replaced with a new window.
  968. * @param count
  969. * The maximum size of each window before it should be emitted.
  970. * @return
  971. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows which are emitted after
  972. * a fixed duration or when the window has reached maximum capacity (which ever occurs first).
  973. */
  974. def tumbling(timespan: Duration, count: Int): Observable[Observable[T]] = {
  975. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, count))
  976. : Observable[Observable[T]] // SI-7818
  977. }
  978.  
  979. /**
  980. * Creates an Observable which produces windows of collected values. This Observable produces connected
  981. * non-overlapping windows, each of a fixed duration specified by the `timespan` argument or a maximum size
  982. * specified by the `count` argument (which ever is reached first). When the source Observable completes
  983. * or encounters an error, the current window is emitted and the event is propagated.
  984. *
  985. * @param timespan
  986. * The period of time each window is collecting values before it should be emitted, and
  987. * replaced with a new window.
  988. * @param count
  989. * The maximum size of each window before it should be emitted.
  990. * @param scheduler
  991. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a window.
  992. * @return
  993. * An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows which are emitted after
  994. * a fixed duration or when the window has reached maximum capacity (which ever occurs first).
  995. */
  996. def tumbling(timespan: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
  997. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(timespan.length, timespan.unit, count, scheduler))
  998. : Observable[Observable[T]] // SI-7818
  999. }
  1000.  
  1001. /**
  1002. * Creates an Observable which produces windows of collected values. This Observable starts a new window
  1003. * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan
  1004. * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
  1005. * current window is emitted and the event is propagated.
  1006. *
  1007. * @param timespan
  1008. * The period of time each window is collecting values before it should be emitted.
  1009. * @param timeshift
  1010. * The period of time after which a new window will be created.
  1011. * @return
  1012. * An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after
  1013. * a fixed timespan has elapsed.
  1014. */
  1015. def sliding(timespan: Duration, timeshift: Duration): Observable[Observable[T]] = {
  1016. val span: Long = timespan.toNanos
  1017. val shift: Long = timespan.toNanos
  1018. val unit: TimeUnit = duration.NANOSECONDS
  1019. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit))
  1020. : Observable[Observable[T]] // SI-7818
  1021. }
  1022.  
  1023. /**
  1024. * Creates an Observable which produces windows of collected values. This Observable starts a new window
  1025. * periodically, which is determined by the `timeshift` argument. Each window is emitted after a fixed timespan
  1026. * specified by the `timespan` argument. When the source Observable completes or encounters an error, the
  1027. * current window is emitted and the event is propagated.
  1028. *
  1029. * @param timespan
  1030. * The period of time each window is collecting values before it should be emitted.
  1031. * @param timeshift
  1032. * The period of time after which a new window will be created.
  1033. * @param scheduler
  1034. * The [[rx.lang.scala.Scheduler]] to use when determining the end and start of a window.
  1035. * @return
  1036. * An [[rx.lang.scala.Observable]] which produces new windows periodically, and these are emitted after
  1037. * a fixed timespan has elapsed.
  1038. */
  1039. def sliding(timespan: Duration, timeshift: Duration, scheduler: Scheduler): Observable[Observable[T]] = {
  1040. val span: Long = timespan.toNanos
  1041. val shift: Long = timespan.toNanos
  1042. val unit: TimeUnit = duration.NANOSECONDS
  1043. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit, scheduler))
  1044. : Observable[Observable[T]] // SI-7818
  1045. }
  1046.  
  1047. /**
  1048. * Returns an Observable that emits windows of items it collects from the source Observable. The resulting
  1049. * Observable starts a new window periodically, as determined by the `timeshift` argument or a maximum
  1050. * size as specified by the `count` argument (whichever is reached first). It emits
  1051. * each window after a fixed timespan, specified by the `timespan` argument. When the source
  1052. * Observable completes or Observable completes or encounters an error, the resulting Observable emits the
  1053. * current window and propagates the notification from the source Observable.
  1054. *
  1055. * <img width="640" height="335" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/window7.s.png" alt="" />
  1056. *
  1057. * ===Backpressure Support:===
  1058. * This operator does not support backpressure as it uses time to control data flow.
  1059. *
  1060. * ===Scheduler:===
  1061. * you specify which `Scheduler` this operator will use
  1062. *
  1063. * @param timespan the period of time each window collects items before it should be emitted
  1064. * @param timeshift the period of time after which a new window will be created
  1065. * @param count the maximum size of each window before it should be emitted
  1066. * @param scheduler the `Scheduler` to use when determining the end and start of a window
  1067. * @return an Observable that emits new windows periodically as a fixed timespan elapses
  1068. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#window">RxJava wiki: window</a>
  1069. * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.window.aspx">MSDN: Observable.Window</a>
  1070. */
  1071. def sliding(timespan: Duration, timeshift: Duration, count: Int, scheduler: Scheduler): Observable[Observable[T]] = {
  1072. val span: Long = timespan.toNanos
  1073. val shift: Long = timespan.toNanos
  1074. val unit: TimeUnit = duration.NANOSECONDS
  1075. Observable.jObsOfJObsToScObsOfScObs(asJavaObservable.window(span, shift, unit, count, scheduler))
  1076. : Observable[Observable[T]] // SI-7818
  1077. }
  1078.  
  1079. /**
  1080. * Returns an Observable which only emits those items for which a given predicate holds.
  1081. *
  1082. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/filter.png" alt="" />
  1083. *
  1084. * @param predicate
  1085. * a function that evaluates the items emitted by the source Observable, returning `true` if they pass the filter
  1086. * @return an Observable that emits only those items in the original Observable that the filter
  1087. * evaluates as `true`
  1088. */
  1089. def filter(predicate: T => Boolean): Observable[T] = {
  1090. toScalaObservable[T](asJavaObservable.filter(predicate))
  1091. }
  1092.  
  1093. /**
  1094. * Registers an function to be called when this Observable invokes [[rx.lang.scala.Observer.onCompleted onCompleted]] or [[rx.lang.scala.Observer.onError onError]].
  1095. *
  1096. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="" />
  1097. *
  1098. * @param action
  1099. * an function to be invoked when the source Observable finishes
  1100. * @return an Observable that emits the same items as the source Observable, then invokes the function
  1101. */
  1102. @deprecated("Use [[Observable.doAfterTerminate]] instead", "0.26.1")
  1103. def finallyDo(action: => Unit): Observable[T] = {
  1104. toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
  1105. }
  1106.  
  1107. /**
  1108. * Registers an function to be called when this [[Observable]] invokes either [[Observer.onCompleted onCompleted]] or [[Observer.onError onError]].
  1109. *
  1110. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="">
  1111. *
  1112. * $noDefaultScheduler
  1113. *
  1114. * @param action an function to be invoked when the source [[Observable]] finishes
  1115. * @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action`
  1116. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  1117. */
  1118. def doAfterTerminate(action: => Unit): Observable[T] = {
  1119. toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
  1120. }
  1121.  
  1122. /**
  1123. * Creates a new Observable by applying a function that you supply to each item emitted by
  1124. * the source Observable, where that function returns an Observable, and then merging those
  1125. * resulting Observables and emitting the results of this merger.
  1126. *
  1127. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt="" />
  1128. *
  1129. * @param f
  1130. * a function that, when applied to an item emitted by the source Observable, returns
  1131. * an Observable
  1132. * @return an Observable that emits the result of applying the transformation function to each
  1133. * item emitted by the source Observable and merging the results of the Observables
  1134. * obtained from this transformation.
  1135. */
  1136. def flatMap[R](f: T => Observable[R]): Observable[R] = {
  1137. toScalaObservable[R](asJavaObservable.flatMap[R](new Func1[T, rx.Observable[_ <: R]]{
  1138. def call(t1: T): rx.Observable[_ <: R] = { f(t1).asJavaObservable }
  1139. }))
  1140. }
  1141.  
  1142. /**
  1143. * $beta Returns an [[Observable]] that emits items based on applying a function that you supply to each item emitted
  1144. * by the source [[Observable]] , where that function returns an [[Observable]] , and then merging those resulting
  1145. * [[Observable]]s and emitting the results of this merger, while limiting the maximum number of concurrent
  1146. * subscriptions to these [[Observable]]s.
  1147. *
  1148. * $$noDefaultScheduler
  1149. *
  1150. * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
  1151. * @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]]
  1152. * @return an [[Observable]] that emits the result of applying the transformation function to each item emitted
  1153. * by the source [[Observable]] and merging the results of the [[Observable]]s obtained from this transformation
  1154. * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
  1155. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  1156. */
  1157. @Beta
  1158. def flatMap[R](maxConcurrent: Int, f: T => Observable[R]): Observable[R] = {
  1159. toScalaObservable[R](asJavaObservable.flatMap[R](new Func1[T, rx.Observable[_ <: R]] {
  1160. def call(t1: T): rx.Observable[_ <: R] = {
  1161. f(t1).asJavaObservable
  1162. }
  1163. }, maxConcurrent))
  1164. }
  1165.  
  1166. /**
  1167. * Returns an Observable that applies a function to each item emitted or notification raised by the source
  1168. * Observable and then flattens the Observables returned from these functions and emits the resulting items.
  1169. *
  1170. * <img width="640" height="410" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.nce.png" alt="" />
  1171. *
  1172. * @tparam R the result type
  1173. * @param onNext a function that returns an Observable to merge for each item emitted by the source Observable
  1174. * @param onError a function that returns an Observable to merge for an onError notification from the source
  1175. * Observable
  1176. * @param onCompleted a function that returns an Observable to merge for an onCompleted notification from the source
  1177. * Observable
  1178. * @return an Observable that emits the results of merging the Observables returned from applying the
  1179. * specified functions to the emissions and notifications of the source Observable
  1180. */
  1181. def flatMap[R](onNext: T => Observable[R], onError: Throwable => Observable[R], onCompleted: () => Observable[R]): Observable[R] = {
  1182. val jOnNext = new Func1[T, rx.Observable[_ <: R]] {
  1183. override def call(t: T): rx.Observable[_ <: R] = onNext(t).asJavaObservable
  1184. }
  1185. val jOnError = new Func1[Throwable, rx.Observable[_ <: R]] {
  1186. override def call(e: Throwable): rx.Observable[_ <: R] = onError(e).asJavaObservable
  1187. }
  1188. val jOnCompleted = new Func0[rx.Observable[_ <: R]] {
  1189. override def call(): rx.Observable[_ <: R] = onCompleted().asJavaObservable
  1190. }
  1191. toScalaObservable[R](asJavaObservable.flatMap[R](jOnNext, jOnError, jOnCompleted))
  1192. }
  1193.  
  1194. /**
  1195. * $beta Returns an [[Observable]] that applies a function to each item emitted or notification raised by the source
  1196. * [[Observable]] and then flattens the [[Observable]] s returned from these functions and emits the resulting items,
  1197. * while limiting the maximum number of concurrent subscriptions to these [[Observable]]s.
  1198. *
  1199. * $noDefaultScheduler
  1200. *
  1201. * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
  1202. * @param onNext a function that returns an [[Observable]] to merge for each item emitted by the source [[Observable]]
  1203. * @param onError a function that returns an [[Observable]] to merge for an onError notification from the source [[Observable]]
  1204. * @param onCompleted a function that returns an [[Observable]] to merge for an onCompleted notification from the source [[Observable]]
  1205. * @return an [[Observable]] that emits the results of merging the [[Observable]]s returned from applying the
  1206. * specified functions to the emissions and notifications of the source [[Observable]]
  1207. * @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
  1208. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  1209. */
  1210. @Beta
  1211. def flatMap[R](maxConcurrent: Int, onNext: T => Observable[R], onError: Throwable => Observable[R], onCompleted: () => Observable[R]): Observable[R] = {
  1212. val jOnNext = new Func1[T, rx.Observable[_ <: R]] {
  1213. override def call(t: T): rx.Observable[_ <: R] = onNext(t).asJavaObservable
  1214. }
  1215. val jOnError = new Func1[Throwable, rx.Observable[_ <: R]] {
  1216. override def call(e: Throwable): rx.Observable[_ <: R] = onError(e).asJavaObservable
  1217. }
  1218. val jOnCompleted = new Func0[rx.Observable[_ <: R]] {
  1219. override def call(): rx.Observable[_ <: R] = onCompleted().asJavaObservable
  1220. }
  1221. toScalaObservable[R](asJavaObservable.flatMap[R](jOnNext, jOnError, jOnCompleted, maxConcurrent))
  1222. }
  1223.  
  1224. /**
  1225. * Returns an Observable that emits the results of a specified function to the pair of values emitted by the
  1226. * source Observable and a specified collection Observable.
  1227. *
  1228. * <img width="640" height="390" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt="" />
  1229. *
  1230. * @tparam U the type of items emitted by the collection Observable
  1231. * @tparam R the type of items emitted by the resulting Observable
  1232. * @param collectionSelector a function that returns an Observable for each item emitted by the source Observable
  1233. * @param resultSelector a function that combines one item emitted by each of the source and collection Observables and
  1234. * returns an item to be emitted by the resulting Observable
  1235. * @return an Observable that emits the results of applying a function to a pair of values emitted by the
  1236. * source Observable and the collection Observable
  1237. */
  1238. def flatMapWith[U, R](collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
  1239. val jCollectionSelector = new Func1[T, rx.Observable[_ <: U]] {
  1240. override def call(t: T): rx.Observable[_ <: U] = collectionSelector(t).asJavaObservable
  1241. }
  1242. toScalaObservable[R](asJavaObservable.flatMap[U, R](jCollectionSelector, resultSelector))
  1243. }
  1244.  
  1245. /**
  1246. * $beta Returns an Observable that emits the results of a specified function to the pair of values emitted by the
  1247. * source Observable and a specified collection Observable.
  1248. *
  1249. * <img width="640" height="390" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt="" />
  1250. *
  1251. * @tparam U the type of items emitted by the collection Observable
  1252. * @tparam R the type of items emitted by the resulting Observable
  1253. * @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently
  1254. * @param collectionSelector a function that returns an Observable for each item emitted by the source Observable
  1255. * @param resultSelector a function that combines one item emitted by each of the source and collection Observables and
  1256. * returns an item to be emitted by the resulting Observable
  1257. * @return an Observable that emits the results of applying a function to a pair of values emitted by the
  1258. * source Observable and the collection Observable
  1259. */
  1260. @Beta
  1261. def flatMapWith[U, R](maxConcurrent: Int, collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
  1262. val jCollectionSelector = new Func1[T, rx.Observable[_ <: U]] {
  1263. override def call(t: T): rx.Observable[_ <: U] = collectionSelector(t).asJavaObservable
  1264. }
  1265. toScalaObservable[R](asJavaObservable.flatMap[U, R](jCollectionSelector, resultSelector, maxConcurrent))
  1266. }
  1267.  
  1268. /**
  1269. * Returns an Observable that merges each item emitted by the source Observable with the values in an
  1270. * Iterable corresponding to that item that is generated by a selector.
  1271. *
  1272. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="" />
  1273. *
  1274. * @tparam R the type of item emitted by the resulting Observable
  1275. * @param collectionSelector a function that returns an Iterable sequence of values for when given an item emitted by the
  1276. * source Observable
  1277. * @return an Observable that emits the results of merging the items emitted by the source Observable with
  1278. * the values in the Iterables corresponding to those items, as generated by `collectionSelector`
  1279. */
  1280. def flatMapIterable[R](collectionSelector: T => Iterable[R]): Observable[R] = {
  1281. val jCollectionSelector = new Func1[T, java.lang.Iterable[_ <: R]] {
  1282. override def call(t: T): java.lang.Iterable[_ <: R] = collectionSelector(t).asJava
  1283. }
  1284. toScalaObservable[R](asJavaObservable.flatMapIterable[R](jCollectionSelector))
  1285. }
  1286.  
  1287. /**
  1288. * Returns an Observable that emits the results of applying a function to the pair of values from the source
  1289. * Observable and an Iterable corresponding to that item that is generated by a selector.
  1290. *
  1291. * <img width="640" height="390" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="" />
  1292. *
  1293. * @tparam U the collection element type
  1294. * @tparam R the type of item emited by the resulting Observable
  1295. * @param collectionSelector a function that returns an Iterable sequence of values for each item emitted by the source
  1296. * Observable
  1297. * @param resultSelector a function that returns an item based on the item emitted by the source Observable and the
  1298. * Iterable returned for that item by the `collectionSelector`
  1299. * @return an Observable that emits the items returned by `resultSelector` for each item in the source Observable
  1300. */
  1301. def flatMapIterableWith[U, R](collectionSelector: T => Iterable[U])(resultSelector: (T, U) => R): Observable[R] = {
  1302. val jCollectionSelector = new Func1[T, java.lang.Iterable[_ <: U]] {
  1303. override def call(t: T): java.lang.Iterable[_ <: U] = collectionSelector(t).asJava
  1304. }
  1305. toScalaObservable[R](asJavaObservable.flatMapIterable[U, R](jCollectionSelector, resultSelector))
  1306. }
  1307.  
  1308. /**
  1309. * Returns an Observable that applies the given function to each item emitted by an
  1310. * Observable and emits the result.
  1311. *
  1312. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/map.png" alt="" />
  1313. *
  1314. * @param func
  1315. * a function to apply to each item emitted by the Observable
  1316. * @return an Observable that emits the items from the source Observable, transformed by the
  1317. * given function
  1318. */
  1319. def map[R](func: T => R): Observable[R] = {
  1320. toScalaObservable[R](asJavaObservable.map[R](new Func1[T,R] {
  1321. def call(t1: T): R = func(t1)
  1322. }))
  1323. }
  1324.  
  1325. /**
  1326. * Turns all of the notifications from a source Observable into [[rx.lang.scala.Observer.onNext onNext]] emissions,
  1327. * and marks them with their original notification types within [[rx.lang.scala.Notification]] objects.
  1328. *
  1329. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/materialize.png" alt="" />
  1330. *
  1331. * @return an Observable whose items are the result of materializing the items and
  1332. * notifications of the source Observable
  1333. */
  1334. def materialize: Observable[Notification[T]] = {
  1335. toScalaObservable[rx.Notification[_ <: T]](asJavaObservable.materialize()).map(Notification(_))
  1336. }
  1337.  
  1338. /**
  1339. * Asynchronously subscribes and unsubscribes Observers on the specified [[rx.lang.scala.Scheduler]].
  1340. *
  1341. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="" />
  1342. *
  1343. * @param scheduler
  1344. * the [[rx.lang.scala.Scheduler]] to perform subscription and unsubscription actions on
  1345. * @return the source Observable modified so that its subscriptions and unsubscriptions happen
  1346. * on the specified [[rx.lang.scala.Scheduler]]
  1347. */
  1348. def subscribeOn(scheduler: Scheduler): Observable[T] = {
  1349. toScalaObservable[T](asJavaObservable.subscribeOn(scheduler))
  1350. }
  1351.  
  1352. /**
  1353. * Asynchronously unsubscribes on the specified [[Scheduler]].
  1354. *
  1355. * @param scheduler the [[Scheduler]] to perform subscription and unsubscription actions on
  1356. * @return the source Observable modified so that its unsubscriptions happen on the specified [[Scheduler]]
  1357. * @since 0.17
  1358. */
  1359. def unsubscribeOn(scheduler: Scheduler): Observable[T] = {
  1360. toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler))
  1361. }
  1362.  
  1363. /**
  1364. * $experimental Returns an [[Observable]] that requests `n` initially from the upstream and then 75% of `n` subsequently after 75% of `n` values have
  1365. * been emitted to the downstream.
  1366. *
  1367. * This operator allows preventing the downstream to trigger unbounded mode via `request(Long.MaxValue)` or compensate for the per-item
  1368. * overhead of small and frequent requests.
  1369. *
  1370. * ===Backpressure:===
  1371. * The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
  1372. *
  1373. * $noDefaultScheduler
  1374. *
  1375. * @param n the initial request amount, further request will happen after 75% of this value
  1376. * @return the [[Observable]] that rebatches request amounts from downstream
  1377. */
  1378. @Experimental
  1379. def rebatchRequests(n: Int): Observable[T] = {
  1380. toScalaObservable[T](asJavaObservable.rebatchRequests(n))
  1381. }
  1382.  
  1383. /**
  1384. * Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]].
  1385. *
  1386. * <img width="640" height="308" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="" />
  1387. *
  1388. * @param scheduler
  1389. * the [[rx.lang.scala.Scheduler]] to notify [[rx.lang.scala.Observer]]s on
  1390. * @return the source Observable modified so that its [[rx.lang.scala.Observer]]s are notified on the
  1391. * specified [[rx.lang.scala.Scheduler]]
  1392. */
  1393. def observeOn(scheduler: Scheduler): Observable[T] = {
  1394. toScalaObservable[T](asJavaObservable.observeOn(scheduler))
  1395. }
  1396.  
  1397. /**
  1398. * Return an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]],
  1399. * asynchronously with a bounded buffer and optionally delays [[Observer.onError onError]] notifications.
  1400. *
  1401. * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
  1402. *
  1403. * ===Scheduler:===
  1404. * you specify which [[Scheduler]] this operator will use
  1405. *
  1406. * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
  1407. * @param delayError indicates if the [[Observer.onError onError]] notification may not cut ahead of onNext notification on the
  1408. * other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order
  1409. * as was received from upstream
  1410. * @return the source [[Observable]] that its [[Observer]]s are notified on the specified [[Scheduler]]
  1411. * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
  1412. * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
  1413. */
  1414. def observeOn(scheduler: Scheduler, delayError: Boolean): Observable[T] = {
  1415. toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError))
  1416. }
  1417.  
  1418. /**
  1419. * REturns an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]],
  1420. * asynchronously with a bounded buffer of configurable size.
  1421. *
  1422. * Note that `onError` notifications will cut ahead of `onNext` notifications on the emission thread if [[Scheduler]] is truly
  1423. * asynchronous. If strict event ordering is required, consider using the
  1424. * [[Observable.observeOn(scheduler:rx\.lang\.scala\.Scheduler,delayError:Boolean)*]] overload.
  1425. *
  1426. * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="" />
  1427. *
  1428. * ===Scheduler:===
  1429. * you specify which [[Scheduler]] this operator will use
  1430. *
  1431. * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
  1432. * @param bufferSize the size of the buffer.
  1433. * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]]
  1434. * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
  1435. * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
  1436. */
  1437. def observeOn(scheduler: Scheduler, bufferSize: Int): Observable[T] = {
  1438. toScalaObservable[T](asJavaObservable.observeOn(scheduler, bufferSize))
  1439. }
  1440.  
  1441. /**
  1442. * Returns an [[Observable]] to perform its emissions and notifications on a specified {@link Scheduler},
  1443. * asynchronously with a bounded buffer of configurable size and optionally delays `onError` notifications.
  1444. *
  1445. * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="" />
  1446. *
  1447. * ===Scheduler:===
  1448. * you specify which [[Scheduler]] this operator will use
  1449. *
  1450. * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
  1451. * @param delayError indicates if the `onError` notification may not cut ahead of `onNext` notification on the other side of the
  1452. * scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
  1453. * @param bufferSize the size of the buffer
  1454. * @return the source [[Observable]] modified so that its [[Observer]]s are notified on the specified [[Scheduler]]
  1455. * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
  1456. * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
  1457. */
  1458. def observeOn(scheduler: Scheduler, delayError: Boolean, bufferSize: Int): Observable[T] = {
  1459. toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError, bufferSize))
  1460. }
  1461.  
  1462. /**
  1463. * Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by
  1464. * transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items
  1465. * or notifications they represent.
  1466. *
  1467. * This operation is only available if `this` is of type `Observable[Notification[U]]` for some `U`,
  1468. * otherwise you will get a compilation error.
  1469. *
  1470. * <img width="640" height="335" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/dematerialize.png" alt="" />
  1471. *
  1472. * @return an Observable that emits the items and notifications embedded in the [[rx.lang.scala.Notification]] objects emitted by the source Observable
  1473. *
  1474. * @usecase def dematerialize[U]: Observable[U]
  1475. * @inheritdoc
  1476. *
  1477. */
  1478. // with =:= it does not work, why?
  1479. def dematerialize[U](implicit evidence: Observable[T] <:< Observable[Notification[U]]): Observable[U] = {
  1480. val o1: Observable[Notification[U]] = this
  1481. val o2: Observable[rx.Notification[_ <: U]] = o1.map(_.asJavaNotification)
  1482. val o3 = o2.asJavaObservable.dematerialize[U]()
  1483. toScalaObservable[U](o3)
  1484. }
  1485.  
  1486. /**
  1487. * Instruct an Observable to pass control to another Observable rather than invoking [[rx.lang.scala.Observer.onError onError]] if it encounters an error.
  1488. *
  1489. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="" />
  1490. *
  1491. * By default, when an Observable encounters an error that prevents it from emitting the
  1492. * expected item to its [[rx.lang.scala.Observer]], the Observable invokes its Observer's
  1493. * `onError` method, and then quits without invoking any more of its Observer's
  1494. * methods. The `onErrorResumeNext` method changes this behavior. If you pass a
  1495. * function that returns an Observable (`resumeFunction`) to
  1496. * `onErrorResumeNext`, if the original Observable encounters an error, instead of
  1497. * invoking its Observer's `onError` method, it will instead relinquish control to
  1498. * the Observable returned from `resumeFunction`, which will invoke the Observer's
  1499. * [[rx.lang.scala.Observer.onNext onNext]] method if it is able to do so. In such a case, because no
  1500. * Observable necessarily invokes `onError`, the Observer may never know that an
  1501. * error happened.
  1502. *
  1503. * You can use this to prevent errors from propagating or to supply fallback data should errors
  1504. * be encountered.
  1505. *
  1506. * @param resumeFunction
  1507. * a function that returns an Observable that will take over if the source Observable
  1508. * encounters an error
  1509. * @return the original Observable, with appropriately modified behavior
  1510. */
  1511. def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U] = {
  1512. val f: Func1[Throwable, rx.Observable[_ <: U]] = (t: Throwable) => resumeFunction(t).asJavaObservable
  1513. val f2 = f.asInstanceOf[Func1[Throwable, rx.Observable[Nothing]]]
  1514. toScalaObservable[U](asJavaObservable.onErrorResumeNext(f2))
  1515. }
  1516.  
  1517. /**
  1518. * Instruct an Observable to pass control to another Observable rather than invoking [[rx.lang.scala.Observer.onError onError]] if it encounters an error of type `java.lang.Exception`.
  1519. *
  1520. * This differs from `Observable.onErrorResumeNext` in that this one does not handle `java.lang.Throwable` or `java.lang.Error` but lets those continue through.
  1521. *
  1522. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="" />
  1523. *
  1524. * By default, when an Observable encounters an error that prevents it from emitting the
  1525. * expected item to its [[rx.lang.scala.Observer]], the Observable invokes its Observer's
  1526. * `onError` method, and then quits without invoking any more of its Observer's
  1527. * methods. The `onErrorResumeNext` method changes this behavior. If you pass
  1528. * another Observable (`resumeSequence`) to an Observable's
  1529. * `onErrorResumeNext` method, if the original Observable encounters an error,
  1530. * instead of invoking its Observer's `onError` method, it will instead relinquish
  1531. * control to `resumeSequence` which will invoke the Observer's [[rx.lang.scala.Observer.onNext onNext]]
  1532. * method if it is able to do so. In such a case, because no
  1533. * Observable necessarily invokes `onError`, the Observer may never know that an
  1534. * error happened.
  1535. *
  1536. * You can use this to prevent errors from propagating or to supply fallback data should errors
  1537. * be encountered.
  1538. *
  1539. * @param resumeSequence
  1540. * a function that returns an Observable that will take over if the source Observable
  1541. * encounters an error
  1542. * @return the original Observable, with appropriately modified behavior
  1543. */
  1544. def onExceptionResumeNext[U >: T](resumeSequence: Observable[U]): Observable[U] = {
  1545. val rSeq1: rx.Observable[_ <: U] = resumeSequence.asJavaObservable
  1546. val rSeq2: rx.Observable[Nothing] = rSeq1.asInstanceOf[rx.Observable[Nothing]]
  1547. toScalaObservable[U](asJavaObservable.onExceptionResumeNext(rSeq2))
  1548. }
  1549.  
  1550. /**
  1551. * Instruct an Observable to emit an item (returned by a specified function) rather than
  1552. * invoking [[rx.lang.scala.Observer.onError onError]] if it encounters an error.
  1553. *
  1554. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="" />
  1555. *
  1556. * By default, when an Observable encounters an error that prevents it from emitting the
  1557. * expected item to its [[rx.lang.scala.Observer]], the Observable invokes its Observer's
  1558. * `onError` method, and then quits without invoking any more of its Observer's
  1559. * methods. The `onErrorReturn` method changes this behavior. If you pass a function
  1560. * (`resumeFunction`) to an Observable's `onErrorReturn` method, if the
  1561. * original Observable encounters an error, instead of invoking its Observer's
  1562. * `onError` method, it will instead pass the return value of
  1563. * `resumeFunction` to the Observer's [[rx.lang.scala.Observer.onNext onNext]] method.
  1564. *
  1565. * You can use this to prevent errors from propagating or to supply fallback data should errors
  1566. * be encountered.
  1567. *
  1568. * @param resumeFunction
  1569. * a function that returns an item that the new Observable will emit if the source
  1570. * Observable encounters an error
  1571. * @return the original Observable with appropriately modified behavior
  1572. */
  1573. def onErrorReturn[U >: T](resumeFunction: Throwable => U): Observable[U] = {
  1574. val f1: Func1[Throwable, _ <: U] = resumeFunction
  1575. val f2 = f1.asInstanceOf[Func1[Throwable, Nothing]]
  1576. toScalaObservable[U](asJavaObservable.onErrorReturn(f2))
  1577. }
  1578.  
  1579. /**
  1580. * Returns an Observable that applies a function of your choosing to the first item emitted by a
  1581. * source Observable, then feeds the result of that function along with the second item emitted
  1582. * by the source Observable into the same function, and so on until all items have been emitted
  1583. * by the source Observable, and emits the final result from the final call to your function as
  1584. * its sole item.
  1585. *
  1586. * <img width="640" height="320" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/reduce.png" alt="" />
  1587. *
  1588. * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold,"
  1589. * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance,
  1590. * has an `inject` method that does a similar operation on lists.
  1591. *
  1592. * @param accumulator
  1593. * An accumulator function to be invoked on each item emitted by the source
  1594. * Observable, whose result will be used in the next accumulator call
  1595. * @return an Observable that emits a single item that is the result of accumulating the
  1596. * output from the source Observable
  1597. */
  1598. def reduce[U >: T](accumulator: (U, U) => U): Observable[U] = {
  1599. val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator
  1600. val func2 = func.asInstanceOf[Func2[T, T, T]]
  1601. toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].reduce(func2))
  1602. }
  1603.  
  1604. /**
  1605. * Returns a [[rx.lang.scala.observables.ConnectableObservable]] that shares a single subscription to the underlying
  1606. * Observable that will replay all of its items and notifications to any future [[rx.lang.scala.Observer]].
  1607. *
  1608. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.png" alt="" />
  1609. *
  1610. * @return a [[rx.lang.scala.observables.ConnectableObservable]] such that when the `connect` function
  1611. * is called, the [[rx.lang.scala.observables.ConnectableObservable]] starts to emit items to its [[rx.lang.scala.Observer]]s
  1612. */
  1613. def replay: ConnectableObservable[T] = {
  1614. new ConnectableObservable[T](asJavaObservable.replay())
  1615. }
  1616.  
  1617. /**
  1618. * Returns an Observable that emits items that are the results of invoking a specified selector on the items
  1619. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
  1620. * <p>
  1621. * <img width="640" height="450" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.f.png" alt="" />
  1622. *
  1623. * @param selector the selector function, which can use the multicasted sequence as many times as needed, without
  1624. * causing multiple subscriptions to the Observable
  1625. * @return an Observable that emits items that are the results of invoking the selector on a `ConnectableObservable`
  1626. * that shares a single subscription to the source Observable
  1627. */
  1628. def replay[R](selector: Observable[T] => Observable[R]): Observable[R] = {
  1629. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1630. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1631. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1632. toScalaObservable[R](thisJava.replay(fJava))
  1633. }
  1634.  
  1635. /**
  1636. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1637. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1638. * replaying `bufferSize` notifications.
  1639. * <p>
  1640. * <img width="640" height="440" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fn.png" alt="" />
  1641. *
  1642. * @param selector the selector function, which can use the multicasted sequence as many times as needed, without
  1643. * causing multiple subscriptions to the Observable
  1644. * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
  1645. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1646. * a `ConnectableObservable` that shares a single subscription to the source Observable replaying
  1647. * no more than `bufferSize` items
  1648. */
  1649. def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int): Observable[R] = {
  1650. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1651. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1652. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1653. toScalaObservable[R](thisJava.replay(fJava, bufferSize))
  1654. }
  1655.  
  1656. /**
  1657. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1658. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1659. * replaying no more than `bufferSize` items that were emitted within a specified time window.
  1660. * <p>
  1661. * <img width="640" height="445" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fnt.png" alt="" />
  1662. *
  1663. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1664. * causing multiple subscriptions to the Observable
  1665. * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
  1666. * @param time the duration of the window in which the replayed items must have been emitted
  1667. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1668. * a `ConnectableObservable` that shares a single subscription to the source Observable, and
  1669. * replays no more than `bufferSize` items that were emitted within the window defined by `time`
  1670. */
  1671. def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration): Observable[R] = {
  1672. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1673. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1674. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1675. toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit))
  1676. }
  1677.  
  1678. /**
  1679. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1680. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1681. * replaying no more than `bufferSize` items that were emitted within a specified time window.
  1682. * <p>
  1683. * <img width="640" height="445" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fnts.png" alt="" />
  1684. *
  1685. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1686. * causing multiple subscriptions to the Observable
  1687. * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
  1688. * @param time the duration of the window in which the replayed items must have been emitted
  1689. * @param scheduler the Scheduler that is the time source for the window
  1690. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1691. * a `ConnectableObservable` that shares a single subscription to the source Observable, and
  1692. * replays no more than `bufferSize` items that were emitted within the window defined by `time`
  1693. * @throws java.lang.IllegalArgumentException if `bufferSize` is less than zero
  1694. */
  1695. def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = {
  1696. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1697. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1698. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1699. toScalaObservable[R](thisJava.replay(fJava, bufferSize, time.length, time.unit, scheduler))
  1700. }
  1701.  
  1702. /**
  1703. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1704. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1705. * replaying a maximum of `bufferSize` items.
  1706. * <p>
  1707. * <img width="640" height="440" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fns.png" alt="" />
  1708. *
  1709. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1710. * causing multiple subscriptions to the Observable
  1711. * @param bufferSize the buffer size that limits the number of items the connectable observable can replay
  1712. * @param scheduler the Scheduler on which the replay is observed
  1713. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1714. * a `ConnectableObservable` that shares a single subscription to the source Observable,
  1715. * replaying no more than `bufferSize` notifications
  1716. */
  1717. def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, scheduler: Scheduler): Observable[R] = {
  1718. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1719. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1720. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1721. toScalaObservable[R](thisJava.replay(fJava, bufferSize, scheduler))
  1722. }
  1723.  
  1724. /**
  1725. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1726. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1727. * replaying all items that were emitted within a specified time window.
  1728. * <p>
  1729. * <img width="640" height="435" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.ft.png" alt="" />
  1730. *
  1731. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1732. * causing multiple subscriptions to the Observable
  1733. * @param time the duration of the window in which the replayed items must have been emitted
  1734. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1735. * a `ConnectableObservable` that shares a single subscription to the source Observable,
  1736. * replaying all items that were emitted within the window defined by `time`
  1737. */
  1738. def replay[R](selector: Observable[T] => Observable[R], time: Duration, scheduler: Scheduler): Observable[R] = {
  1739. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1740. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1741. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1742. toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit, scheduler))
  1743. }
  1744.  
  1745. /**
  1746. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1747. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable.
  1748. * <p>
  1749. * <img width="640" height="445" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.fs.png" alt="" />
  1750. *
  1751. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1752. * causing multiple subscriptions to the Observable
  1753. * @param scheduler the Scheduler where the replay is observed
  1754. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1755. * a `ConnectableObservable` that shares a single subscription to the source Observable,
  1756. * replaying all items
  1757. */
  1758. def replay[R](selector: Observable[T] => Observable[R], scheduler: Scheduler): Observable[R] = {
  1759. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1760. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1761. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1762. toScalaObservable[R](thisJava.replay(fJava, scheduler))
  1763. }
  1764.  
  1765. /**
  1766. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
  1767. * replays at most `bufferSize` items that were emitted during a specified time window.
  1768. * <p>
  1769. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.nt.png" alt="" />
  1770. *
  1771. * @param bufferSize the buffer size that limits the number of items that can be replayed
  1772. * @param time the duration of the window in which the replayed items must have been emitted
  1773. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1774. * replays at most `bufferSize` items that were emitted during the window defined by `time`
  1775. */
  1776. def replay(bufferSize: Int, time: Duration): ConnectableObservable[T] = {
  1777. new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit))
  1778. }
  1779.  
  1780. /**
  1781. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
  1782. * that replays a maximum of `bufferSize` items that are emitted within a specified time window.
  1783. * <p>
  1784. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.nts.png" alt="" />
  1785. *
  1786. * @param bufferSize the buffer size that limits the number of items that can be replayed
  1787. * @param time the duration of the window in which the replayed items must have been emitted
  1788. * @param scheduler the scheduler that is used as a time source for the window
  1789. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1790. * replays at most `bufferSize` items that were emitted during the window defined by `time`
  1791. *@throws java.lang.IllegalArgumentException if `bufferSize` is less than zero
  1792. */
  1793. def replay(bufferSize: Int, time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
  1794. new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler))
  1795. }
  1796.  
  1797. /**
  1798. * Returns an Observable that emits items that are the results of invoking a specified selector on items
  1799. * emitted by a `ConnectableObservable` that shares a single subscription to the source Observable,
  1800. * replaying all items that were emitted within a specified time window.
  1801. * <p>
  1802. * <img width="640" height="435" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.ft.png" alt="" />
  1803. *
  1804. * @param selector a selector function, which can use the multicasted sequence as many times as needed, without
  1805. * causing multiple subscriptions to the Observable
  1806. * @param time the duration of the window in which the replayed items must have been emitted
  1807. * @return an Observable that emits items that are the results of invoking the selector on items emitted by
  1808. * a `ConnectableObservable` that shares a single subscription to the source Observable,
  1809. * replaying all items that were emitted within the window defined by `time`
  1810. */
  1811. def replay[R](selector: Observable[T] => Observable[R], time: Duration): Observable[R] = {
  1812. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  1813. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  1814. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  1815. toScalaObservable[R](thisJava.replay(fJava, time.length, time.unit))
  1816. }
  1817.  
  1818. /**
  1819. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
  1820. * replays at most `bufferSize` items emitted by that Observable.
  1821. * <p>
  1822. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.n.png" alt="" />
  1823. *
  1824. * @param bufferSize the buffer size that limits the number of items that can be replayed
  1825. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1826. * replays at most `bufferSize` items emitted by that Observable
  1827. */
  1828. def replay(bufferSize: Int): ConnectableObservable[T] = {
  1829. new ConnectableObservable[T](asJavaObservable.replay(bufferSize))
  1830. }
  1831.  
  1832. /**
  1833. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
  1834. * replays at most `bufferSize` items emitted by that Observable.
  1835. * <p>
  1836. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.ns.png" alt="" />
  1837. *
  1838. * @param bufferSize the buffer size that limits the number of items that can be replayed
  1839. * @param scheduler the scheduler on which the Observers will observe the emitted items
  1840. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1841. * replays at most `bufferSize` items that were emitted by the Observable
  1842. */
  1843. def replay(bufferSize: Int, scheduler: Scheduler): ConnectableObservable[T] = {
  1844. new ConnectableObservable[T](asJavaObservable.replay(bufferSize, scheduler))
  1845. }
  1846.  
  1847. /**
  1848. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
  1849. * replays all items emitted by that Observable within a specified time window.
  1850. * <p>
  1851. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.t.png" alt="" />
  1852. *
  1853. * @param time the duration of the window in which the replayed items must have been emitted
  1854. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1855. * replays the items that were emitted during the window defined by `time`
  1856. */
  1857. def replay(time: Duration): ConnectableObservable[T] = {
  1858. new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit))
  1859. }
  1860.  
  1861. /**
  1862. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable and
  1863. * replays all items emitted by that Observable within a specified time window.
  1864. * <p>
  1865. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.ts.png" alt="" />
  1866. *
  1867. * @param time the duration of the window in which the replayed items must have been emitted
  1868. * @param scheduler the Scheduler that is the time source for the window
  1869. * @return a `ConnectableObservable` that shares a single subscription to the source Observable and
  1870. * replays the items that were emitted during the window defined by `time`
  1871. */
  1872. def replay(time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
  1873. new ConnectableObservable[T](asJavaObservable.replay(time.length, time.unit, scheduler))
  1874. }
  1875.  
  1876. /**
  1877. * Returns a `ConnectableObservable` that shares a single subscription to the source Observable that
  1878. * will replay all of its items and notifications to any future `Observer` on the given `Scheduler`.
  1879. * <p>
  1880. * <img width="640" height="515" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/replay.s.png" alt="" />
  1881. *
  1882. * @param scheduler the Scheduler on which the Observers will observe the emitted items
  1883. * @return a `ConnectableObservable` that shares a single subscription to the source Observable that
  1884. * will replay all of its items and notifications to any future `bserver` on the given `Scheduler`
  1885. */
  1886. def replay(scheduler: Scheduler): ConnectableObservable[T] = {
  1887. new ConnectableObservable[T](asJavaObservable.replay(scheduler))
  1888. }
  1889.  
  1890. /**
  1891. * This method has similar behavior to `Observable.replay` except that this auto-subscribes to
  1892. * the source Observable rather than returning a start function and an Observable.
  1893. *
  1894. * <img width="640" height="410" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="" />
  1895. *
  1896. * This is useful when you want an Observable to cache responses and you can't control the
  1897. * subscribe/unsubscribe behavior of all the [[rx.lang.scala.Observer]]s.
  1898. *
  1899. * When you call `cache`, it does not yet subscribe to the
  1900. * source Observable. This only happens when `subscribe` is called
  1901. * the first time on the Observable returned by `cache`.
  1902. *
  1903. * Note: You sacrifice the ability to unsubscribe from the origin when you use the
  1904. * `cache()` operator so be careful not to use this operator on Observables that
  1905. * emit an infinite or very large number of items that will use up memory.
  1906. *
  1907. * @return an Observable that when first subscribed to, caches all of its notifications for
  1908. * the benefit of subsequent subscribers.
  1909. */
  1910. def cache: Observable[T] = {
  1911. toScalaObservable[T](asJavaObservable.cache())
  1912. }
  1913.  
  1914. /**
  1915. * Caches emissions from the source Observable and replays them in order to any subsequent Subscribers.
  1916. * This method has similar behavior to `Observable.replay` except that this auto-subscribes to the source
  1917. * Observable rather than returning a [[rx.lang.scala.observables.ConnectableObservable ConnectableObservable]] for which you must call
  1918. * `connect` to activate the subscription.
  1919. * <p>
  1920. * <img width="640" height="410" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="" />
  1921. * <p>
  1922. * This is useful when you want an Observable to cache responses and you can't control the
  1923. * `subscribe/unsubscribe` behavior of all the [[Subscriber]]s.
  1924. * <p>
  1925. * When you call `cache`, it does not yet subscribe to the source Observable and so does not yet
  1926. * begin cacheing items. This only happens when the first Subscriber calls the resulting Observable's
  1927. * `subscribe` method.
  1928. * <p>
  1929. * <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the `cache`
  1930. * Observer so be careful not to use this Observer on Observables that emit an infinite or very large number
  1931. * of items that will use up memory.
  1932. *
  1933. * ===Backpressure Support:===
  1934. * This operator does not support upstream backpressure as it is purposefully requesting and caching everything emitted.
  1935. *
  1936. * ===Scheduler:===
  1937. * `cache` does not operate by default on a particular `Scheduler`.
  1938. *
  1939. * @param capacity hint for number of items to cache (for optimizing underlying data structure)
  1940. * @return an Observable that, when first subscribed to, caches all of its items and notifications for the
  1941. * benefit of subsequent subscribers
  1942. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#cache">RxJava wiki: cache</a>
  1943. * @since 0.20
  1944. */
  1945. @deprecated("Use [[Observable.cacheWithInitialCapacity]] instead", "0.26.1")
  1946. def cache(capacity: Int): Observable[T] = {
  1947. toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
  1948. }
  1949.  
  1950. /**
  1951. * Caches emissions from the source [[Observable]] and replays them in order to any subsequent [[Subscriber]]s.
  1952. * This method has similar behavior to [[Observable.replay:* replay]] except that this auto-subscribes to the source
  1953. * [[Observable]] rather than returning a [[rx.lang.scala.observables.ConnectableObservable ConnectableObservable]]
  1954. * for which you must call [[rx.lang.scala.observables.ConnectableObservable.connect connect]] to activate the subscription.
  1955. *
  1956. * <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="">
  1957. *
  1958. * This is useful when you want an [[Observable]] to cache responses and you can't control the
  1959. * subscribe/unsubscribe behavior of all the [[Subscriber]]s.
  1960. *
  1961. * When you call this method, it does not yet subscribe to the source Observable and so does not yet
  1962. * begin caching items. This only happens when the first [[Subscriber]] calls the resulting [[Observable]]'s `subscribe` method.
  1963. *
  1964. * **Note:** You sacrifice the ability to unsubscribe from the origin when you use the this method.
  1965. * So be careful not to use this method on [[Observable]]s that emit an infinite or very large number
  1966. * of items that will use up memory.
  1967. *
  1968. * ===Backpressure Support:===
  1969. * This operator does not support upstream backpressure as it is purposefully requesting and caching
  1970. * everything emitted.
  1971. *
  1972. * $noDefaultScheduler
  1973. *
  1974. * **Note:** The `capacity` hint is not an upper bound on cache size. For that, consider
  1975. * [[Observable.replay(bufferSize:Int):* replay(Int)]] in combination with
  1976. * [[rx.lang.scala.observables.ConnectableObservable.autoConnect:* ConnectableObservable.autoConnect]] or similar.
  1977. *
  1978. * @param capacity hint for number of items to cache (for optimizing underlying data structure)
  1979. * @return an [[Observable]] that, when first subscribed to, caches all of its items and notifications for the
  1980. * benefit of subsequent [[Subscriber]]s
  1981. * @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
  1982. */
  1983. def cacheWithInitialCapacity(capacity: Int): Observable[T] = {
  1984. toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
  1985. }
  1986.  
  1987. /**
  1988. * Returns a new [[Observable]] that multicasts (shares) the original [[Observable]]. As long a
  1989. * there is more than 1 [[Subscriber]], this [[Observable]] will be subscribed and emitting data.
  1990. * When all subscribers have unsubscribed it will unsubscribe from the source [[Observable]].
  1991. *
  1992. * This is an alias for `publish().refCount()`
  1993. *
  1994. * <img width="640" height="510" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/publishRefCount.png" alt="" />
  1995. *
  1996. * @return a [[Observable]] that upon connection causes the source Observable to emit items to its [[Subscriber]]s
  1997. * @since 0.19
  1998. */
  1999. def share: Observable[T] = {
  2000. toScalaObservable[T](asJavaObservable.share())
  2001. }
  2002.  
  2003. /**
  2004. * Returns an Observable that emits a Boolean that indicates whether the source Observable emitted a
  2005. * specified item.
  2006. *
  2007. * Note: this method uses `==` to compare elements. It's a bit different from RxJava which uses `Object.equals`.
  2008. * <p>
  2009. * <img width="640" height="320" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/contains.png" alt="" />
  2010. *
  2011. *@param elem the item to search for in the emissions from the source Observable
  2012. * @return an Observable that emits `true` if the specified item is emitted by the source Observable,
  2013. * or `false` if the source Observable completes without emitting that item
  2014. */
  2015. def contains[U >: T](elem: U): Observable[Boolean] = {
  2016. exists(_ == elem)
  2017. }
  2018.  
  2019. /**
  2020. * Returns a [[rx.lang.scala.observables.ConnectableObservable]], which waits until the `connect` function is called
  2021. * before it begins emitting items from `this` [[rx.lang.scala.Observable]] to those [[rx.lang.scala.Observer]]s that
  2022. * have subscribed to it.
  2023. *
  2024. * <img width="640" height="510" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="" />
  2025. *
  2026. * @return an [[rx.lang.scala.observables.ConnectableObservable]].
  2027. */
  2028. def publish: ConnectableObservable[T] = {
  2029. new ConnectableObservable[T](asJavaObservable.publish())
  2030. }
  2031.  
  2032. /**
  2033. * Returns an Observable that emits the results of invoking a specified selector on items emitted by a `ConnectableObservable`
  2034. * that shares a single subscription to the underlying sequence.
  2035. * <p>
  2036. * <img width="640" height="510" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="" />
  2037. *
  2038. * @param selector a function that can use the multicasted source sequence as many times as needed, without
  2039. * causing multiple subscriptions to the source sequence. Subscribers to the given source will
  2040. * receive all notifications of the source from the time of the subscription forward.
  2041. * @return an Observable that emits the results of invoking the selector on the items emitted by a `ConnectableObservable`
  2042. * that shares a single subscription to the underlying sequence
  2043. */
  2044. def publish[R](selector: Observable[T] => Observable[R]): Observable[R] = {
  2045. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
  2046. val fJava: Func1[rx.Observable[T], rx.Observable[R]] =
  2047. (jo: rx.Observable[T]) => selector(toScalaObservable[T](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]]
  2048. toScalaObservable[R](thisJava.publish(fJava))
  2049. }
  2050.  
  2051. // TODO add Scala-like aggregate function
  2052.  
  2053. /**
  2054. * Returns an Observable that applies a function of your choosing to the first item emitted by a
  2055. * source Observable, then feeds the result of that function along with the second item emitted
  2056. * by an Observable into the same function, and so on until all items have been emitted by the
  2057. * source Observable, emitting the final result from the final call to your function as its sole
  2058. * item.
  2059. *
  2060. * <img width="640" height="325" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/reduceSeed.png" alt="" />
  2061. *
  2062. * This technique, which is called "reduce" or "aggregate" here, is sometimes called "fold,"
  2063. * "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance,
  2064. * has an `inject` method that does a similar operation on lists.
  2065. *
  2066. * @param initialValue
  2067. * the initial (seed) accumulator value
  2068. * @param accumulator
  2069. * an accumulator function to be invoked on each item emitted by the source
  2070. * Observable, the result of which will be used in the next accumulator call
  2071. * @return an Observable that emits a single item that is the result of accumulating the output
  2072. * from the items emitted by the source Observable
  2073. */
  2074. def foldLeft[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
  2075. toScalaObservable[R](asJavaObservable.reduce(initialValue, new Func2[R,T,R]{
  2076. def call(t1: R, t2: T): R = accumulator(t1,t2)
  2077. }))
  2078. }
  2079.  
  2080. /**
  2081. * Returns an Observable that emits the results of sampling the items emitted by the source
  2082. * Observable at a specified time interval.
  2083. *
  2084. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="" />
  2085. *
  2086. * @param duration the sampling rate
  2087. * @return an Observable that emits the results of sampling the items emitted by the source
  2088. * Observable at the specified time interval
  2089. */
  2090. def sample(duration: Duration): Observable[T] = {
  2091. toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit))
  2092. }
  2093.  
  2094. /**
  2095. * Returns an Observable that emits the results of sampling the items emitted by the source
  2096. * Observable at a specified time interval.
  2097. *
  2098. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.png" alt="" />
  2099. *
  2100. * @param duration the sampling rate
  2101. * @param scheduler
  2102. * the [[rx.lang.scala.Scheduler]] to use when sampling
  2103. * @return an Observable that emits the results of sampling the items emitted by the source
  2104. * Observable at the specified time interval
  2105. */
  2106. def sample(duration: Duration, scheduler: Scheduler): Observable[T] = {
  2107. toScalaObservable[T](asJavaObservable.sample(duration.length, duration.unit, scheduler))
  2108. }
  2109.  
  2110. /**
  2111. * Return an Observable that emits the results of sampling the items emitted by the source Observable
  2112. * whenever the specified sampler Observable emits an item or completes.
  2113. *
  2114. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.o.png" alt="" />
  2115. *
  2116. * @param sampler
  2117. * the Observable to use for sampling the source Observable
  2118. * @return an Observable that emits the results of sampling the items emitted by this Observable whenever
  2119. * the sampler Observable emits an item or completes
  2120. */
  2121. def sample(sampler: Observable[Any]): Observable[T] = {
  2122. toScalaObservable[T](asJavaObservable.sample(sampler))
  2123. }
  2124.  
  2125. /**
  2126. * Returns an Observable that applies a function of your choosing to the first item emitted by a
  2127. * source Observable, then feeds the result of that function along with the second item emitted
  2128. * by an Observable into the same function, and so on until all items have been emitted by the
  2129. * source Observable, emitting the result of each of these iterations.
  2130. *
  2131. * <img width="640" height="320" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/scanSeed.png" alt="" />
  2132. *
  2133. * This sort of function is sometimes called an accumulator.
  2134. *
  2135. * Note that when you pass a seed to `scan()` the resulting Observable will emit
  2136. * that seed as its first emitted item.
  2137. *
  2138. * @param initialValue
  2139. * the initial (seed) accumulator value
  2140. * @param accumulator
  2141. * an accumulator function to be invoked on each item emitted by the source
  2142. * Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
  2143. * [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
  2144. * @return an Observable that emits the results of each call to the accumulator function
  2145. */
  2146. def scan[R](initialValue: R)(accumulator: (R, T) => R): Observable[R] = {
  2147. toScalaObservable[R](asJavaObservable.scan(initialValue, new Func2[R,T,R]{
  2148. def call(t1: R, t2: T): R = accumulator(t1,t2)
  2149. }))
  2150. }
  2151.  
  2152. /**
  2153. * Returns an Observable that applies a function of your choosing to the
  2154. * first item emitted by a source Observable, then feeds the result of that
  2155. * function along with the second item emitted by an Observable into the
  2156. * same function, and so on until all items have been emitted by the source
  2157. * Observable, emitting the result of each of these iterations.
  2158. * <p>
  2159. * <img width="640" height="320" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/scan.png" alt="" />
  2160. * <p>
  2161. *
  2162. * @param accumulator
  2163. * an accumulator function to be invoked on each item emitted by the source
  2164. * Observable, whose result will be emitted to [[rx.lang.scala.Observer]]s via
  2165. * [[rx.lang.scala.Observer.onNext onNext]] and used in the next accumulator call.
  2166. * @return
  2167. * an Observable that emits the results of each call to the
  2168. * accumulator function
  2169. */
  2170. def scan[U >: T](accumulator: (U, U) => U): Observable[U] = {
  2171. val func: Func2[_ >: U, _ >: U, _ <: U] = accumulator
  2172. val func2 = func.asInstanceOf[Func2[T, T, T]]
  2173. toScalaObservable[U](asJavaObservable.asInstanceOf[rx.Observable[T]].scan(func2))
  2174. }
  2175.  
  2176. /**
  2177. * Returns an Observable that emits a Boolean that indicates whether all of the items emitted by
  2178. * the source Observable satisfy a condition.
  2179. *
  2180. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/all.png" alt="" />
  2181. *
  2182. * @param predicate
  2183. * a function that evaluates an item and returns a Boolean
  2184. * @return an Observable that emits `true` if all items emitted by the source
  2185. * Observable satisfy the predicate; otherwise, `false`
  2186. */
  2187. def forall(predicate: T => Boolean): Observable[Boolean] = {
  2188. toScalaObservable[java.lang.Boolean](asJavaObservable.all(predicate)).map(_.booleanValue())
  2189. }
  2190.  
  2191. /**
  2192. * Returns an Observable that skips the first `num` items emitted by the source
  2193. * Observable and emits the remainder.
  2194. *
  2195. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.png" alt="" />
  2196. *
  2197. * @param n
  2198. * the number of items to skip
  2199. * @return an Observable that is identical to the source Observable except that it does not
  2200. * emit the first `num` items that the source emits
  2201. */
  2202. def drop(n: Int): Observable[T] = {
  2203. toScalaObservable[T](asJavaObservable.skip(n))
  2204. }
  2205.  
  2206. /**
  2207. * Returns an Observable that drops values emitted by the source Observable before a specified time window
  2208. * elapses.
  2209. *
  2210. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="" />
  2211. *
  2212. * @param time the length of the time window to drop
  2213. * @return an Observable that drops values emitted by the source Observable before the time window defined
  2214. * by `time` elapses and emits the remainder
  2215. */
  2216. def drop(time: Duration): Observable[T] = {
  2217. toScalaObservable(asJavaObservable.skip(time.length, time.unit))
  2218. }
  2219.  
  2220. /**
  2221. * Returns an Observable that drops values emitted by the source Observable before a specified time window
  2222. * elapses.
  2223. *
  2224. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skip.t.png" alt="" />
  2225. *
  2226. * @param time the length of the time window to drop
  2227. * @param scheduler the `Scheduler` on which the timed wait happens
  2228. * @return an Observable that drops values emitted by the source Observable before the time window defined
  2229. * by `time` elapses and emits the remainder
  2230. */
  2231. def drop(time: Duration, scheduler: Scheduler): Observable[T] = {
  2232. toScalaObservable(asJavaObservable.skip(time.length, time.unit, scheduler))
  2233. }
  2234.  
  2235. /**
  2236. * Returns an Observable that bypasses all items from the source Observable as long as the specified
  2237. * condition holds true. Emits all further source items as soon as the condition becomes false.
  2238. *
  2239. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skipWhile.png" alt="" />
  2240. *
  2241. * @param predicate
  2242. * A function to test each item emitted from the source Observable for a condition.
  2243. * @return an Observable that emits all items from the source Observable as soon as the condition
  2244. * becomes false.
  2245. */
  2246. def dropWhile(predicate: T => Boolean): Observable[T] = {
  2247. toScalaObservable(asJavaObservable.skipWhile(predicate))
  2248. }
  2249.  
  2250. /**
  2251. * Returns an Observable that drops a specified number of items from the end of the sequence emitted by the
  2252. * source Observable.
  2253. * <p>
  2254. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skipLast.png" alt="" />
  2255. * <p>
  2256. * This Observer accumulates a queue long enough to store the first `n` items. As more items are
  2257. * received, items are taken from the front of the queue and emitted by the returned Observable. This causes
  2258. * such items to be delayed.
  2259. *
  2260. * @param n number of items to drop from the end of the source sequence
  2261. * @return an Observable that emits the items emitted by the source Observable except for the dropped ones
  2262. * at the end
  2263. * @throws java.lang.IndexOutOfBoundsException if `n` is less than zero
  2264. */
  2265. def dropRight(n: Int): Observable[T] = {
  2266. toScalaObservable(asJavaObservable.skipLast(n))
  2267. }
  2268.  
  2269. /**
  2270. * Returns an Observable that drops items emitted by the source Observable during a specified time window
  2271. * before the source completes.
  2272. * <p>
  2273. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skipLast.t.png" alt="" />
  2274. *
  2275. * Note: this action will cache the latest items arriving in the specified time window.
  2276. *
  2277. * @param time the length of the time window
  2278. * @return an Observable that drops those items emitted by the source Observable in a time window before the
  2279. * source completes defined by `time`
  2280. */
  2281. def dropRight(time: Duration): Observable[T] = {
  2282. toScalaObservable(asJavaObservable.skipLast(time.length, time.unit))
  2283. }
  2284.  
  2285. /**
  2286. * Returns an Observable that drops items emitted by the source Observable during a specified time window
  2287. * (defined on a specified scheduler) before the source completes.
  2288. * <p>
  2289. * <img width="640" height="340" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skipLast.ts.png" alt="" />
  2290. *
  2291. * Note: this action will cache the latest items arriving in the specified time window.
  2292. *
  2293. * @param time the length of the time window
  2294. * @param scheduler the scheduler used as the time source
  2295. * @return an Observable that drops those items emitted by the source Observable in a time window before the
  2296. * source completes defined by `time` and `scheduler`
  2297. */
  2298. def dropRight(time: Duration, scheduler: Scheduler): Observable[T] = {
  2299. toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler))
  2300. }
  2301.  
  2302. /**
  2303. * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
  2304. * <p>
  2305. * <img width="640" height="375" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/skipUntil.png" alt="" />
  2306. *
  2307. * @param other the second Observable that has to emit an item before the source Observable's elements begin
  2308. * to be mirrored by the resulting Observable
  2309. * @return an Observable that skips items from the source Observable until the second Observable emits an
  2310. * item, then emits the remaining items
  2311. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#wiki-skipuntil">RxJava Wiki: skipUntil()</a>
  2312. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229358.aspx">MSDN: Observable.SkipUntil</a>
  2313. */
  2314. def dropUntil(other: Observable[Any]): Observable[T] = {
  2315. toScalaObservable[T](asJavaObservable.skipUntil(other))
  2316. }
  2317.  
  2318. /**
  2319. * Returns an Observable that emits only the first `num` items emitted by the source
  2320. * Observable.
  2321. *
  2322. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/take.png" alt="" />
  2323. *
  2324. * This method returns an Observable that will invoke a subscribing [[rx.lang.scala.Observer]]'s
  2325. * [[rx.lang.scala.Observer.onNext onNext]] function a maximum of `num` times before invoking
  2326. * [[rx.lang.scala.Observer.onCompleted onCompleted]].
  2327. *
  2328. * @param n
  2329. * the number of items to take
  2330. * @return an Observable that emits only the first `num` items from the source
  2331. * Observable, or all of the items from the source Observable if that Observable emits
  2332. * fewer than `num` items
  2333. */
  2334. def take(n: Int): Observable[T] = {
  2335. toScalaObservable[T](asJavaObservable.take(n))
  2336. }
  2337.  
  2338. /**
  2339. * Returns an Observable that emits those items emitted by source Observable before a specified time runs out.
  2340. * <p>
  2341. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/take.t.png" alt="" />
  2342. *
  2343. * @param time the length of the time window
  2344. * @return an Observable that emits those items emitted by the source Observable before the time runs out
  2345. */
  2346. def take(time: Duration): Observable[T] = {
  2347. toScalaObservable[T](asJavaObservable.take(time.length, time.unit))
  2348. }
  2349.  
  2350. /**
  2351. * Returns an Observable that emits those items emitted by source Observable before a specified time (on
  2352. * specified Scheduler) runs out
  2353. * <p>
  2354. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/take.ts.png" alt="" />
  2355. *
  2356. * @param time the length of the time window
  2357. * @param scheduler the Scheduler used for time source
  2358. * @return an Observable that emits those items emitted by the source Observable before the time runs out,
  2359. * according to the specified Scheduler
  2360. */
  2361. def take(time: Duration, scheduler: Scheduler): Observable[T] = {
  2362. toScalaObservable[T](asJavaObservable.take(time.length, time.unit, scheduler.asJavaScheduler))
  2363. }
  2364.  
  2365. /**
  2366. * $experimental Returns an [[Observable]] that emits items emitted by the source [[Observable]], checks the specified predicate
  2367. * for each item, and then completes if the condition is satisfied.
  2368. *
  2369. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.p.png" alt="">
  2370. *
  2371. * The difference between this operator and `takeWhile(T => Boolean)` is that here, the condition is
  2372. * evaluated '''after''' the item is emitted.
  2373. *
  2374. * $noDefaultScheduler
  2375. *
  2376. * @param stopPredicate a function that evaluates an item emitted by the source [[Observable]] and returns a Boolean
  2377. * @return an [[Observable]] that first emits items emitted by the source [[Observable]], checks the specified
  2378. * condition after each item, and then completes if the condition is satisfied.
  2379. * @see <a href="http://reactivex.io/documentation/operators/takeuntil.html">ReactiveX operators documentation: TakeUntil</a>
  2380. * @see [[Observable.takeWhile]]
  2381. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  2382. */
  2383. @Experimental
  2384. def takeUntil(stopPredicate: T => Boolean): Observable[T] = {
  2385. val func = new Func1[T, java.lang.Boolean] {
  2386. override def call(t: T): java.lang.Boolean = stopPredicate(t)
  2387. }
  2388. toScalaObservable[T](asJavaObservable.takeUntil(func))
  2389. }
  2390.  
  2391. /**
  2392. * Returns an Observable that emits items emitted by the source Observable so long as a
  2393. * specified condition is true.
  2394. *
  2395. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeWhile.png" alt="" />
  2396. *
  2397. * @param predicate
  2398. * a function that evaluates an item emitted by the source Observable and returns a
  2399. * Boolean
  2400. * @return an Observable that emits the items from the source Observable so long as each item
  2401. * satisfies the condition defined by `predicate`
  2402. */
  2403. def takeWhile(predicate: T => Boolean): Observable[T] = {
  2404. toScalaObservable[T](asJavaObservable.takeWhile(predicate))
  2405. }
  2406.  
  2407. /**
  2408. * Returns an Observable that emits only the last `count` items emitted by the source
  2409. * Observable.
  2410. *
  2411. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/last.png" alt="" />
  2412. *
  2413. * @param count
  2414. * the number of items to emit from the end of the sequence emitted by the source
  2415. * Observable
  2416. * @return an Observable that emits only the last `count` items emitted by the source
  2417. * Observable
  2418. */
  2419. def takeRight(count: Int): Observable[T] = {
  2420. toScalaObservable[T](asJavaObservable.takeLast(count))
  2421. }
  2422.  
  2423. /**
  2424. * Return an Observable that emits the items from the source Observable that were emitted in a specified
  2425. * window of `time` before the Observable completed.
  2426. * <p>
  2427. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLast.t.png" alt="" />
  2428. *
  2429. * @param time the length of the time window
  2430. * @return an Observable that emits the items from the source Observable that were emitted in the window of
  2431. * time before the Observable completed specified by `time`
  2432. */
  2433. def takeRight(time: Duration): Observable[T] = {
  2434. toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit))
  2435. }
  2436.  
  2437. /**
  2438. * Return an Observable that emits the items from the source Observable that were emitted in a specified
  2439. * window of `time` before the Observable completed, where the timing information is provided by a specified
  2440. * Scheduler.
  2441. * <p>
  2442. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLast.ts.png" alt="" />
  2443. *
  2444. * @param time the length of the time window
  2445. * @param scheduler the Scheduler that provides the timestamps for the Observed items
  2446. * @return an Observable that emits the items from the source Observable that were emitted in the window of
  2447. * time before the Observable completed specified by `time`, where the timing information is
  2448. * provided by `scheduler`
  2449. */
  2450. def takeRight(time: Duration, scheduler: Scheduler): Observable[T] = {
  2451. toScalaObservable[T](asJavaObservable.takeLast(time.length, time.unit, scheduler.asJavaScheduler))
  2452. }
  2453.  
  2454. /**
  2455. * Return an Observable that emits at most a specified number of items from the source Observable that were
  2456. * emitted in a specified window of time before the Observable completed.
  2457. * <p>
  2458. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLast.tn.png" alt="" />
  2459. *
  2460. * @param count the maximum number of items to emit
  2461. * @param time the length of the time window
  2462. * @return an Observable that emits at most `count` items from the source Observable that were emitted
  2463. * in a specified window of time before the Observable completed
  2464. * @throws java.lang.IllegalArgumentException if `count` is less than zero
  2465. */
  2466. def takeRight(count: Int, time: Duration): Observable[T] = {
  2467. toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit))
  2468. }
  2469.  
  2470. /**
  2471. * Return an Observable that emits at most a specified number of items from the source Observable that were
  2472. * emitted in a specified window of `time` before the Observable completed, where the timing information is
  2473. * provided by a given Scheduler.
  2474. * <p>
  2475. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeLast.tns.png" alt="" />
  2476. *
  2477. * @param count the maximum number of items to emit
  2478. * @param time the length of the time window
  2479. * @param scheduler the Scheduler that provides the timestamps for the observed items
  2480. * @return an Observable that emits at most `count` items from the source Observable that were emitted
  2481. * in a specified window of time before the Observable completed, where the timing information is
  2482. * provided by the given `scheduler`
  2483. * @throws java.lang.IllegalArgumentException if `count` is less than zero
  2484. */
  2485. def takeRight(count: Int, time: Duration, scheduler: Scheduler): Observable[T] = {
  2486. toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit, scheduler.asJavaScheduler))
  2487. }
  2488.  
  2489. /**
  2490. * Returns an Observable that emits the items from the source Observable only until the
  2491. * `other` Observable emits an item.
  2492. *
  2493. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/takeUntil.png" alt="" />
  2494. *
  2495. * @param that
  2496. * the Observable whose first emitted item will cause `takeUntil` to stop
  2497. * emitting items from the source Observable
  2498. * @return an Observable that emits the items of the source Observable until such time as
  2499. * `other` emits its first item
  2500. */
  2501. def takeUntil(that: Observable[Any]): Observable[T] = {
  2502. toScalaObservable[T](asJavaObservable.takeUntil(that.asJavaObservable))
  2503. }
  2504.  
  2505. /**
  2506. * Returns an Observable that emits a single item, a list composed of all the items emitted by
  2507. * the source Observable.
  2508. *
  2509. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toList.png" alt="" />
  2510. *
  2511. * Normally, an Observable that returns multiple items will do so by invoking its [[rx.lang.scala.Observer]]'s
  2512. * [[rx.lang.scala.Observer.onNext onNext]] method for each such item. You can change
  2513. * this behavior, instructing the Observable to compose a list of all of these items and then to
  2514. * invoke the Observer's `onNext` function once, passing it the entire list, by
  2515. * calling the Observable's `toList` method prior to calling its `Observable.subscribe` method.
  2516. *
  2517. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  2518. * of items, as you do not have the option to unsubscribe.
  2519. *
  2520. * @return an Observable that emits a single item: a List containing all of the items emitted by
  2521. * the source Observable.
  2522. */
  2523. def toSeq: Observable[Seq[T]] = {
  2524. Observable.jObsOfListToScObsOfSeq(asJavaObservable.toList)
  2525. : Observable[Seq[T]] // SI-7818
  2526. }
  2527.  
  2528. /**
  2529. * Groups the items emitted by this Observable according to a specified discriminator function.
  2530. *
  2531. * @param f
  2532. * a function that extracts the key from an item
  2533. * @tparam K
  2534. * the type of keys returned by the discriminator function.
  2535. * @return an Observable that emits `(key, observable)` pairs, where `observable`
  2536. * contains all items for which `f` returned `key`.
  2537. */
  2538. def groupBy[K](f: T => K): Observable[(K, Observable[T])] = {
  2539. val o1 = asJavaObservable.groupBy[K](f) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
  2540. val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
  2541. toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
  2542. }
  2543.  
  2544. /**
  2545. * Groups the items emitted by an [[Observable]] according to a specified criterion, and emits these
  2546. * grouped items as `(key, observable)` pairs.
  2547. *
  2548. * <img width="640" height="360" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="" />
  2549. *
  2550. * Note: A `(key, observable)` will cache the items it is to emit until such time as it
  2551. * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
  2552. * `(key, observable)` pairs that do not concern you. Instead, you can signal to them that they may
  2553. * discard their buffers by applying an operator like `take(0)` to them.
  2554. *
  2555. * ===Backpressure Support:===
  2556. * This operator does not support backpressure as splitting a stream effectively turns it into a "hot observable"
  2557. * and blocking any one group would block the entire parent stream. If you need backpressure on individual groups
  2558. * then you should use operators such as `nBackpressureDrop` or `@link #onBackpressureBuffer`.</dd>
  2559. * ===Scheduler:===
  2560. * groupBy` does not operate by default on a particular `Scheduler`.
  2561. *
  2562. * @param keySelector a function that extracts the key for each item
  2563. * @param valueSelector a function that extracts the return element for each item
  2564. * @tparam K the key type
  2565. * @tparam V the value type
  2566. * @return an [[Observable]] that emits `(key, observable)` pairs, each of which corresponds to a
  2567. * unique key value and each of which emits those items from the source Observable that share that
  2568. * key value
  2569. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava wiki: groupBy</a>
  2570. * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.groupby.aspx">MSDN: Observable.GroupBy</a>
  2571. */
  2572. def groupBy[K, V](keySelector: T => K, valueSelector: T => V): Observable[(K, Observable[V])] = {
  2573. val jo: rx.Observable[rx.observables.GroupedObservable[K, V]] = asJavaObservable.groupBy[K, V](keySelector, valueSelector)
  2574. toScalaObservable[rx.observables.GroupedObservable[K, V]](jo).map {
  2575. go: rx.observables.GroupedObservable[K, V] => (go.getKey, toScalaObservable[V](go))
  2576. }
  2577. }
  2578.  
  2579. /**
  2580. * Correlates the items emitted by two Observables based on overlapping durations.
  2581. * <p>
  2582. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/join_.png" alt="" />
  2583. *
  2584. * @param other
  2585. * the second Observable to join items from
  2586. * @param leftDurationSelector
  2587. * a function to select a duration for each item emitted by the source Observable,
  2588. * used to determine overlap
  2589. * @param rightDurationSelector
  2590. * a function to select a duration for each item emitted by the inner Observable,
  2591. * used to determine overlap
  2592. * @param resultSelector
  2593. * a function that computes an item to be emitted by the resulting Observable for any
  2594. * two overlapping items emitted by the two Observables
  2595. * @return
  2596. * an Observable that emits items correlating to items emitted by the source Observables
  2597. * that have overlapping durations
  2598. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
  2599. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
  2600. */
  2601. def join[S, R] (other: Observable[S])(leftDurationSelector: T => Observable[Any], rightDurationSelector: S => Observable[Any], resultSelector: (T, S) => R): Observable[R] = {
  2602. val outer : rx.Observable[_ <: T] = this.asJavaObservable
  2603. val inner : rx.Observable[_ <: S] = other.asJavaObservable
  2604. val left: Func1[_ >: T, _<: rx.Observable[_ <: Any]] = (t: T) => leftDurationSelector(t).asJavaObservable
  2605. val right: Func1[_ >: S, _<: rx.Observable[_ <: Any]] = (s: S) => rightDurationSelector(s).asJavaObservable
  2606. val f: Func2[_>: T, _ >: S, _ <: R] = resultSelector
  2607.  
  2608. toScalaObservable[R](
  2609. outer.asInstanceOf[rx.Observable[T]].join[S, Any, Any, R](
  2610. inner.asInstanceOf[rx.Observable[S]],
  2611. left. asInstanceOf[Func1[T, rx.Observable[Any]]],
  2612. right.asInstanceOf[Func1[S, rx.Observable[Any]]],
  2613. f.asInstanceOf[Func2[T,S,R]])
  2614. )
  2615. }
  2616.  
  2617. /**
  2618. * Returns an Observable that correlates two Observables when they overlap in time and groups the results.
  2619. *
  2620. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/groupJoin.png" alt="" />
  2621. *
  2622. * @param other the other Observable to correlate items from the source Observable with
  2623. * @param leftDuration a function that returns an Observable whose emissions indicate the duration of the values of
  2624. * the source Observable
  2625. * @param rightDuration a function that returns an Observable whose emissions indicate the duration of the values of
  2626. * the `other` Observable
  2627. * @param resultSelector a function that takes an item emitted by each Observable and returns the value to be emitted
  2628. * by the resulting Observable
  2629. * @return an Observable that emits items based on combining those items emitted by the source Observables
  2630. * whose durations overlap
  2631. */
  2632. def groupJoin[S, R](other: Observable[S])(leftDuration: T => Observable[Any], rightDuration: S => Observable[Any], resultSelector: (T, Observable[S]) => R): Observable[R] = {
  2633. val outer: rx.Observable[_ <: T] = this.asJavaObservable
  2634. val inner: rx.Observable[_ <: S] = other.asJavaObservable
  2635. val left: Func1[_ >: T, _ <: rx.Observable[_ <: Any]] = (t: T) => leftDuration(t).asJavaObservable
  2636. val right: Func1[_ >: S, _ <: rx.Observable[_ <: Any]] = (s: S) => rightDuration(s).asJavaObservable
  2637. val f: Func2[_ >: T, _ >: rx.Observable[S], _ <: R] = (t: T, o: rx.Observable[S]) => resultSelector(t, toScalaObservable[S](o))
  2638. toScalaObservable[R](
  2639. outer.asInstanceOf[rx.Observable[T]].groupJoin[S, Any, Any, R](
  2640. inner.asInstanceOf[rx.Observable[S]],
  2641. left.asInstanceOf[Func1[T, rx.Observable[Any]]],
  2642. right.asInstanceOf[Func1[S, rx.Observable[Any]]],
  2643. f)
  2644. )
  2645. }
  2646.  
  2647. /**
  2648. * Returns a new Observable by applying a function that you supply to each item emitted by the source
  2649. * Observable that returns an Observable, and then emitting the items emitted by the most recently emitted
  2650. * of these Observables.
  2651. *
  2652. * <img width="640" height="350" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="" />
  2653. *
  2654. * @param f a function that, when applied to an item emitted by the source Observable, returns an Observable
  2655. * @return an Observable that emits the items emitted by the Observable returned from applying a function to
  2656. * the most recently emitted item emitted by the source Observable
  2657. */
  2658. def switchMap[R](f: T => Observable[R]): Observable[R] = {
  2659. toScalaObservable[R](asJavaObservable.switchMap[R](new Func1[T, rx.Observable[_ <: R]] {
  2660. def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable
  2661. }))
  2662. }
  2663.  
  2664. /**
  2665. * $experimental Returns an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an alternate
  2666. * [[Observable]] if the source [[Observable]] is empty.
  2667. *
  2668. * $noDefaultScheduler
  2669. *
  2670. * @param alternate the alternate [[Observable]] to subscribe to if the source does not emit any items
  2671. * @return an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an
  2672. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  2673. * alternate [[Observable]] if the source [[Observable]] is empty.
  2674. */
  2675. @Experimental
  2676. def switchIfEmpty[U >: T](alternate: Observable[U]): Observable[U] = {
  2677. val jo = asJavaObservable.asInstanceOf[rx.Observable[U]]
  2678. toScalaObservable[U](jo.switchIfEmpty(alternate.asJavaObservable))
  2679. }
  2680.  
  2681. /**
  2682. * Given an Observable that emits Observables, creates a single Observable that
  2683. * emits the items emitted by the most recently published of those Observables.
  2684. *
  2685. * <img width="640" height="370" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/switchDo.png" alt="" />
  2686. *
  2687. * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
  2688. * otherwise you'll get a compilation error.
  2689. *
  2690. * @return an Observable that emits only the items emitted by the most recently published
  2691. * Observable
  2692. *
  2693. * @usecase def switch[U]: Observable[U]
  2694. * @inheritdoc
  2695. */
  2696. def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  2697. val o2: Observable[Observable[U]] = this
  2698. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  2699. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  2700. val o5 = rx.Observable.switchOnNext[U](o4)
  2701. toScalaObservable[U](o5)
  2702. }
  2703. // Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword
  2704.  
  2705. /**
  2706. * Flattens two Observables into one Observable, without any transformation.
  2707. *
  2708. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="" />
  2709. *
  2710. * You can combine items emitted by two Observables so that they act like a single
  2711. * Observable by using the `merge` method.
  2712. *
  2713. * @param that
  2714. * an Observable to be merged
  2715. * @return an Observable that emits items from `this` and `that` until
  2716. * `this` or `that` emits `onError` or both Observables emit `onCompleted`.
  2717. */
  2718. def merge[U >: T](that: Observable[U]): Observable[U] = {
  2719. val thisJava: rx.Observable[_ <: U] = this.asJavaObservable
  2720. val thatJava: rx.Observable[_ <: U] = that.asJavaObservable
  2721. toScalaObservable[U](rx.Observable.merge(thisJava, thatJava))
  2722. }
  2723.  
  2724. /**
  2725. * This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables
  2726. * notify of an error via [[rx.lang.scala.Observer.onError onError]], `mergeDelayError` will
  2727. * refrain from propagating that error notification until all of the merged Observables have
  2728. * finished emitting items.
  2729. *
  2730. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="" />
  2731. *
  2732. * Even if multiple merged Observables send `onError` notifications, `mergeDelayError` will only invoke the `onError` method of its
  2733. * Observers once.
  2734. *
  2735. * This method allows an Observer to receive all successfully emitted items from all of the
  2736. * source Observables without being interrupted by an error notification from one of them.
  2737. *
  2738. * @param that
  2739. * an Observable to be merged
  2740. * @return an Observable that emits items that are the result of flattening the items emitted by
  2741. * `this` and `that`
  2742. */
  2743. @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.merge delayError.merge]]] instead", "0.26.2")
  2744. def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = {
  2745. toScalaObservable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable))
  2746. }
  2747.  
  2748. /**
  2749. * Flattens the sequence of Observables emitted by `this` into one Observable, without any
  2750. * transformation.
  2751. *
  2752. * <img width="640" height="380" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.png" alt="" />
  2753. *
  2754. * You can combine the items emitted by multiple Observables so that they act like a single
  2755. * Observable by using this method.
  2756. *
  2757. * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
  2758. * otherwise you'll get a compilation error.
  2759. *
  2760. * @return an Observable that emits items that are the result of flattening the items emitted
  2761. * by the Observables emitted by `this`
  2762. *
  2763. * @usecase def flatten[U]: Observable[U]
  2764. * @inheritdoc
  2765. */
  2766. def flatten[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  2767. val o2: Observable[Observable[U]] = this
  2768. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  2769. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  2770. val o5 = rx.Observable.merge[U](o4)
  2771. toScalaObservable[U](o5)
  2772. }
  2773.  
  2774. /**
  2775. * Flattens an Observable that emits Observables into a single Observable that emits the items emitted by
  2776. * those Observables, without any transformation, while limiting the maximum number of concurrent
  2777. * subscriptions to these Observables.
  2778. *
  2779. * <img width="640" height="370" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.oo.png" alt="" />
  2780. *
  2781. * You can combine the items emitted by multiple Observables so that they appear as a single Observable, by
  2782. * using the `flatten` method.
  2783. *
  2784. * @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently
  2785. * @return an Observable that emits items that are the result of flattening the Observables emitted by the `source` Observable
  2786. * @throws java.lang.IllegalArgumentException if `maxConcurrent` is less than or equal to 0
  2787. */
  2788. def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  2789. val o2: Observable[Observable[U]] = this
  2790. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  2791. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  2792. val o5 = rx.Observable.merge[U](o4, maxConcurrent)
  2793. toScalaObservable[U](o5)
  2794. }
  2795.  
  2796. /**
  2797. * Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
  2798. * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
  2799. * an error notification from one of them, while limiting the
  2800. * number of concurrent subscriptions to these [[Observable]]s.
  2801. *
  2802. * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
  2803. * error via `onError`, `flattenDelayError` will refrain from propagating that
  2804. * error notification until all of the merged [[Observable]]s have finished emitting items.
  2805. *
  2806. * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
  2807. *
  2808. * Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only
  2809. * invoke the `onError` method of its `Observer`s once.
  2810. *
  2811. * $noDefaultScheduler
  2812. *
  2813. * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
  2814. * @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
  2815. */
  2816. @deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.flatten[U](implicit* delayError.flatten]]] instead", "0.26.2")
  2817. def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
  2818. val o2: Observable[Observable[U]] = this
  2819. val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
  2820. val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
  2821. val o5 = rx.Observable.mergeDelayError[U](o4)
  2822. toScalaObservable[U](o5)
  2823. }
  2824.  
  2825. /**
  2826. * Combines two observables, emitting a pair of the latest values of each of
  2827. * the source observables each time an event is received from one of the source observables.
  2828. *
  2829. * @param that
  2830. * The second source observable.
  2831. * @return An Observable that combines the source Observables
  2832. */
  2833. def combineLatest[U](that: Observable[U]): Observable[(T, U)] = {
  2834. val f: Func2[_ >: T, _ >: U, _ <: (T, U)] = (t: T, u: U) => (t, u)
  2835. toScalaObservable[(T, U)](rx.Observable.combineLatest[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, f))
  2836. }
  2837.  
  2838.  
  2839. /**
  2840. * Combines two observables, emitting some type `R` specified in the function `selector`,
  2841. * each time an event is received from one of the source observables, where the aggregation
  2842. * is defined by the given function.
  2843. *
  2844. * @param that The second source observable.
  2845. * @param selector The function that is used combine the emissions of the two observables.
  2846. * @return An Observable that combines the source Observables according to the function `selector`.
  2847. */
  2848. def combineLatestWith[U, R](that: Observable[U])(selector: (T, U) => R): Observable[R] = {
  2849. toScalaObservable[R](rx.Observable.combineLatest[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
  2850. }
  2851.  
  2852. /**
  2853. * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
  2854. *
  2855. * NOTE: If events keep firing faster than the timeout then no data will be emitted.
  2856. *
  2857. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="" />
  2858. *
  2859. * $debounceVsThrottle
  2860. *
  2861. * @param timeout
  2862. * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped.
  2863. *
  2864. * @return An [[rx.lang.scala.Observable]] which filters out values which are too quickly followed up with newer values.
  2865. * @see `Observable.debounce`
  2866. */
  2867. def throttleWithTimeout(timeout: Duration): Observable[T] = {
  2868. toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit))
  2869. }
  2870.  
  2871. /**
  2872. * Return an Observable that mirrors the source Observable, except that it drops items emitted by the source
  2873. * Observable that are followed by another item within a computed debounce duration.
  2874. *
  2875. * <img width="640" height="425" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.f.png" alt="" />
  2876. *
  2877. * @param debounceSelector function to retrieve a sequence that indicates the throttle duration for each item
  2878. * @return an Observable that omits items emitted by the source Observable that are followed by another item
  2879. * within a computed debounce duration
  2880. */
  2881. def debounce(debounceSelector: T => Observable[Any]): Observable[T] = {
  2882. val fJava = new rx.functions.Func1[T, rx.Observable[Any]] {
  2883. override def call(t: T) = debounceSelector(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
  2884. }
  2885. toScalaObservable[T](asJavaObservable.debounce[Any](fJava))
  2886. }
  2887.  
  2888. /**
  2889. * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
  2890. *
  2891. * NOTE: If events keep firing faster than the timeout then no data will be emitted.
  2892. *
  2893. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.png" alt="" />
  2894. *
  2895. * $debounceVsThrottle
  2896. *
  2897. * @param timeout
  2898. * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped.
  2899. *
  2900. * @return An [[rx.lang.scala.Observable]] which filters out values which are too quickly followed up with newer values.
  2901. * @see `Observable.throttleWithTimeout`
  2902. */
  2903. def debounce(timeout: Duration): Observable[T] = {
  2904. toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit))
  2905. }
  2906.  
  2907. /**
  2908. * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
  2909. *
  2910. * NOTE: If events keep firing faster than the timeout then no data will be emitted.
  2911. *
  2912. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/debounce.png" alt="" />
  2913. *
  2914. * $debounceVsThrottle
  2915. *
  2916. * @param timeout
  2917. * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped.
  2918. * @param scheduler
  2919. * The [[rx.lang.scala.Scheduler]] to use internally to manage the timers which handle timeout for each event.
  2920. * @return Observable which performs the throttle operation.
  2921. * @see `Observable.throttleWithTimeout`
  2922. */
  2923. def debounce(timeout: Duration, scheduler: Scheduler): Observable[T] = {
  2924. toScalaObservable[T](asJavaObservable.debounce(timeout.length, timeout.unit, scheduler))
  2925. }
  2926.  
  2927. /**
  2928. * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
  2929. *
  2930. * NOTE: If events keep firing faster than the timeout then no data will be emitted.
  2931. *
  2932. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="" />
  2933. *
  2934. * @param timeout
  2935. * The time each value has to be 'the most recent' of the [[rx.lang.scala.Observable]] to ensure that it's not dropped.
  2936. * @param scheduler
  2937. * The [[rx.lang.scala.Scheduler]] to use internally to manage the timers which handle timeout for each event.
  2938. * @return Observable which performs the throttle operation.
  2939. * @see `Observable.debounce`
  2940. */
  2941. def throttleWithTimeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
  2942. toScalaObservable[T](asJavaObservable.throttleWithTimeout(timeout.length, timeout.unit, scheduler))
  2943. }
  2944.  
  2945. /**
  2946. * Throttles by skipping value until `skipDuration` passes and then emits the next received value.
  2947. *
  2948. * This differs from `Observable.throttleLast` in that this only tracks passage of time whereas `Observable.throttleLast` ticks at scheduled intervals.
  2949. *
  2950. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png" alt="" />
  2951. *
  2952. * @param skipDuration
  2953. * Time to wait before sending another value after emitting last value.
  2954. * @param scheduler
  2955. * The [[rx.lang.scala.Scheduler]] to use internally to manage the timers which handle timeout for each event.
  2956. * @return Observable which performs the throttle operation.
  2957. */
  2958. def throttleFirst(skipDuration: Duration, scheduler: Scheduler): Observable[T] = {
  2959. toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit, scheduler))
  2960. }
  2961.  
  2962. /**
  2963. * Throttles by skipping value until `skipDuration` passes and then emits the next received value.
  2964. *
  2965. * This differs from `Observable.throttleLast` in that this only tracks passage of time whereas `Observable.throttleLast` ticks at scheduled intervals.
  2966. *
  2967. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleFirst.png" alt="" />
  2968. *
  2969. * @param skipDuration
  2970. * Time to wait before sending another value after emitting last value.
  2971. * @return Observable which performs the throttle operation.
  2972. */
  2973. def throttleFirst(skipDuration: Duration): Observable[T] = {
  2974. toScalaObservable[T](asJavaObservable.throttleFirst(skipDuration.length, skipDuration.unit))
  2975. }
  2976.  
  2977. /**
  2978. * Throttles by returning the last value of each interval defined by 'intervalDuration'.
  2979. *
  2980. * This differs from `Observable.throttleFirst` in that this ticks along at a scheduled interval whereas `Observable.throttleFirst` does not tick, it just tracks passage of time.
  2981. *
  2982. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.png" alt="" />
  2983. *
  2984. * @param intervalDuration
  2985. * Duration of windows within with the last value will be chosen.
  2986. * @return Observable which performs the throttle operation.
  2987. */
  2988. def throttleLast(intervalDuration: Duration): Observable[T] = {
  2989. toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit))
  2990. }
  2991.  
  2992. /**
  2993. * Throttles by returning the last value of each interval defined by 'intervalDuration'.
  2994. *
  2995. * This differs from `Observable.throttleFirst` in that this ticks along at a scheduled interval whereas `Observable.throttleFirst` does not tick, it just tracks passage of time.
  2996. *
  2997. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.png" alt="" />
  2998. *
  2999. * @param intervalDuration
  3000. * Duration of windows within with the last value will be chosen.
  3001. * @return Observable which performs the throttle operation.
  3002. */
  3003. def throttleLast(intervalDuration: Duration, scheduler: Scheduler): Observable[T] = {
  3004. toScalaObservable[T](asJavaObservable.throttleLast(intervalDuration.length, intervalDuration.unit, scheduler))
  3005. }
  3006.  
  3007. /**
  3008. * Applies a timeout policy for each item emitted by the Observable, using
  3009. * the specified scheduler to run timeout timers. If the next item isn't
  3010. * observed within the specified timeout duration starting from its
  3011. * predecessor, observers are notified of a `TimeoutException`.
  3012. * <p>
  3013. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1.png" alt="" />
  3014. *
  3015. * @param timeout maximum duration between items before a timeout occurs
  3016. * @return the source Observable modified to notify observers of a
  3017. * `TimeoutException` in case of a timeout
  3018. */
  3019. def timeout(timeout: Duration): Observable[T] = {
  3020. toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit))
  3021. }
  3022.  
  3023. /**
  3024. * Applies a timeout policy for each item emitted by the Observable, using
  3025. * the specified scheduler to run timeout timers. If the next item isn't
  3026. * observed within the specified timeout duration starting from its
  3027. * predecessor, a specified fallback Observable produces future items and
  3028. * notifications from that point on.
  3029. * <p>
  3030. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2.png" alt="" />
  3031. *
  3032. * @param timeout maximum duration between items before a timeout occurs
  3033. * @param other fallback Observable to use in case of a timeout
  3034. * @return the source Observable modified to switch to the fallback
  3035. * Observable in case of a timeout
  3036. */
  3037. def timeout[U >: T](timeout: Duration, other: Observable[U]): Observable[U] = {
  3038. val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
  3039. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
  3040. toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava))
  3041. }
  3042.  
  3043. /**
  3044. * Applies a timeout policy for each item emitted by the Observable, using
  3045. * the specified scheduler to run timeout timers. If the next item isn't
  3046. * observed within the specified timeout duration starting from its
  3047. * predecessor, the observer is notified of a `TimeoutException`.
  3048. * <p>
  3049. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1s.png" alt="" />
  3050. *
  3051. * @param timeout maximum duration between items before a timeout occurs
  3052. * @param scheduler Scheduler to run the timeout timers on
  3053. * @return the source Observable modified to notify observers of a
  3054. * `TimeoutException` in case of a timeout
  3055. */
  3056. def timeout(timeout: Duration, scheduler: Scheduler): Observable[T] = {
  3057. toScalaObservable[T](asJavaObservable.timeout(timeout.length, timeout.unit, scheduler.asJavaScheduler))
  3058. }
  3059.  
  3060. /**
  3061. * Applies a timeout policy for each item emitted by the Observable, using
  3062. * the specified scheduler to run timeout timers. If the next item isn't
  3063. * observed within the specified timeout duration starting from its
  3064. * predecessor, a specified fallback Observable sequence produces future
  3065. * items and notifications from that point on.
  3066. * <p>
  3067. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2s.png" alt="" />
  3068. *
  3069. * @param timeout maximum duration between items before a timeout occurs
  3070. * @param other Observable to use as the fallback in case of a timeout
  3071. * @param scheduler Scheduler to run the timeout timers on
  3072. * @return the source Observable modified so that it will switch to the
  3073. * fallback Observable in case of a timeout
  3074. */
  3075. def timeout[U >: T](timeout: Duration, other: Observable[U], scheduler: Scheduler): Observable[U] = {
  3076. val otherJava: rx.Observable[_ <: U] = other.asJavaObservable
  3077. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
  3078. toScalaObservable[U](thisJava.timeout(timeout.length, timeout.unit, otherJava, scheduler.asJavaScheduler))
  3079. }
  3080.  
  3081. /**
  3082. * Returns an Observable that mirrors the source Observable, but emits a TimeoutException if an item emitted by
  3083. * the source Observable doesn't arrive within a window of time after the emission of the
  3084. * previous item, where that period of time is measured by an Observable that is a function
  3085. * of the previous item.
  3086. * <p>
  3087. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout3.png" alt="" />
  3088. * </p>
  3089. * Note: The arrival of the first source item is never timed out.
  3090. *
  3091. * @param timeoutSelector
  3092. * a function that returns an observable for each item emitted by the source
  3093. * Observable and that determines the timeout window for the subsequent item
  3094. * @return an Observable that mirrors the source Observable, but emits a TimeoutException if a item emitted by
  3095. * the source Observable takes longer to arrive than the time window defined by the
  3096. * selector for the previously emitted item
  3097. */
  3098. def timeout(timeoutSelector: T => Observable[Any]): Observable[T] = {
  3099. toScalaObservable[T](asJavaObservable.timeout({ t: T => timeoutSelector(t).asJavaObservable.asInstanceOf[rx.Observable[Any]] }))
  3100. }
  3101.  
  3102. /**
  3103. * Returns an Observable that mirrors the source Observable, but that switches to a fallback
  3104. * Observable if an item emitted by the source Observable doesn't arrive within a window of time
  3105. * after the emission of the previous item, where that period of time is measured by an
  3106. * Observable that is a function of the previous item.
  3107. * <p>
  3108. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout4.png" alt="" />
  3109. * </p>
  3110. * Note: The arrival of the first source item is never timed out.
  3111. *
  3112. * @param timeoutSelector
  3113. * a function that returns an observable for each item emitted by the source
  3114. * Observable and that determines the timeout window for the subsequent item
  3115. * @param other
  3116. * the fallback Observable to switch to if the source Observable times out
  3117. * @return an Observable that mirrors the source Observable, but switches to mirroring a
  3118. * fallback Observable if a item emitted by the source Observable takes longer to arrive
  3119. * than the time window defined by the selector for the previously emitted item
  3120. */
  3121. def timeout[U >: T](timeoutSelector: T => Observable[Any], other: Observable[U]): Observable[U] = {
  3122. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
  3123. toScalaObservable[U](thisJava.timeout(
  3124. { t: U => timeoutSelector(t.asInstanceOf[T]).asJavaObservable.asInstanceOf[rx.Observable[Any]] },
  3125. other.asJavaObservable))
  3126. }
  3127.  
  3128. /**
  3129. * Returns an Observable that mirrors the source Observable, but emits a TimeoutException
  3130. * if either the first item emitted by the source Observable or any subsequent item
  3131. * don't arrive within time windows defined by other Observables.
  3132. * <p>
  3133. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout5.png" alt="" />
  3134. * </p>
  3135. * @param firstTimeoutSelector
  3136. * a function that returns an Observable that determines the timeout window for the
  3137. * first source item
  3138. * @param timeoutSelector
  3139. * a function that returns an Observable for each item emitted by the source
  3140. * Observable and that determines the timeout window in which the subsequent source
  3141. * item must arrive in order to continue the sequence
  3142. * @return an Observable that mirrors the source Observable, but emits a TimeoutException if either the first item or any subsequent item doesn't
  3143. * arrive within the time windows specified by the timeout selectors
  3144. */
  3145. def timeout(firstTimeoutSelector: () => Observable[Any], timeoutSelector: T => Observable[Any]): Observable[T] = {
  3146. toScalaObservable[T](asJavaObservable.timeout(
  3147. { firstTimeoutSelector().asJavaObservable.asInstanceOf[rx.Observable[Any]] },
  3148. { t: T => timeoutSelector(t).asJavaObservable.asInstanceOf[rx.Observable[Any]] }))
  3149. }
  3150.  
  3151. /**
  3152. * Returns an Observable that mirrors the source Observable, but switches to a fallback
  3153. * Observable if either the first item emitted by the source Observable or any subsequent item
  3154. * don't arrive within time windows defined by other Observables.
  3155. * <p>
  3156. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout6.png" alt="" />
  3157. * </p>
  3158. * @param firstTimeoutSelector
  3159. * a function that returns an Observable which determines the timeout window for the
  3160. * first source item
  3161. * @param timeoutSelector
  3162. * a function that returns an Observable for each item emitted by the source
  3163. * Observable and that determines the timeout window in which the subsequent source
  3164. * item must arrive in order to continue the sequence
  3165. * @param other
  3166. * the fallback Observable to switch to if the source Observable times out
  3167. * @return an Observable that mirrors the source Observable, but switches to the `other` Observable if either the first item emitted by the source Observable or any
  3168. * subsequent item don't arrive within time windows defined by the timeout selectors
  3169. */
  3170. def timeout[U >: T](firstTimeoutSelector: () => Observable[Any], timeoutSelector: T => Observable[Any], other: Observable[U]): Observable[U] = {
  3171. val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]]
  3172. toScalaObservable[U](thisJava.timeout(
  3173. { firstTimeoutSelector().asJavaObservable.asInstanceOf[rx.Observable[Any]] },
  3174. { t: U => timeoutSelector(t.asInstanceOf[T]).asJavaObservable.asInstanceOf[rx.Observable[Any]] },
  3175. other.asJavaObservable))
  3176. }
  3177.  
  3178. /**
  3179. * Returns an Observable that sums up the elements of this Observable.
  3180. *
  3181. * This operation is only available if the elements of this Observable are numbers, otherwise
  3182. * you will get a compilation error.
  3183. *
  3184. * @return an Observable emitting the sum of all the elements of the source Observable
  3185. * as its single item.
  3186. *
  3187. * @usecase def sum: Observable[T]
  3188. * @inheritdoc
  3189. */
  3190. def sum[U >: T](implicit num: Numeric[U]): Observable[U] = {
  3191. foldLeft(num.zero)(num.plus)
  3192. }
  3193.  
  3194. /**
  3195. * Returns an Observable that multiplies up the elements of this Observable.
  3196. *
  3197. * This operation is only available if the elements of this Observable are numbers, otherwise
  3198. * you will get a compilation error.
  3199. *
  3200. * @return an Observable emitting the product of all the elements of the source Observable
  3201. * as its single item.
  3202. *
  3203. * @usecase def product: Observable[T]
  3204. * @inheritdoc
  3205. */
  3206. def product[U >: T](implicit num: Numeric[U]): Observable[U] = {
  3207. foldLeft(num.one)(num.times)
  3208. }
  3209.  
  3210. /**
  3211. * Returns an Observable that emits only the very first item emitted by the source Observable, or
  3212. * a default value if the source Observable is empty.
  3213. *
  3214. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="" />
  3215. *
  3216. * @param default
  3217. * The default value to emit if the source Observable doesn't emit anything.
  3218. * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
  3219. * @return an Observable that emits only the very first item from the source, or a default value
  3220. * if the source Observable completes without emitting any item.
  3221. */
  3222. def firstOrElse[U >: T](default: => U): Observable[U] = {
  3223. take(1).singleOrElse(default)
  3224. }
  3225.  
  3226. /**
  3227. * Returns an Observable that emits only an `Option` with the very first item emitted by the source Observable,
  3228. * or `None` if the source Observable is empty.
  3229. *
  3230. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="" />
  3231. *
  3232. * @return an Observable that emits only an `Option` with the very first item from the source, or `None`
  3233. * if the source Observable completes without emitting any item.
  3234. */
  3235. def headOption: Observable[Option[T]] = {
  3236. take(1).singleOption
  3237. }
  3238.  
  3239. /**
  3240. * Returns an Observable that emits only the very first item emitted by the source Observable, or
  3241. * a default value if the source Observable is empty.
  3242. *
  3243. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="" />
  3244. *
  3245. * @param default
  3246. * The default value to emit if the source Observable doesn't emit anything.
  3247. * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
  3248. * @return an Observable that emits only the very first item from the source, or a default value
  3249. * if the source Observable completes without emitting any item.
  3250. */
  3251. def headOrElse[U >: T](default: => U): Observable[U] = firstOrElse(default)
  3252.  
  3253. /**
  3254. * Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
  3255. * `NoSuchElementException` if the source Observable is empty.
  3256. * <p>
  3257. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/first.png" alt="" />
  3258. *
  3259. * @return an Observable that emits only the very first item emitted by the source Observable, or raises an
  3260. * `NoSuchElementException` if the source Observable is empty
  3261. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
  3262. * @see "MSDN: Observable.firstAsync()"
  3263. */
  3264. def first: Observable[T] = {
  3265. toScalaObservable[T](asJavaObservable.first)
  3266. }
  3267.  
  3268. /**
  3269. * Returns an Observable that emits only the very first item emitted by the source Observable, or raises an
  3270. * `NoSuchElementException` if the source Observable is empty.
  3271. * <p>
  3272. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/first.png" alt="" />
  3273. *
  3274. * @return an Observable that emits only the very first item emitted by the source Observable, or raises an
  3275. * `NoSuchElementException` if the source Observable is empty
  3276. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables#wiki-first">RxJava Wiki: first()</a>
  3277. * @see "MSDN: Observable.firstAsync()"
  3278. * @see [[Observable.first]]
  3279. */
  3280. def head: Observable[T] = first
  3281.  
  3282. /**
  3283. * Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
  3284. * if the source Observable is empty.
  3285. *
  3286. * @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
  3287. * if the source Observable is empty.
  3288. */
  3289. def tail: Observable[T] = {
  3290. switchIfEmpty(Observable.error(new UnsupportedOperationException("tail of empty Observable"))).drop(1)
  3291. }
  3292.  
  3293. /**
  3294. * Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
  3295. * an `NoSuchElementException` if the source Observable is empty.
  3296. *
  3297. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/last.png" alt="" />
  3298. *
  3299. * @return an Observable that emits the last item from the source Observable or notifies observers of an
  3300. * error
  3301. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Filtering-Observable-Operators#wiki-last">RxJava Wiki: last()</a>
  3302. * @see "MSDN: Observable.lastAsync()"
  3303. */
  3304. def last: Observable[T] = {
  3305. toScalaObservable[T](asJavaObservable.last)
  3306. }
  3307.  
  3308. /**
  3309. * Returns an Observable that emits only an `Option` with the last item emitted by the source Observable,
  3310. * or `None` if the source Observable completes without emitting any items.
  3311. *
  3312. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrDefault.png" alt="" />
  3313. *
  3314. * @return an Observable that emits only an `Option` with the last item emitted by the source Observable,
  3315. * or `None` if the source Observable is empty
  3316. */
  3317. def lastOption: Observable[Option[T]] = {
  3318. takeRight(1).singleOption
  3319. }
  3320.  
  3321. /**
  3322. * Returns an Observable that emits only the last item emitted by the source Observable, or a default item
  3323. * if the source Observable completes without emitting any items.
  3324. *
  3325. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrDefault.png" alt="" />
  3326. *
  3327. * @param default the default item to emit if the source Observable is empty.
  3328. * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
  3329. * @return an Observable that emits only the last item emitted by the source Observable, or a default item
  3330. * if the source Observable is empty
  3331. */
  3332. def lastOrElse[U >: T](default: => U): Observable[U] = {
  3333. takeRight(1).singleOrElse(default)
  3334. }
  3335.  
  3336. /**
  3337. * If the source Observable completes after emitting a single item, return an Observable that emits that
  3338. * item. If the source Observable emits more than one item or no items, notify of an `IllegalArgumentException`
  3339. * or `NoSuchElementException` respectively.
  3340. *
  3341. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/single.png" alt="" />
  3342. *
  3343. * @return an Observable that emits the single item emitted by the source Observable
  3344. * @throws java.lang.IllegalArgumentException if the source emits more than one item
  3345. * @throws java.util.NoSuchElementException if the source emits no items
  3346. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#wiki-single-and-singleordefault">RxJava Wiki: single()</a>
  3347. * @see "MSDN: Observable.singleAsync()"
  3348. */
  3349. def single: Observable[T] = {
  3350. toScalaObservable[T](asJavaObservable.single)
  3351. }
  3352.  
  3353. /**
  3354. * If the source Observable completes after emitting a single item, return an Observable that emits an `Option`
  3355. * with that item; if the source Observable is empty, return an Observable that emits `None`.
  3356. * If the source Observable emits more than one item, throw an `IllegalArgumentException`.
  3357. *
  3358. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrDefault.png" alt="" />
  3359. *
  3360. * @return an Observable that emits an `Option` with the single item emitted by the source Observable, or
  3361. * `None` if the source Observable is empty
  3362. * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
  3363. */
  3364. def singleOption: Observable[Option[T]] = {
  3365. val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]]
  3366. toScalaObservable[Option[T]](jObservableOption.singleOrDefault(None))
  3367. }
  3368.  
  3369. /**
  3370. * If the source Observable completes after emitting a single item, return an Observable that emits that
  3371. * item; if the source Observable is empty, return an Observable that emits a default item. If the source
  3372. * Observable emits more than one item, throw an `IllegalArgumentException`.
  3373. *
  3374. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrDefault.png" alt="" />
  3375. *
  3376. * @param default a default value to emit if the source Observable emits no item.
  3377. * This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
  3378. * @return an Observable that emits the single item emitted by the source Observable, or a default item if
  3379. * the source Observable is empty
  3380. * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
  3381. */
  3382. def singleOrElse[U >: T](default: => U): Observable[U] = {
  3383. singleOption.map {
  3384. case Some(element) => element
  3385. case None => default
  3386. }
  3387. }
  3388.  
  3389. /**
  3390. * Returns an Observable that emits the items emitted by the source Observable or a specified default item
  3391. * if the source Observable is empty.
  3392. *
  3393. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="" />
  3394. *
  3395. * @param default the item to emit if the source Observable emits no items. This is a by-name parameter, so it is
  3396. * only evaluated if the source Observable doesn't emit anything.
  3397. * @return an Observable that emits either the specified default item if the source Observable emits no
  3398. * items, or the items emitted by the source Observable
  3399. */
  3400. def orElse[U >: T](default: => U): Observable[U] = {
  3401. val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]]
  3402. val o = toScalaObservable[Option[T]](jObservableOption.defaultIfEmpty(None))
  3403. o map {
  3404. case Some(element) => element
  3405. case None => default
  3406. }
  3407. }
  3408.  
  3409. /**
  3410. * Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
  3411. *
  3412. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="" />
  3413. *
  3414. * @return an Observable of sequentially distinct items
  3415. */
  3416. def distinctUntilChanged: Observable[T] = {
  3417. toScalaObservable[T](asJavaObservable.distinctUntilChanged)
  3418. }
  3419.  
  3420. /**
  3421. * Returns an Observable that forwards all items emitted from the source Observable that are sequentially
  3422. * distinct according to a key selector function.
  3423. *
  3424. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.key.png" alt="" />
  3425. *
  3426. * @param keySelector
  3427. * a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
  3428. * distinct from another one or not
  3429. * @return an Observable of sequentially distinct items
  3430. */
  3431. def distinctUntilChanged[U](keySelector: T => U): Observable[T] = {
  3432. toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector))
  3433. }
  3434.  
  3435. /**
  3436. * $experimental Returns an [[Observable]] that emits all items emitted by the source [[Observable]] that are distinct from their
  3437. * immediate predecessors when compared with each other via the provided comparator function.
  3438. *
  3439. * <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="" />
  3440. *
  3441. * ===Backpressure:===
  3442. * The operator doesn't interfere with backpressure which is determined by the source [[Observable]]'s backpressure behavior.
  3443. *
  3444. * $noDefaultScheduler
  3445. *
  3446. * @param comparator the function that receives the previous item and the current item and is
  3447. * expected to return true if the two are equal, thus skipping the current value.
  3448. * @return an [[Observable]] that emits those items from the source Observable that are distinct from their immediate predecessors
  3449. * @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
  3450. */
  3451. @Experimental
  3452. def distinctUntilChanged[U](comparator: (T, T) => Boolean): Observable[T] = {
  3453. toScalaObservable[T](asJavaObservable.distinctUntilChanged(comparator))
  3454. }
  3455.  
  3456. /**
  3457. * Returns an Observable that forwards all distinct items emitted from the source Observable.
  3458. *
  3459. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/distinct.png" alt="" />
  3460. *
  3461. * @return an Observable of distinct items
  3462. */
  3463. def distinct: Observable[T] = {
  3464. toScalaObservable[T](asJavaObservable.distinct())
  3465. }
  3466.  
  3467. /**
  3468. * Returns an Observable that forwards all items emitted from the source Observable that are distinct according
  3469. * to a key selector function.
  3470. *
  3471. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/distinct.key.png" alt="" />
  3472. *
  3473. * @param keySelector
  3474. * a function that projects an emitted item to a key value which is used for deciding whether an item is
  3475. * distinct from another one or not
  3476. * @return an Observable of distinct items
  3477. */
  3478. def distinct[U](keySelector: T => U): Observable[T] = {
  3479. toScalaObservable[T](asJavaObservable.distinct[U](keySelector))
  3480. }
  3481.  
  3482. /**
  3483. * Returns an Observable that counts the total number of elements in the source Observable.
  3484. *
  3485. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/count.png" alt="" />
  3486. *
  3487. * @return an Observable emitting the number of counted elements of the source Observable
  3488. * as its single item.
  3489. */
  3490. def length: Observable[Int] = {
  3491. toScalaObservable[Integer](asJavaObservable.count()).map(_.intValue())
  3492. }
  3493.  
  3494. /**
  3495. * Returns an Observable that counts the total number of elements in the source Observable.
  3496. *
  3497. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/count.png" alt="" />
  3498. *
  3499. * @return an Observable emitting the number of counted elements of the source Observable
  3500. * as its single item.
  3501. */
  3502. def size: Observable[Int] = length
  3503.  
  3504. /**
  3505. * Retry subscription to origin Observable upto given retry count.
  3506. *
  3507. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="" />
  3508. *
  3509. * If [[rx.lang.scala.Observer.onError]] is invoked the source Observable will be re-subscribed to as many times as defined by retryCount.
  3510. *
  3511. * Any [[rx.lang.scala.Observer.onNext]] calls received on each attempt will be emitted and concatenated together.
  3512. *
  3513. * For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
  3514. * emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
  3515. *
  3516. * @param retryCount
  3517. * Number of retry attempts before failing.
  3518. * @return Observable with retry logic.
  3519. */
  3520. def retry(retryCount: Long): Observable[T] = {
  3521. toScalaObservable[T](asJavaObservable.retry(retryCount))
  3522. }
  3523.  
  3524. /**
  3525. * Retry subscription to origin Observable whenever onError is called (infinite retry count).
  3526. *
  3527. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="" />
  3528. *
  3529. * If [[rx.lang.scala.Observer.onError]] is invoked the source Observable will be re-subscribed to.
  3530. *
  3531. * Any [[rx.lang.scala.Observer.onNext]] calls received on each attempt will be emitted and concatenated together.
  3532. *
  3533. * For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
  3534. * emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
  3535. * @return Observable with retry logic.
  3536. */
  3537. def retry: Observable[T] = {
  3538. toScalaObservable[T](asJavaObservable.retry())
  3539. }
  3540.  
  3541. /**
  3542. * Returns an Observable that mirrors the source Observable, resubscribing to it if it calls `onError`
  3543. * and the predicate returns true for that specific exception and retry count.
  3544. *
  3545. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="" />
  3546. *
  3547. * @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry count
  3548. * @return the source Observable modified with retry logic
  3549. */
  3550. def retry(predicate: (Int, Throwable) => Boolean): Observable[T] = {
  3551. val f = new Func2[java.lang.Integer, Throwable, java.lang.Boolean] {
  3552. def call(times: java.lang.Integer, e: Throwable): java.lang.Boolean = predicate(times, e)
  3553. }
  3554. toScalaObservable[T](asJavaObservable.retry(f))
  3555. }
  3556.  
  3557. /**
  3558. * Returns an Observable that emits the same values as the source observable with the exception of an
  3559. * `onError`. An `onError` notification from the source will result in the emission of a
  3560. * `Throwable` to the Observable provided as an argument to the `notificationHandler`
  3561. * function. If the Observable returned `onCompletes` or `onErrors` then `retry` will call
  3562. * `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
  3563. * resubscribe to the source Observable.
  3564. * <p>
  3565. * <img width="640" height="430" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="" />
  3566. *
  3567. * Example:
  3568. *
  3569. * This retries 3 times, each time incrementing the number of seconds it waits.
  3570. *
  3571. * @example
  3572. *
  3573. * This retries 3 times, each time incrementing the number of seconds it waits.
  3574. *
  3575. * {{{
  3576. * Observable[String]({ subscriber =>
  3577. * println("subscribing")
  3578. * subscriber.onError(new RuntimeException("always fails"))
  3579. * }).retryWhen({ throwableObservable =>
  3580. * throwableObservable.zipWith(Observable.from(1 to 3))((t, i) => i).flatMap(i => {
  3581. * println("delay retry by " + i + " second(s)")
  3582. * Observable.timer(Duration(i, TimeUnit.SECONDS))
  3583. * })
  3584. * }).toBlocking.foreach(s => println(s))
  3585. * }}}
  3586. *
  3587. * Output is:
  3588. *
  3589. * {{{
  3590. * subscribing
  3591. * delay retry by 1 second(s)
  3592. * subscribing
  3593. * delay retry by 2 second(s)
  3594. * subscribing
  3595. * delay retry by 3 second(s)
  3596. * subscribing
  3597. * }}}
  3598. *
  3599. * <dl>
  3600. * <dt><b>Scheduler:</b></dt>
  3601. * <dd>`retryWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
  3602. * </dl>
  3603. *
  3604. * @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting the
  3605. * retry
  3606. * @return the source Observable modified with retry logic
  3607. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
  3608. * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
  3609. * @since 0.20
  3610. */
  3611. def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any]): Observable[T] = {
  3612. val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
  3613. (jOt: rx.Observable[_ <: Throwable]) => {
  3614. val ot = toScalaObservable[Throwable](jOt)
  3615. notificationHandler(ot).asJavaObservable
  3616. }
  3617.  
  3618. toScalaObservable[T](asJavaObservable.retryWhen(f))
  3619. }
  3620.  
  3621. /**
  3622. * Returns an Observable that emits the same values as the source observable with the exception of an `onError`.
  3623. * An onError will emit a `Throwable` to the Observable provided as an argument to the `notificationHandler`
  3624. * function. If the Observable returned `onCompletes` or `onErrors` then retry will call `onCompleted`
  3625. * or `onError` on the child subscription. Otherwise, this observable will resubscribe to the source observable, on a particular Scheduler.
  3626. * <p>
  3627. * <img width="640" height="430" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/retryWhen.f.png" alt="" />
  3628. * <p>
  3629. * <dl>
  3630. * <dt><b>Scheduler:</b></dt>
  3631. * <dd>you specify which [[Scheduler]] this operator will use</dd>
  3632. * </dl>
  3633. *
  3634. * @param notificationHandler receives an Observable of a Throwable with which a user can complete or error, aborting
  3635. * the retry
  3636. * @param scheduler the Scheduler on which to subscribe to the source Observable
  3637. * @return the source Observable modified with retry logic
  3638. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#retrywhen">RxJava Wiki: retryWhen()</a>
  3639. * @see RxScalaDemo.retryWhenDifferentExceptionsExample for a more intricate example
  3640. * @since 0.20
  3641. */
  3642. def retryWhen(notificationHandler: Observable[Throwable] => Observable[Any], scheduler: Scheduler): Observable[T] = {
  3643. val f: Func1[_ >: rx.Observable[_ <: Throwable], _ <: rx.Observable[_ <: Any]] =
  3644. (jOt: rx.Observable[_ <: Throwable]) => {
  3645. val ot = toScalaObservable[Throwable](jOt)
  3646. notificationHandler(ot).asJavaObservable
  3647. }
  3648.  
  3649. toScalaObservable[T](asJavaObservable.retryWhen(f, scheduler))
  3650. }
  3651.  
  3652. /**
  3653. * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely.
  3654. * <p>
  3655. * <img width="640" height="309" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeat.o.png" alt="" />
  3656. *
  3657. * @return an Observable that emits the items emitted by the source Observable repeatedly and in sequence
  3658. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
  3659. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3660. */
  3661. def repeat: Observable[T] = {
  3662. toScalaObservable[T](asJavaObservable.repeat())
  3663. }
  3664.  
  3665. /**
  3666. * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely,
  3667. * on a particular Scheduler.
  3668. * <p>
  3669. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeat.os.png" alt="" />
  3670. *
  3671. * @param scheduler the Scheduler to emit the items on
  3672. * @return an Observable that emits the items emitted by the source Observable repeatedly and in sequence
  3673. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
  3674. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3675. */
  3676. def repeat(scheduler: Scheduler): Observable[T] = {
  3677. toScalaObservable[T](asJavaObservable.repeat(scheduler))
  3678. }
  3679.  
  3680. /**
  3681. * Returns an Observable that repeats the sequence of items emitted by the source Observable at most `count` times.
  3682. * <p>
  3683. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeat.on.png" alt="" />
  3684. *
  3685. * @param count the number of times the source Observable items are repeated,
  3686. * a count of 0 will yield an empty sequence
  3687. * @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times
  3688. * @throws java.lang.IllegalArgumentException if `count` is less than zero
  3689. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
  3690. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3691. */
  3692. def repeat(count: Long): Observable[T] = {
  3693. toScalaObservable[T](asJavaObservable.repeat(count))
  3694. }
  3695.  
  3696. /**
  3697. * Returns an Observable that repeats the sequence of items emitted by the source Observable
  3698. * at most `count` times, on a particular Scheduler.
  3699. * <p>
  3700. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeat.ons.png" alt="" />
  3701. *
  3702. * @param count the number of times the source Observable items are repeated,
  3703. * a count of 0 will yield an empty sequence
  3704. * @param scheduler the `Scheduler` to emit the items on
  3705. * @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times
  3706. * on a particular Scheduler
  3707. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#wiki-repeat">RxJava Wiki: repeat()</a>
  3708. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3709. */
  3710. def repeat(count: Long, scheduler: Scheduler): Observable[T] = {
  3711. toScalaObservable[T](asJavaObservable.repeat(count, scheduler))
  3712. }
  3713.  
  3714. /**
  3715. * Returns an Observable that emits the same values as the source Observable with the exception of an
  3716. * `onCompleted`. An `onCompleted` notification from the source will result in the emission of
  3717. * a `scala.Unit` to the Observable provided as an argument to the `notificationHandler`
  3718. * function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
  3719. * call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
  3720. * resubscribe to the source Observable, on a particular Scheduler.
  3721. * <p>
  3722. * <img width="640" height="430" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="" />
  3723. * <dl>
  3724. * <dt><b>Scheduler:</b></dt>
  3725. * <dd>you specify which [[Scheduler]] this operator will use</dd>
  3726. * </dl>
  3727. *
  3728. * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
  3729. * @param scheduler the Scheduler to emit the items on
  3730. * @return the source Observable modified with repeat logic
  3731. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
  3732. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3733. * @since 0.20
  3734. */
  3735. def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any], scheduler: Scheduler): Observable[T] = {
  3736. val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
  3737. (jOv: rx.Observable[_ <: Void]) => {
  3738. val ov = toScalaObservable[Void](jOv)
  3739. notificationHandler(ov.map( _ => Unit )).asJavaObservable
  3740. }
  3741.  
  3742. toScalaObservable[T](asJavaObservable.repeatWhen(f, scheduler))
  3743. }
  3744.  
  3745. /**
  3746. * Returns an Observable that emits the same values as the source Observable with the exception of an
  3747. * `onCompleted`. An `onCompleted` notification from the source will result in the emission of
  3748. * a `scala.Unit` to the Observable provided as an argument to the `notificationHandler`
  3749. * function. If the Observable returned `onCompletes` or `onErrors` then `repeatWhen` will
  3750. * call `onCompleted` or `onError` on the child subscription. Otherwise, this Observable will
  3751. * resubscribe to the source observable.
  3752. * <p>
  3753. * <img width="640" height="430" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/repeatWhen.f.png" alt="" />
  3754. *
  3755. * @example
  3756. *
  3757. * This repeats 3 times, each time incrementing the number of seconds it waits.
  3758. *
  3759. * {{{
  3760. * Observable[String]({ subscriber =>
  3761. * println("subscribing")
  3762. * subscriber.onCompleted()
  3763. * }).repeatWhen({ unitObservable =>
  3764. * unitObservable.zipWith(Observable.from(1 to 3))((u, i) => i).flatMap(i => {
  3765. * println("delay repeat by " + i + " second(s)")
  3766. * Observable.timer(Duration(i, TimeUnit.SECONDS))
  3767. * })
  3768. * }).toBlocking.foreach(s => println(s))
  3769. * }}}
  3770. *
  3771. * Output is:
  3772. *
  3773. * {{{
  3774. * subscribing
  3775. * delay repeat by 1 second(s)
  3776. * subscribing
  3777. * delay repeat by 2 second(s)
  3778. * subscribing
  3779. * delay repeat by 3 second(s)
  3780. * subscribing
  3781. * }}}
  3782. *
  3783. * <dl>
  3784. * <dt><b>Scheduler:</b></dt>
  3785. * <dd>`repeatWhen` operates by default on the `trampoline` [[Scheduler]].</dd>
  3786. * </dl>
  3787. *
  3788. * @param notificationHandler receives an Observable of a Unit with which a user can complete or error, aborting the repeat.
  3789. * @return the source Observable modified with repeat logic
  3790. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#repeatwhen">RxJava Wiki: repeatWhen()</a>
  3791. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229428.aspx">MSDN: Observable.Repeat</a>
  3792. * @since 0.20
  3793. */
  3794. def repeatWhen(notificationHandler: Observable[Unit] => Observable[Any]): Observable[T] = {
  3795. val f: Func1[_ >: rx.Observable[_ <: Void], _ <: rx.Observable[_ <: Any]] =
  3796. (jOv: rx.Observable[_ <: Void]) => {
  3797. val ov = toScalaObservable[Void](jOv)
  3798. notificationHandler(ov.map( _ => Unit )).asJavaObservable
  3799. }
  3800.  
  3801. toScalaObservable[T](asJavaObservable.repeatWhen(f))
  3802. }
  3803.  
  3804. /**
  3805. * Converts an Observable into a [[rx.lang.scala.observables.BlockingObservable BlockingObservable]] (an Observable with blocking
  3806. * operators).
  3807. *
  3808. * @return a [[rx.lang.scala.observables.BlockingObservable BlockingObservable]] version of this Observable
  3809. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">Blocking Observable Operators</a>
  3810. * @since 0.19
  3811. */
  3812. def toBlocking: BlockingObservable[T] = {
  3813. new BlockingObservable[T](this)
  3814. }
  3815.  
  3816. /**
  3817. * $experimental Converts an [[Observable]] into a [[rx.lang.scala.observables.ErrorDelayingObservable ErrorDelayingObservable]]
  3818. * that provides operators which delay errors when composing multiple [[Observable]]s.
  3819. */
  3820. @Experimental
  3821. def delayError: ErrorDelayingObservable[T] = {
  3822. new ErrorDelayingObservable[T](this)
  3823. }
  3824.  
  3825. /** Tests whether a predicate holds for some of the elements of this `Observable`.
  3826. *
  3827. * @param p the predicate used to test elements.
  3828. * @return an Observable emitting one single Boolean, which is `true` if the given predicate `p`
  3829. * holds for some of the elements of this Observable, and `false` otherwise.
  3830. */
  3831. def exists(p: T => Boolean): Observable[Boolean] = {
  3832. toScalaObservable[java.lang.Boolean](asJavaObservable.exists(p)).map(_.booleanValue())
  3833. }
  3834.  
  3835. /** Tests whether this `Observable` emits no elements.
  3836. *
  3837. * @return an Observable emitting one single Boolean, which is `true` if this `Observable`
  3838. * emits no elements, and `false` otherwise.
  3839. */
  3840. def isEmpty: Observable[Boolean] = {
  3841. toScalaObservable[java.lang.Boolean](asJavaObservable.isEmpty()).map(_.booleanValue())
  3842. }
  3843.  
  3844. def withFilter(p: T => Boolean): WithFilter[T] = {
  3845. new WithFilter[T](p, asJavaObservable)
  3846. }
  3847.  
  3848. /**
  3849. * Returns an Observable that applies the given function to each item emitted by an
  3850. * Observable.
  3851. *
  3852. * @param observer the observer
  3853. *
  3854. * @return an Observable with the side-effecting behavior applied.
  3855. */
  3856. def doOnEach(observer: Observer[T]): Observable[T] = {
  3857. toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver))
  3858. }
  3859.  
  3860. /**
  3861. * Invokes an action when the source Observable calls <code>onNext</code>.
  3862. *
  3863. * @param onNext the action to invoke when the source Observable calls <code>onNext</code>
  3864. * @return the source Observable with the side-effecting behavior applied
  3865. */
  3866. def doOnNext(onNext: T => Unit): Observable[T] = {
  3867. toScalaObservable[T](asJavaObservable.doOnNext(onNext))
  3868. }
  3869.  
  3870. /**
  3871. * Invokes an action if the source Observable calls `onError`.
  3872. *
  3873. * @param onError the action to invoke if the source Observable calls
  3874. * `onError`
  3875. * @return the source Observable with the side-effecting behavior applied
  3876. */
  3877. def doOnError(onError: Throwable => Unit): Observable[T] = {
  3878. toScalaObservable[T](asJavaObservable.doOnError(onError))
  3879. }
  3880.  
  3881. /**
  3882. * Invokes an action when the source Observable calls `onCompleted`.
  3883. *
  3884. * @param onCompleted the action to invoke when the source Observable calls
  3885. * `onCompleted`
  3886. * @return the source Observable with the side-effecting behavior applied
  3887. */
  3888. def doOnCompleted(onCompleted: => Unit): Observable[T] = {
  3889. toScalaObservable[T](asJavaObservable.doOnCompleted(() => onCompleted))
  3890. }
  3891.  
  3892. /**
  3893. * Returns an Observable that applies the given function to each item emitted by an
  3894. * Observable.
  3895. *
  3896. * @param onNext this function will be called whenever the Observable emits an item
  3897. *
  3898. * @return an Observable with the side-effecting behavior applied.
  3899. */
  3900. def doOnEach(onNext: T => Unit): Observable[T] = {
  3901. toScalaObservable[T](asJavaObservable.doOnNext(onNext))
  3902. }
  3903.  
  3904. /**
  3905. * Returns an Observable that applies the given function to each item emitted by an
  3906. * Observable.
  3907. *
  3908. * @param onNext this function will be called whenever the Observable emits an item
  3909. * @param onError this function will be called if an error occurs
  3910. *
  3911. * @return an Observable with the side-effecting behavior applied.
  3912. */
  3913. def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
  3914. toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError, ()=>{})))
  3915. }
  3916.  
  3917. /**
  3918. * Returns an Observable that applies the given function to each item emitted by an
  3919. * Observable.
  3920. *
  3921. * @param onNext this function will be called whenever the Observable emits an item
  3922. * @param onError this function will be called if an error occurs
  3923. * @param onCompleted the action to invoke when the source Observable calls
  3924. *
  3925. * @return an Observable with the side-effecting behavior applied.
  3926. */
  3927. def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
  3928. toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted)))
  3929. }
  3930.  
  3931. /**
  3932. * Modifies the source `Observable` so that it invokes the given action when it is subscribed from
  3933. * its subscribers. Each subscription will result in an invocation of the given action except when the
  3934. * source `Observable` is reference counted, in which case the source `Observable` will invoke
  3935. * the given action for the first subscription.
  3936. * <p>
  3937. * <img width="640" height="390" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnSubscribe.png" alt="" />
  3938. * <dl>
  3939. * <dt><b>Scheduler:</b></dt>
  3940. * <dd>`onSubscribe` does not operate by default on a particular `Scheduler`.</dd>
  3941. * </dl>
  3942. *
  3943. * @param onSubscribe
  3944. * the action that gets called when an observer subscribes to this `Observable`
  3945. * @return the source `Observable` modified so as to call this Action when appropriate
  3946. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonsubscribe">RxJava wiki: doOnSubscribe</a>
  3947. * @since 0.20
  3948. */
  3949. def doOnSubscribe(onSubscribe: => Unit): Observable[T] = {
  3950. toScalaObservable[T](asJavaObservable.doOnSubscribe(() => onSubscribe))
  3951. }
  3952.  
  3953. /**
  3954. * Modifies an Observable so that it invokes an action when it calls `onCompleted` or `onError`.
  3955. * <p>
  3956. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="" />
  3957. * <p>
  3958. * This differs from [[Observable.doAfterTerminate doAfterTerminate]] in that this happens **before** onCompleted/onError` are emitted.
  3959. *
  3960. * @param onTerminate the action to invoke when the source Observable calls `onCompleted` or `onError`
  3961. * @return the source Observable with the side-effecting behavior applied
  3962. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#wiki-doonterminate">RxJava Wiki: doOnTerminate()</a>
  3963. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229804.aspx">MSDN: Observable.Do</a>
  3964. */
  3965. def doOnTerminate(onTerminate: => Unit): Observable[T] = {
  3966. toScalaObservable[T](asJavaObservable.doOnTerminate(() => onTerminate))
  3967. }
  3968.  
  3969. /**
  3970. * $experimental Return a new [[Observable]] that will null out references to the upstream [[Producer]] and downstream [[Subscriber]] if
  3971. * the sequence is terminated or downstream unsubscribes.
  3972. *
  3973. * $supportBackpressure
  3974. *
  3975. * $noDefaultScheduler
  3976. *
  3977. * @return an [[Observable]] which out references to the upstream [[Producer]] and downstream [[Subscriber]] if the sequence is
  3978. * terminated or downstream unsubscribes
  3979. */
  3980. @Experimental
  3981. def onTerminateDetach: Observable[T] = {
  3982. toScalaObservable[T](asJavaObservable.onTerminateDetach())
  3983. }
  3984.  
  3985. /**
  3986. * Modifies the source `Observable` so that it invokes the given action when it is unsubscribed from
  3987. * its subscribers. Each un-subscription will result in an invocation of the given action except when the
  3988. * source `Observable` is reference counted, in which case the source `Observable` will invoke
  3989. * the given action for the very last un-subscription.
  3990. * <p>
  3991. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="" />
  3992. * <dl>
  3993. * <dt><b>Scheduler:</b></dt>
  3994. * <dd>`doOnUnsubscribe` does not operate by default on a particular `Scheduler`.</dd>
  3995. * </dl>
  3996. *
  3997. * @param onUnsubscribe
  3998. * the action that gets called when this `Observable` is unsubscribed
  3999. * @return the source `Observable` modified so as to call this Action when appropriate
  4000. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#doonunsubscribe">RxJava wiki: doOnUnsubscribe</a>
  4001. * @since 0.20
  4002. */
  4003. def doOnUnsubscribe(onUnsubscribe: => Unit): Observable[T] = {
  4004. toScalaObservable[T](asJavaObservable.doOnUnsubscribe(() => onUnsubscribe))
  4005. }
  4006.  
  4007. /**
  4008. * Given two Observables, mirror the one that first emits an item.
  4009. *
  4010. * <img width="640" height="385" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png" alt="" />
  4011. *
  4012. * @param that
  4013. * an Observable competing to react first
  4014. * @return an Observable that emits the same sequence of items as whichever of `this` or `that` first emitted an item.
  4015. */
  4016. def amb[U >: T](that: Observable[U]): Observable[U] = {
  4017. val thisJava: rx.Observable[_ <: U] = this.asJavaObservable
  4018. val thatJava: rx.Observable[_ <: U] = that.asJavaObservable
  4019. toScalaObservable[U](rx.Observable.amb(thisJava, thatJava))
  4020. }
  4021.  
  4022. /**
  4023. * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
  4024. * specified delay. Error notifications from the source Observable are not delayed.
  4025. *
  4026. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.png" alt="" />
  4027. *
  4028. * @param delay the delay to shift the source by
  4029. * @return the source Observable shifted in time by the specified delay
  4030. */
  4031. def delay(delay: Duration): Observable[T] = {
  4032. toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit))
  4033. }
  4034.  
  4035. /**
  4036. * Returns an Observable that emits the items emitted by the source Observable shifted forward in time by a
  4037. * specified delay. Error notifications from the source Observable are not delayed.
  4038. *
  4039. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.s.png" alt="" />
  4040. *
  4041. * @param delay the delay to shift the source by
  4042. * @param scheduler the Scheduler to use for delaying
  4043. * @return the source Observable shifted in time by the specified delay
  4044. */
  4045. def delay(delay: Duration, scheduler: Scheduler): Observable[T] = {
  4046. toScalaObservable[T](asJavaObservable.delay(delay.length, delay.unit, scheduler))
  4047. }
  4048.  
  4049. /**
  4050. * Returns an Observable that delays the emissions of the source Observable via another Observable on a
  4051. * per-item basis.
  4052. * <p>
  4053. * <img width="640" height="450" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.o.png" alt="" />
  4054. * <p>
  4055. * Note: the resulting Observable will immediately propagate any `onError` notification
  4056. * from the source Observable.
  4057. *
  4058. * @param itemDelay a function that returns an Observable for each item emitted by the source Observable, which is
  4059. * then used to delay the emission of that item by the resulting Observable until the Observable
  4060. * returned from `itemDelay` emits an item
  4061. * @return an Observable that delays the emissions of the source Observable via another Observable on a per-item basis
  4062. */
  4063. def delay(itemDelay: T => Observable[Any]): Observable[T] = {
  4064. val itemDelayJava = new Func1[T, rx.Observable[Any]] {
  4065. override def call(t: T): rx.Observable[Any] =
  4066. itemDelay(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
  4067. }
  4068. toScalaObservable[T](asJavaObservable.delay[Any](itemDelayJava))
  4069. }
  4070.  
  4071. /**
  4072. * Returns an Observable that delays the subscription to and emissions from the souce Observable via another
  4073. * Observable on a per-item basis.
  4074. * <p>
  4075. * <img width="640" height="450" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delay.oo.png" alt="" />
  4076. * <p>
  4077. * Note: the resulting Observable will immediately propagate any `onError` notification
  4078. * from the source Observable.
  4079. *
  4080. * @param subscriptionDelay a function that returns an Observable that triggers the subscription to the source Observable
  4081. * once it emits any item
  4082. * @param itemDelay a function that returns an Observable for each item emitted by the source Observable, which is
  4083. * then used to delay the emission of that item by the resulting Observable until the Observable
  4084. * returned from `itemDelay` emits an item
  4085. * @return an Observable that delays the subscription and emissions of the source Observable via another
  4086. * Observable on a per-item basis
  4087. */
  4088. def delay(subscriptionDelay: () => Observable[Any], itemDelay: T => Observable[Any]): Observable[T] = {
  4089. val subscriptionDelayJava = new Func0[rx.Observable[Any]] {
  4090. override def call(): rx.Observable[Any] =
  4091. subscriptionDelay().asJavaObservable.asInstanceOf[rx.Observable[Any]]
  4092. }
  4093. val itemDelayJava = new Func1[T, rx.Observable[Any]] {
  4094. override def call(t: T): rx.Observable[Any] =
  4095. itemDelay(t).asJavaObservable.asInstanceOf[rx.Observable[Any]]
  4096. }
  4097. toScalaObservable[T](asJavaObservable.delay[Any, Any](subscriptionDelayJava, itemDelayJava))
  4098. }
  4099.  
  4100. /**
  4101. * Return an Observable that delays the subscription to the source Observable by a given amount of time.
  4102. *
  4103. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.png" alt="" />
  4104. *
  4105. * @param delay the time to delay the subscription
  4106. * @return an Observable that delays the subscription to the source Observable by the given amount
  4107. */
  4108. def delaySubscription(delay: Duration): Observable[T] = {
  4109. toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit))
  4110. }
  4111.  
  4112. /**
  4113. * Return an Observable that delays the subscription to the source Observable by a given amount of time,
  4114. * both waiting and subscribing on a given Scheduler.
  4115. *
  4116. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.s.png" alt="" />
  4117. *
  4118. * @param delay the time to delay the subscription
  4119. * @param scheduler the Scheduler on which the waiting and subscription will happen
  4120. * @return an Observable that delays the subscription to the source Observable by a given
  4121. * amount, waiting and subscribing on the given Scheduler
  4122. */
  4123. def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
  4124. toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
  4125. }
  4126.  
  4127. /**
  4128. * Returns an Observable that delays the subscription to the source Observable until a second Observable
  4129. * emits an item.
  4130. * <p>
  4131. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/delaySubscription.o.png" alt="" />
  4132. * <dl>
  4133. * <dt><b>Scheduler:</b></dt>
  4134. * <dd>This version of `delay` operates by default on the `computation` `Scheduler`.</dd>
  4135. * </dl>
  4136. *
  4137. * @param subscriptionDelay
  4138. * a function that returns an Observable that triggers the subscription to the source Observable
  4139. * once it emits any item
  4140. * @return an Observable that delays the subscription to the source Observable until the Observable returned
  4141. * by `subscriptionDelay` emits an item
  4142. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#delaysubscription">RxJava wiki: delaySubscription</a>
  4143. */
  4144. def delaySubscription(subscriptionDelay: () => Observable[Any]): Observable[T] = {
  4145. val subscriptionDelayJava = new Func0[rx.Observable[Any]] {
  4146. override def call(): rx.Observable[Any] =
  4147. subscriptionDelay().asJavaObservable.asInstanceOf[rx.Observable[Any]]
  4148. }
  4149.  
  4150. toScalaObservable[T](asJavaObservable.delaySubscription(subscriptionDelayJava))
  4151. }
  4152.  
  4153. /**
  4154. * Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
  4155. * source Observbable.
  4156. *
  4157. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAt.png" alt="" />
  4158. *
  4159. * @param index
  4160. * the zero-based index of the item to retrieve
  4161. * @return an Observable that emits a single item: the item at the specified position in the sequence of
  4162. * those emitted by the source Observable
  4163. * @throws java.lang.IndexOutOfBoundsException
  4164. * if index is greater than or equal to the number of items emitted by the source
  4165. * Observable, or index is less than 0
  4166. */
  4167. def elementAt(index: Int): Observable[T] = {
  4168. toScalaObservable[T](asJavaObservable.elementAt(index))
  4169. }
  4170.  
  4171. /**
  4172. * Returns an Observable that emits the item found at a specified index in a sequence of emissions from a
  4173. * source Observable, or a default item if that index is out of range.
  4174. *
  4175. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="" />
  4176. *
  4177. * @param index
  4178. * the zero-based index of the item to retrieve
  4179. * @param default
  4180. * the default item
  4181. * @return an Observable that emits the item at the specified position in the sequence emitted by the source
  4182. * Observable, or the default item if that index is outside the bounds of the source sequence
  4183. * @throws java.lang.IndexOutOfBoundsException
  4184. * if `index` is less than 0
  4185. */
  4186. def elementAtOrDefault[U >: T](index: Int, default: U): Observable[U] = {
  4187. val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]]
  4188. toScalaObservable[U](thisJava.elementAtOrDefault(index, default))
  4189. }
  4190.  
  4191. /**
  4192. * Return an Observable that emits a single `Map` containing values corresponding to items emitted by the
  4193. * source Observable, mapped by the keys returned by a specified `keySelector` function.
  4194. * <p>
  4195. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="" />
  4196. * <p>
  4197. *
  4198. * @param keySelector the function that extracts the key from a source item to be used in the `Map`
  4199. * @param valueSelector the function that extracts the value from a source item to be used in the `Map`
  4200. * @param cbf `CanBuildFrom` to build the `Map`
  4201. * @return an Observable that emits a single item: a `Map` containing the mapped items from the source
  4202. * Observable
  4203. */
  4204. def to[M[_, _], K, V](keySelector: T => K, valueSelector: T => V)(implicit cbf: CanBuildFrom[Nothing, (K, V), M[K, V]]): Observable[M[K, V]] = {
  4205. val stateFactory = new rx.functions.Func0[mutable.Builder[(K, V), M[K, V]]] {
  4206. override def call(): mutable.Builder[(K, V), M[K, V]] = cbf()
  4207. }
  4208. val collector = new rx.functions.Action2[mutable.Builder[(K, V), M[K, V]], T] {
  4209. override def call(builder: mutable.Builder[(K, V), M[K, V]], t: T): Unit = builder += keySelector(t) -> valueSelector(t)
  4210. }
  4211. toScalaObservable(asJavaObservable.collect[mutable.Builder[(K, V), M[K, V]]](stateFactory, collector)).map(_.result)
  4212. }
  4213.  
  4214. /**
  4215. * Return an Observable that emits a single `Map` containing all pairs emitted by the source Observable.
  4216. * This method is unavailable unless the elements are members of `(K, V)`. Each `(K, V)` becomes a key-value
  4217. * pair in the map. If more than one pairs have the same key, the `Map` will contain the latest of
  4218. * those items.
  4219. *
  4220. * @return an Observable that emits a single item: an `Map` containing all pairs from the source Observable
  4221. */
  4222. def toMap[K, V](implicit ev: Observable[T] <:< Observable[(K, V)]): Observable[Map[K, V]] = {
  4223. val o: Observable[(K, V)] = this
  4224. o.toMap(_._1, _._2)
  4225. }
  4226.  
  4227. /**
  4228. * Return an Observable that emits a single `Map` containing all items emitted by the source Observable,
  4229. * mapped by the keys returned by a specified `keySelector` function.
  4230. * <p>
  4231. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="" />
  4232. * <p>
  4233. * If more than one source item maps to the same key, the `Map` will contain the latest of those items.
  4234. *
  4235. * @param keySelector the function that extracts the key from a source item to be used in the `Map`
  4236. * @return an Observable that emits a single item: an `Map` containing the mapped items from the source
  4237. * Observable
  4238. */
  4239. def toMap[K](keySelector: T => K): Observable[Map[K, T]] = {
  4240. to[Map, K, T](keySelector, v => v)
  4241. }
  4242.  
  4243. /**
  4244. * Return an Observable that emits a single `Map` containing values corresponding to items emitted by the
  4245. * source Observable, mapped by the keys returned by a specified `keySelector` function.
  4246. * <p>
  4247. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMap.png" alt="" />
  4248. * <p>
  4249. * If more than one source item maps to the same key, the `Map` will contain a single entry that
  4250. * corresponds to the latest of those items.
  4251. *
  4252. * @param keySelector the function that extracts the key from a source item to be used in the `Map`
  4253. * @param valueSelector the function that extracts the value from a source item to be used in the `Map`
  4254. * @return an Observable that emits a single item: an `Map` containing the mapped items from the source
  4255. * Observable
  4256. */
  4257. def toMap[K, V](keySelector: T => K, valueSelector: T => V): Observable[Map[K, V]] = {
  4258. to[Map, K, V](keySelector, valueSelector)
  4259. }
  4260.  
  4261. /**
  4262. * Returns an Observable that emits a Boolean value that indicates whether `this` and `that` Observable sequences are the
  4263. * same by comparing the items emitted by each Observable pairwise.
  4264. * <p>
  4265. * <img width="640" height="385" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="" />
  4266. *
  4267. * Note: this method uses `==` to compare elements. It's a bit different from RxJava which uses `Object.equals`.
  4268. *
  4269. * @param that the Observable to compare
  4270. * @return an Observable that emits a `Boolean` value that indicates whether the two sequences are the same
  4271. */
  4272. def sequenceEqual[U >: T](that: Observable[U]): Observable[Boolean] = {
  4273. sequenceEqualWith(that)(_ == _)
  4274. }
  4275.  
  4276. /**
  4277. * Returns an Observable that emits a Boolean value that indicates whether `this` and `that` Observable sequences are the
  4278. * same by comparing the items emitted by each Observable pairwise based on the results of a specified `equality` function.
  4279. * <p>
  4280. * <img width="640" height="385" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/sequenceEqual.png" alt="" />
  4281. *
  4282. * @param that the Observable to compare
  4283. * @param equality a function used to compare items emitted by each Observable
  4284. * @return an Observable that emits a `Boolean` value that indicates whether the two sequences are the same based on the `equality` function.
  4285. */
  4286. def sequenceEqualWith[U >: T](that: Observable[U])(equality: (U, U) => Boolean): Observable[Boolean] = {
  4287. val thisJava: rx.Observable[_ <: U] = this.asJavaObservable
  4288. val thatJava: rx.Observable[_ <: U] = that.asJavaObservable
  4289. val equalityJava: Func2[_ >: U, _ >: U, java.lang.Boolean] = equality
  4290. toScalaObservable[java.lang.Boolean](rx.Observable.sequenceEqual[U](thisJava, thatJava, equalityJava)).map(_.booleanValue)
  4291. }
  4292.  
  4293. /**
  4294. * Returns an Observable that emits records of the time interval between consecutive items emitted by the
  4295. * source Observable.
  4296. * <p>
  4297. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeInterval.png" alt="" />
  4298. *
  4299. * @return an Observable that emits time interval information items
  4300. */
  4301. def timeInterval: Observable[(Duration, T)] = {
  4302. toScalaObservable(asJavaObservable.timeInterval())
  4303. .map(inv => (Duration(inv.getIntervalInMilliseconds, MILLISECONDS), inv.getValue))
  4304. }
  4305.  
  4306. /**
  4307. * Returns an Observable that emits records of the time interval between consecutive items emitted by the
  4308. * source Observable, where this interval is computed on a specified Scheduler.
  4309. * <p>
  4310. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timeInterval.s.png" alt="" />
  4311. *
  4312. * @param scheduler the [[Scheduler]] used to compute time intervals
  4313. * @return an Observable that emits time interval information items
  4314. */
  4315. def timeInterval(scheduler: Scheduler): Observable[(Duration, T)] = {
  4316. toScalaObservable(asJavaObservable.timeInterval(scheduler.asJavaScheduler))
  4317. .map(inv => (Duration(inv.getIntervalInMilliseconds, MILLISECONDS), inv.getValue))
  4318. }
  4319.  
  4320. /**
  4321. * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
  4322. * the values of the current Observable through the Operator function.
  4323. * <p>
  4324. * In other words, this allows chaining Observers together on an Observable for acting on the values within
  4325. * the Observable.
  4326. * {{{
  4327. * observable.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
  4328. * }}}
  4329. * If the operator you are creating is designed to act on the individual items emitted by a source
  4330. * Observable, use `lift`. If your operator is designed to transform the source Observable as a whole
  4331. * (for instance, by applying a particular set of existing RxJava operators to it) use `#compose`.
  4332. * <dl>
  4333. * <dt><b>Scheduler:</b></dt>
  4334. * <dd>`lift` does not operate by default on a particular [[Scheduler]].</dd>
  4335. * </dl>
  4336. *
  4337. * @param operator the Operator that implements the Observable-operating function to be applied to the source
  4338. * Observable
  4339. * @return an Observable that is the result of applying the lifted Operator to the source Observable
  4340. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
  4341. * @since 0.17
  4342. */
  4343. def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = {
  4344. toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator)))
  4345. }
  4346.  
  4347. /**
  4348. * Converts the source `Observable[T]` into an `Observable[Observable[T]]` that emits the source Observable as its single emission.
  4349. *
  4350. * <img width="640" height="350" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/nest.png" alt="" />
  4351. *
  4352. * @return an Observable that emits a single item: the source Observable
  4353. */
  4354. def nest: Observable[Observable[T]] = {
  4355. toScalaObservable(asJavaObservable.nest).map(toScalaObservable[T](_))
  4356. }
  4357.  
  4358. /**
  4359. * Subscribes to the [[Observable]] and receives notifications for each element.
  4360. *
  4361. * Alias to `subscribe(T => Unit)`.
  4362. *
  4363. * $noDefaultScheduler
  4364. *
  4365. * @param onNext function to execute for each item.
  4366. * @throws java.lang.IllegalArgumentException if `onNext` is null
  4367. * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
  4368. * @since 0.19
  4369. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  4370. */
  4371. def foreach(onNext: T => Unit): Unit = {
  4372. asJavaObservable.subscribe(onNext)
  4373. }
  4374.  
  4375. /**
  4376. * Subscribes to the [[Observable]] and receives notifications for each element and error events.
  4377. *
  4378. * Alias to `subscribe(T => Unit, Throwable => Unit)`.
  4379. *
  4380. * $noDefaultScheduler
  4381. *
  4382. * @param onNext function to execute for each item.
  4383. * @param onError function to execute when an error is emitted.
  4384. * @throws java.lang.IllegalArgumentException if `onNext` is null, or if `onError` is null
  4385. * @since 0.19
  4386. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  4387. */
  4388. def foreach(onNext: T => Unit, onError: Throwable => Unit): Unit = {
  4389. asJavaObservable.subscribe(onNext, onError)
  4390. }
  4391.  
  4392. /**
  4393. * Subscribes to the [[Observable]] and receives notifications for each element and the terminal events.
  4394. *
  4395. * Alias to `subscribe(T => Unit, Throwable => Unit, () => Unit)`.
  4396. *
  4397. * $noDefaultScheduler
  4398. *
  4399. * @param onNext function to execute for each item.
  4400. * @param onError function to execute when an error is emitted.
  4401. * @param onComplete function to execute when completion is signalled.
  4402. * @throws java.lang.IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null
  4403. * @since 0.19
  4404. * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
  4405. */
  4406. def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = {
  4407. asJavaObservable.subscribe(onNext, onError, onComplete)
  4408. }
  4409.  
  4410. /**
  4411. * Returns an Observable that counts the total number of items emitted by the source Observable and emits
  4412. * this count as a 64-bit Long.
  4413. * <p>
  4414. * <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/longCount.png" alt="" />
  4415. * <dl>
  4416. * <dt><b>Backpressure Support:</b></dt>
  4417. * <dd>This operator does not support backpressure because by intent it will receive all values and reduce
  4418. * them to a single `onNext`.</dd>
  4419. * <dt><b>Scheduler:</b></dt>
  4420. * <dd>`countLong` does not operate by default on a particular `Scheduler`.</dd>
  4421. * </dl>
  4422. *
  4423. * @return an Observable that emits a single item: the number of items emitted by the source Observable as a
  4424. * 64-bit Long item
  4425. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Mathematical-and-Aggregate-Operators#count-and-countlong">RxJava wiki: countLong</a>
  4426. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229120.aspx">MSDN: Observable.LongCount</a>
  4427. * @see #count()
  4428. */
  4429. def countLong: Observable[Long] = {
  4430. toScalaObservable[java.lang.Long](asJavaObservable.countLong()).map(_.longValue())
  4431. }
  4432.  
  4433. /**
  4434. * Returns an Observable that emits a single `mutable.MultiMap` that contains items emitted by the
  4435. * source Observable keyed by a specified `keySelector` function. The items having the same
  4436. * key will be put into a `Set`.
  4437. *
  4438. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="" />
  4439. *
  4440. * @param keySelector the function that extracts the key from the source items to be used as key in the `mutable.MultiMap`
  4441. * @return an Observable that emits a single item: a `mutable.MultiMap` that contains items emitted by the
  4442. * source Observable keyed by a specified `keySelector` function.
  4443. */
  4444. def toMultiMap[K, V >: T](keySelector: T => K): Observable[mutable.MultiMap[K, V]] = {
  4445. toMultiMap(keySelector, k => k)
  4446. }
  4447.  
  4448. /**
  4449. * Returns an Observable that emits a single `mutable.MultiMap` that contains values extracted by a
  4450. * specified `valueSelector` function from items emitted by the source Observable, keyed by a
  4451. * specified `keySelector` function. The values having the same key will be put into a `Set`.
  4452. *
  4453. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="" />
  4454. *
  4455. * @param keySelector the function that extracts a key from the source items to be used as key in the `mutable.MultiMap`
  4456. * @param valueSelector the function that extracts a value from the source items to be used as value in the `mutable.MultiMap`
  4457. * @return an Observable that emits a single item: a `mutable.MultiMap` that contains keys and values mapped from
  4458. * the source Observable
  4459. */
  4460. def toMultiMap[K, V](keySelector: T => K, valueSelector: T => V): Observable[mutable.MultiMap[K, V]] = {
  4461. toMultiMap(keySelector, valueSelector, new mutable.HashMap[K, mutable.Set[V]] with mutable.MultiMap[K, V])
  4462. }
  4463.  
  4464. /**
  4465. * Returns an Observable that emits a single `mutable.MultiMap`, returned by a specified `multiMapFactory` function, that
  4466. * contains values extracted by a specified `valueSelector` function from items emitted by the source Observable, and
  4467. * keyed by the `keySelector` function. The values having the same key will be put into a `Set`.
  4468. *
  4469. * <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/toMultiMap.png" alt="" />
  4470. *
  4471. * @param keySelector the function that extracts a key from the source items to be used as the key in the `mutable.MultiMap`
  4472. * @param valueSelector the function that extracts a value from the source items to be used as the value in the `mutable.MultiMap`
  4473. * @param multiMapFactory a `mutable.MultiMap` instance to be used. Note: tis is a by-name parameter.
  4474. * @return an Observable that emits a single item: a `mutable.MultiMap` that contains keys and values mapped from the source Observable.
  4475. */
  4476. def toMultiMap[K, V, M <: mutable.MultiMap[K, V]](keySelector: T => K, valueSelector: T => V, multiMapFactory: => M): Observable[M] = {
  4477. val stateFactory = new rx.functions.Func0[M] {
  4478. override def call(): M = multiMapFactory
  4479. }
  4480. val collector = new rx.functions.Action2[M, T] {
  4481. override def call(mm: M, t: T): Unit = mm.addBinding(keySelector(t), valueSelector(t))
  4482. }
  4483. toScalaObservable(asJavaObservable.collect[M](stateFactory, collector))
  4484. }
  4485.  
  4486. /**
  4487. * Returns an Observable that emits a single item, a collection composed of all the items emitted by
  4488. * the source Observable.
  4489. *
  4490. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4491. * of items, as you do not have the option to unsubscribe.
  4492. *
  4493. * @tparam Col the collection type to build.
  4494. * @return an Observable that emits a single item, a collection containing all of the items emitted by
  4495. * the source Observable.
  4496. */
  4497. def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, T, Col[T@uncheckedVariance]]): Observable[Col[T@uncheckedVariance]] = {
  4498. val stateFactory = new rx.functions.Func0[mutable.Builder[T, Col[T]]] {
  4499. override def call(): mutable.Builder[T, Col[T]] = cbf()
  4500. }
  4501. val collector = new rx.functions.Action2[mutable.Builder[T, Col[T]], T] {
  4502. override def call(builder: mutable.Builder[T, Col[T]], t: T): Unit = builder += t
  4503. }
  4504. toScalaObservable(asJavaObservable.collect[mutable.Builder[T, Col[T]]](stateFactory, collector)).map(_.result)
  4505. }
  4506.  
  4507. /**
  4508. * Returns an Observable that emits a single item, a `Traversable` composed of all the items emitted by
  4509. * the source Observable.
  4510. *
  4511. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4512. * of items, as you do not have the option to unsubscribe.
  4513. *
  4514. * @return an Observable that emits a single item, a `Traversable` containing all of the items emitted by
  4515. * the source Observable.
  4516. */
  4517. def toTraversable: Observable[Traversable[T]] = to[Traversable]
  4518.  
  4519. /**
  4520. * Returns an Observable that emits a single item, a `List` composed of all the items emitted by
  4521. * the source Observable.
  4522. *
  4523. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4524. * of items, as you do not have the option to unsubscribe.
  4525. *
  4526. * @return an Observable that emits a single item, a `List` containing all of the items emitted by
  4527. * the source Observable.
  4528. */
  4529. def toList: Observable[List[T]] = to[List]
  4530.  
  4531. /**
  4532. * Returns an Observable that emits a single item, an `Iterable` composed of all the items emitted by
  4533. * the source Observable.
  4534. *
  4535. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4536. * of items, as you do not have the option to unsubscribe.
  4537. *
  4538. * @return an Observable that emits a single item, an `Iterable` containing all of the items emitted by
  4539. * the source Observable.
  4540. */
  4541. def toIterable: Observable[Iterable[T]] = to[Iterable]
  4542.  
  4543. /**
  4544. * Returns an Observable that emits a single item, an `Iterator` composed of all the items emitted by
  4545. * the source Observable.
  4546. *
  4547. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4548. * of items, as you do not have the option to unsubscribe.
  4549. *
  4550. * @return an Observable that emits a single item, an `Iterator` containing all of the items emitted by
  4551. * the source Observable.
  4552. */
  4553. def toIterator: Observable[Iterator[T]] = toIterable.map(_.iterator)
  4554.  
  4555. /**
  4556. * Returns an Observable that emits a single item, a `Stream` composed of all the items emitted by
  4557. * the source Observable.
  4558. *
  4559. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4560. * of items, as you do not have the option to unsubscribe.
  4561. *
  4562. * @return an Observable that emits a single item, a `Stream` containing all of the items emitted by
  4563. * the source Observable.
  4564. */
  4565. def toStream: Observable[Stream[T]] = to[Stream]
  4566.  
  4567. /**
  4568. * Returns an Observable that emits a single item, an `IndexedSeq` composed of all the items emitted by
  4569. * the source Observable.
  4570. *
  4571. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4572. * of items, as you do not have the option to unsubscribe.
  4573. *
  4574. * @return an Observable that emits a single item, an `IndexedSeq` containing all of the items emitted by
  4575. * the source Observable.
  4576. */
  4577. def toIndexedSeq: Observable[immutable.IndexedSeq[T]] = to[immutable.IndexedSeq]
  4578.  
  4579. /**
  4580. * Returns an Observable that emits a single item, a `Vector` composed of all the items emitted by
  4581. * the source Observable.
  4582. *
  4583. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4584. * of items, as you do not have the option to unsubscribe.
  4585. *
  4586. * @return an Observable that emits a single item, a `Vector` containing all of the items emitted by
  4587. * the source Observable.
  4588. */
  4589. def toVector: Observable[Vector[T]] = to[Vector]
  4590.  
  4591. /**
  4592. * Returns an Observable that emits a single item, a `Buffer` composed of all the items emitted by
  4593. * the source Observable.
  4594. *
  4595. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4596. * of items, as you do not have the option to unsubscribe.
  4597. *
  4598. * @return an Observable that emits a single item, a `Buffer` containing all of the items emitted by
  4599. * the source Observable.
  4600. */
  4601. def toBuffer[U >: T]: Observable[mutable.Buffer[U]] = { // use U >: T because Buffer is invariant
  4602. val us: Observable[U] = this
  4603. us.to[ArrayBuffer]
  4604. }
  4605.  
  4606. /**
  4607. * Returns an Observable that emits a single item, a `Set` composed of all the items emitted by
  4608. * the source Observable.
  4609. *
  4610. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4611. * of items, as you do not have the option to unsubscribe.
  4612. *
  4613. * @return an Observable that emits a single item, a `Set` containing all of the items emitted by
  4614. * the source Observable.
  4615. */
  4616. def toSet[U >: T]: Observable[immutable.Set[U]] = { // use U >: T because Set is invariant
  4617. val us: Observable[U] = this
  4618. us.to[immutable.Set]
  4619. }
  4620.  
  4621. /**
  4622. * Returns an Observable that emits a single item, an `Array` composed of all the items emitted by
  4623. * the source Observable.
  4624. *
  4625. * Be careful not to use this operator on Observables that emit infinite or very large numbers
  4626. * of items, as you do not have the option to unsubscribe.
  4627. *
  4628. * @return an Observable that emits a single item, an `Array` containing all of the items emitted by
  4629. * the source Observable.
  4630. */
  4631. def toArray[U >: T : ClassTag]: Observable[Array[U]] = // use U >: T because Array is invariant
  4632. toBuffer[U].map(_.toArray)
  4633.  
  4634. /**
  4635. * Returns an [[Observable]] which only emits elements which do not satisfy a predicate.
  4636. *
  4637. * @param p the predicate used to test elements.
  4638. * @return Returns an [[Observable]] which only emits elements which do not satisfy a predicate.
  4639. */
  4640. def filterNot(p: T => Boolean): Observable[T] = {
  4641. filter(!p(_))
  4642. }
  4643.  
  4644. /**
  4645. * Return an [[Observable]] which emits the number of elements in the source [[Observable]] which satisfy a predicate.
  4646. *
  4647. * @param p the predicate used to test elements.
  4648. * @return an [[Observable]] which emits the number of elements in the source [[Observable]] which satisfy a predicate.
  4649. */
  4650. def count(p: T => Boolean): Observable[Int] = {
  4651. filter(p).length
  4652. }
  4653.  
  4654. /**
  4655. * Return an [[Observable]] emitting one single `Boolean`, which is `true` if the source [[Observable]] emits any element, and `false` otherwise.
  4656. *
  4657. * @return an [[Observable]] emitting one single Boolean`, which is `true` if the source [[Observable]] emits any element, and `false otherwise.
  4658. */
  4659. def nonEmpty: Observable[Boolean] = {
  4660. isEmpty.map(!_)
  4661. }
  4662.  
  4663. /**
  4664. * Instructs an Observable that is emitting items faster than its observer can consume them to buffer these
  4665. * items indefinitely until they can be emitted.
  4666. *
  4667. * <img width="640" height="300" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="" />
  4668. *
  4669. * ===Scheduler:===
  4670. * `onBackpressureBuffer` does not operate by default on a particular `Scheduler`.
  4671. *
  4672. * @return the source Observable modified to buffer items to the extent system resources allow
  4673. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
  4674. */
  4675. def onBackpressureBuffer: Observable[T] = {
  4676. toScalaObservable[T](asJavaObservable.onBackpressureBuffer)
  4677. }
  4678.  
  4679. /**
  4680. * $beta Instructs an [[Observable]] that is emitting items faster than its [[Observer]] can consume them to buffer up to
  4681. * a given amount of items until they can be emitted. The resulting [[Observable]] will emit
  4682. * `BufferOverflowException` as soon as the buffer's capacity is exceeded, drop all undelivered
  4683. * items, and unsubscribe from the source.
  4684. *
  4685. * <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
  4686. *
  4687. * $noDefaultScheduler
  4688. *
  4689. * @param capacity capacity of the internal buffer.
  4690. * @return an [[Observable]] that will buffer items up to the given capacity
  4691. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  4692. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4693. */
  4694. @Beta
  4695. def onBackpressureBuffer(capacity: Long): Observable[T] = {
  4696. asJavaObservable.onBackpressureBuffer(capacity)
  4697. }
  4698.  
  4699. /**
  4700. * $beta Instructs an [[Observable]] that is emitting items faster than its [[Observer]] can consume them to buffer up to
  4701. * a given amount of items until they can be emitted. The resulting [[Observable]] will emit
  4702. * `BufferOverflowException` as soon as the buffer's capacity is exceeded, drop all undelivered
  4703. * items, unsubscribe from the source, and notify `onOverflow`.
  4704. *
  4705. * <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
  4706. *
  4707. * $noDefaultScheduler
  4708. *
  4709. * @param capacity capacity of the internal buffer.
  4710. * @param onOverflow an action to run when the buffer's capacity is exceeded. This is a by-name parameter.
  4711. * @return the source Observable modified to buffer items up to the given capacity
  4712. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  4713. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4714. */
  4715. @Beta
  4716. def onBackpressureBuffer(capacity: Long, onOverflow: => Unit): Observable[T] = {
  4717. asJavaObservable.onBackpressureBuffer(capacity, new Action0 {
  4718. override def call(): Unit = onOverflow
  4719. })
  4720. }
  4721.  
  4722. /**
  4723. * Use this operator when the upstream does not natively support backpressure and you wish to drop
  4724. * `onNext` when unable to handle further events.
  4725. *
  4726. * <img width="640" height="245" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="" />
  4727. *
  4728. * If the downstream request count hits 0 then `onNext` will be dropped until `request(long n)`
  4729. * is invoked again to increase the request count.
  4730. *
  4731. * ===Scheduler:===
  4732. * onBackpressureDrop` does not operate by default on a particular `Scheduler`.
  4733. *
  4734. * @return the source Observable modified to drop `onNext` notifications on overflow
  4735. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
  4736. */
  4737. def onBackpressureDrop: Observable[T] = {
  4738. toScalaObservable[T](asJavaObservable.onBackpressureDrop)
  4739. }
  4740.  
  4741. /**
  4742. * $experimental Instructs an [[Observable]] that is emitting items faster than its observer can consume them to discard,
  4743. * rather than emit, those items that its observer is not prepared to observe.
  4744. *
  4745. * <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
  4746. *
  4747. * If the downstream request count hits `0` then the [[Observable]] will refrain from calling `onNext` until
  4748. * the observer invokes `request(n)` again to increase the request count.
  4749. *
  4750. * $noDefaultScheduler
  4751. *
  4752. * @param onDrop the action to invoke for each item dropped. `onDrop` action should be fast and should never block.
  4753. * @return an new [[Observable]] that will drop `onNext` notifications on overflow
  4754. * @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
  4755. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4756. */
  4757. @Experimental
  4758. def onBackpressureDrop(onDrop: T => Unit): Observable[T] = {
  4759. toScalaObservable[T](asJavaObservable.onBackpressureDrop(new Action1[T] {
  4760. override def call(t: T) = onDrop(t)
  4761. }))
  4762. }
  4763.  
  4764. /**
  4765. * $experimental Instructs an Observable that is emitting items faster than its observer can consume them to
  4766. * hold onto the latest value and emit that on request.
  4767. * <p>
  4768. * <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.latest.png" alt="">
  4769. * <p>
  4770. * Its behavior is logically equivalent to {@code toBlocking().latest()} with the exception that
  4771. * the downstream is not blocking while requesting more values.
  4772. * <p>
  4773. * Note that if the upstream Observable does support backpressure, this operator ignores that capability
  4774. * and doesn't propagate any backpressure requests from downstream.
  4775. * <p>
  4776. * Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn,
  4777. * requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events.
  4778. *
  4779. * @return the source Observable modified so that it emits the most recently-received item upon request
  4780. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4781. */
  4782. @Experimental
  4783. def onBackpressureLatest: Observable[T] = {
  4784. asJavaObservable.onBackpressureLatest
  4785. }
  4786.  
  4787. /**
  4788. * Return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
  4789. * on which the function is defined.
  4790. *
  4791. * @tparam R the element type of the returned [[Observable]].
  4792. * @param pf the partial function which filters and maps the [[Observable]].
  4793. * @return a new [[Observable]] by applying a partial function to all elements of this [[Observable]]
  4794. * on which the function is defined.
  4795. */
  4796. def collect[R](pf: PartialFunction[T, R]): Observable[R] = {
  4797. filter(pf.isDefinedAt(_)).map(pf)
  4798. }
  4799.  
  4800. /**
  4801. * $beta An [[Observable]] wrapping the source one that will invokes the given action when it receives a request for more items.
  4802. *
  4803. * $noDefaultScheduler
  4804. *
  4805. * @param onRequest the action that gets called when an [[Observer]] requests items from this [[Observable]]
  4806. * @return an [[Observable]] that will call `onRequest` when appropriate
  4807. * @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
  4808. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4809. */
  4810. @Beta
  4811. def doOnRequest(onRequest: Long => Unit): Observable[T] = {
  4812. asJavaObservable.doOnRequest(new Action1[java.lang.Long] {
  4813. override def call(request: java.lang.Long): Unit = onRequest(request)
  4814. })
  4815. }
  4816.  
  4817. /**
  4818. * $experimental Merges the specified [[Observable]] into this [[Observable]] sequence by using the `resultSelector`
  4819. * function only when the source [[Observable]] (this instance) emits an item.
  4820. *
  4821. * <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/withLatestFrom.png" alt="">
  4822. *
  4823. * $noDefaultScheduler
  4824. *
  4825. * @param other the other [[Observable]]
  4826. * @param resultSelector the function to call when this [[Observable]] emits an item and the other [[Observable]] has already
  4827. * emitted an item, to generate the item to be emitted by the resulting [[Observable]]
  4828. * @return an [[Observable]] that merges the specified [[Observable]] into this [[Observable]] by using the
  4829. * `resultSelector` function only when the source [[Observable]] sequence (this instance) emits an item
  4830. * @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
  4831. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  4832. */
  4833. @Experimental
  4834. def withLatestFrom[U, R](other: Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
  4835. val func = new Func2[T, U, R] {
  4836. override def call(t1: T, t2: U): R = resultSelector(t1, t2)
  4837. }
  4838. toScalaObservable[R](asJavaObservable.withLatestFrom(other.asJavaObservable, func))
  4839. }
  4840. }
  4841.  
  4842. /**
  4843. * Provides various ways to construct new Observables.
  4844. *
  4845. * @define noDefaultScheduler
  4846. * ===Scheduler:===
  4847. * This method does not operate by default on a particular [[Scheduler]].
  4848. *
  4849. * @define supportBackpressure
  4850. * ===Backpressure:===
  4851. * Fully supports backpressure.
  4852. *
  4853. * @define experimental
  4854. * <span class="badge badge-red" style="float: right;">EXPERIMENTAL</span>
  4855. */
  4856. object Observable {
  4857. import scala.collection.JavaConverters._
  4858. import scala.concurrent.duration.Duration
  4859. import scala.concurrent.{Future, ExecutionContext}
  4860. import scala.util.{Success, Failure}
  4861. import ImplicitFunctionConversions._
  4862. import JavaConversions._
  4863. import rx.lang.scala.subjects.AsyncSubject
  4864.  
  4865. private[scala]
  4866. def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
  4867. val oScala1: Observable[java.util.List[T]] = new Observable[java.util.List[T]]{ val asJavaObservable = jObs }
  4868. oScala1.map((lJava: java.util.List[T]) => lJava.asScala)
  4869. }
  4870.  
  4871. private[scala]
  4872. def jObsOfJObsToScObsOfScObs[T](jObs: rx.Observable[_ <: rx.Observable[_ <: T]]): Observable[Observable[T]] = {
  4873. val oScala1: Observable[rx.Observable[_ <: T]] = new Observable[rx.Observable[_ <: T]]{ val asJavaObservable = jObs }
  4874. oScala1.map((oJava: rx.Observable[_ <: T]) => oJava)
  4875. }
  4876.  
  4877. /**
  4878. * Creates an Observable that will execute the given function when an [[rx.lang.scala.Observer]] subscribes to it.
  4879. *
  4880. * <img width="640" height="200" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="" />
  4881. *
  4882. * Write the function you pass to `create` so that it behaves as an Observable: It
  4883. * should invoke the Observer's [[rx.lang.scala.Observer.onNext onNext]], [[rx.lang.scala.Observer.onError onError]], and [[rx.lang.scala.Observer.onCompleted onCompleted]] methods
  4884. * appropriately.
  4885. *
  4886. * See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a>
  4887. * for detailed information.
  4888. *
  4889. * @tparam T
  4890. * the type of the items that this Observable emits.
  4891. * @param f
  4892. * a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
  4893. * as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
  4894. * canceling the subscription.
  4895. * @return
  4896. * an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
  4897. */
  4898. @deprecated("Use [[Observable.apply]] instead", "0.26.2")
  4899. def create[T](f: Observer[T] => Subscription): Observable[T] = {
  4900. Observable(
  4901. (subscriber: Subscriber[T]) => {
  4902. val s = f(subscriber)
  4903. if (s != null && s != subscriber) {
  4904. subscriber.add(s)
  4905. }
  4906. }
  4907. )
  4908. }
  4909.  
  4910. /*
  4911. Note: It's dangerous to have two overloads where one takes an `Observer[T] => Subscription`
  4912. function and the other takes a `Subscriber[T] => Unit` function, because expressions like
  4913. `o => Subscription{}` have both of these types.
  4914. So we call the old create method "create", and the new create method "apply".
  4915. Try it out yourself here:
  4916. def foo[T]: Unit = {
  4917. val fMeant: Observer[T] => Subscription = o => Subscription{}
  4918. val fWrong: Subscriber[T] => Unit = o => Subscription{}
  4919. }
  4920. */
  4921.  
  4922. /**
  4923. * $experimental Returns an [[Observable]] that respects the back-pressure semantics. When the returned [[Observable]] is
  4924. * subscribed to it will initiate the given [[observables.SyncOnSubscribe SyncOnSubscribe]]'s life cycle for generating events.
  4925. *
  4926. * Note: the [[observables.SyncOnSubscribe SyncOnSubscribe]] provides a generic way to fulfill data by iterating
  4927. * over a (potentially stateful) function (e.g. reading data off of a channel, a parser). If your
  4928. * data comes directly from an asynchronous/potentially concurrent source then consider using [[observables.AsyncOnSubscribe AsyncOnSubscribe]].
  4929. *
  4930. * $supportBackpressure
  4931. *
  4932. * $noDefaultScheduler
  4933. *
  4934. * @tparam T the type of the items that this [[Observable]] emits
  4935. * @tparam S the state type
  4936. * @param syncOnSubscribe an implementation of [[observables.SyncOnSubscribe SyncOnSubscribe]] There are many creation methods on the object for convenience.
  4937. * @return an [[Observable]] that, when a [[Subscriber]] subscribes to it, will use the specified [[observables.SyncOnSubscribe SyncOnSubscribe]] to generate events
  4938. * @see [[observables.SyncOnSubscribe.stateful]]
  4939. * @see [[observables.SyncOnSubscribe.singleState]]
  4940. * @see [[observables.SyncOnSubscribe.stateless]]
  4941. */
  4942. @Experimental
  4943. def create[S,T](syncOnSubscribe: SyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(syncOnSubscribe))
  4944.  
  4945. /**
  4946. * $experimental Returns an [[Observable]] that respects the back-pressure semantics. When the returned [[Observable]] is
  4947. * subscribed to it will initiate the given [[observables.AsyncOnSubscribe AsyncOnSubscribe]]'s life cycle for generating events.
  4948. *
  4949. * $supportBackpressure
  4950. *
  4951. * $noDefaultScheduler
  4952. *
  4953. * @tparam T the type of the items that this [[Observable]] emits
  4954. * @tparam S the state type
  4955. * @param asyncOnSubscribe an implementation of [[observables.AsyncOnSubscribe AsyncOnSubscribe]]. There are many creation methods on the object for convenience.
  4956. * @return an [[Observable]] that, when a [[Subscriber]] subscribes to it, will use the specified [[observables.AsyncOnSubscribe AsyncOnSubscribe]] to generate events
  4957. * @see [[observables.AsyncOnSubscribe.stateful]]
  4958. * @see [[observables.AsyncOnSubscribe.singleState]]
  4959. * @see [[observables.AsyncOnSubscribe.stateless]]
  4960. */
  4961. @Experimental
  4962. def create[S,T](asyncOnSubscribe: AsyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(asyncOnSubscribe))
  4963.  
  4964. /**
  4965. * Returns an Observable that will execute the specified function when someone subscribes to it.
  4966. *
  4967. * <img width="640" height="200" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/create.png" alt="" />
  4968. *
  4969. * Write the function you pass so that it behaves as an Observable: It should invoke the
  4970. * Subscriber's `onNext`, `onError`, and `onCompleted` methods appropriately.
  4971. *
  4972. * You can `add` custom [[Subscription]]s to [[Subscriber]]. These [[Subscription]]s will be called
  4973. * <ul>
  4974. * <li>when someone calls `unsubscribe`.</li>
  4975. * <li>after `onCompleted` or `onError`.</li>
  4976. * </ul>
  4977. *
  4978. * See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
  4979. * information.
  4980. *
  4981. * See `<a href="https://github.com/ReactiveX/RxScala/blob/0.x/examples/src/test/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood`
  4982. * and `<a href="https://github.com/ReactiveX/RxScala/blob/0.x/examples/src/test/scala/examples/RxScalaDemo.scala">RxScalaDemo</a>.createExampleGood2`.
  4983. *
  4984. * @tparam T
  4985. * the type of the items that this Observable emits
  4986. * @param f
  4987. * a function that accepts a `Subscriber[T]`, and invokes its `onNext`,
  4988. * `onError`, and `onCompleted` methods as appropriate
  4989. * @return an Observable that, when someone subscribes to it, will execute the specified
  4990. * function
  4991. */
  4992. def apply[T](f: Subscriber[T] => Unit): Observable[T] = {
  4993. toScalaObservable(rx.Observable.create(f))
  4994. }
  4995.  
  4996. /**
  4997. * Returns an [[Observable]] that invokes an [[Observer.onError]] method when the [[Observer]] subscribes to it.
  4998. *
  4999. * <img width="640" height="190" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/error.png" alt="">
  5000. *
  5001. * $noDefaultScheduler
  5002. *
  5003. * @param exception the particular `Throwable` to pass to [[Observer.onError]]
  5004. * @return an [[Observable]] that invokes the [[Observer.onError]] method when the [[Observer]] subscribes to it
  5005. * @see <a href="http://reactivex.io/documentation/operators/empty-never-throw.html">ReactiveX operators documentation: Throw</a>
  5006. */
  5007. def error(exception: Throwable): Observable[Nothing] = {
  5008. toScalaObservable[Nothing](rx.Observable.error(exception))
  5009. }
  5010.  
  5011. /**
  5012. * Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and
  5013. * immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method
  5014. * with the specified scheduler.
  5015. * <p>
  5016. * <img width="640" height="190" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/empty.s.png" alt="" />
  5017. *
  5018. * @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
  5019. * immediately invokes the [[rx.lang.scala.Observer]]r's
  5020. * [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
  5021. * specified scheduler
  5022. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
  5023. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
  5024. */
  5025. def empty: Observable[Nothing] = {
  5026. toScalaObservable(rx.Observable.empty[Nothing]())
  5027. }
  5028.  
  5029. /**
  5030. * Converts a sequence of values into an Observable.
  5031. *
  5032. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="" />
  5033. *
  5034. * Implementation note: the entire array will be immediately emitted each time an [[rx.lang.scala.Observer]] subscribes.
  5035. * Since this occurs before the [[rx.lang.scala.Subscription]] is returned,
  5036. * it in not possible to unsubscribe from the sequence before it completes.
  5037. *
  5038. * @param items
  5039. * the source Array
  5040. * @tparam T
  5041. * the type of items in the Array, and the type of items to be emitted by the
  5042. * resulting Observable
  5043. * @return an Observable that emits each item in the source Array
  5044. */
  5045. def just[T](items: T*): Observable[T] = {
  5046. toScalaObservable[T](rx.Observable.from(items.toIterable.asJava))
  5047. }
  5048.  
  5049. /** Returns an Observable emitting the value produced by the Future as its single item.
  5050. * If the future fails, the Observable will fail as well.
  5051. *
  5052. * @param f Future whose value ends up in the resulting Observable
  5053. * @return an Observable completed after producing the value of the future, or with an exception
  5054. */
  5055. def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T] = {
  5056. val s = AsyncSubject[T]()
  5057. f.onComplete {
  5058. case Failure(e) =>
  5059. s.onError(e)
  5060. case Success(c) =>
  5061. s.onNext(c)
  5062. s.onCompleted()
  5063. }
  5064. s
  5065. }
  5066.  
  5067. /**
  5068. * Converts an `Iterable` into an Observable.
  5069. *
  5070. * <img width="640" height="315" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/from.png" alt="" />
  5071. *
  5072. * Note: the entire iterable sequence is immediately emitted each time an
  5073. * Observer subscribes. Since this occurs before the
  5074. * `Subscription` is returned, it is not possible to unsubscribe from
  5075. * the sequence before it completes.
  5076. *
  5077. * @param iterable the source `Iterable` sequence
  5078. * @tparam T the type of items in the `Iterable` sequence and the
  5079. * type of items to be emitted by the resulting Observable
  5080. * @return an Observable that emits each item in the source `Iterable`
  5081. * sequence
  5082. */
  5083. def from[T](iterable: Iterable[T]): Observable[T] = {
  5084. toScalaObservable(rx.Observable.from(iterable.asJava))
  5085. }
  5086.  
  5087. /**
  5088. * Converts a `Try` into an `Observable`.
  5089. *
  5090. * Implementation note: the value will be immediately emitted each time an [[rx.lang.scala.Observer]] subscribes.
  5091. * Since this occurs before the [[rx.lang.scala.Subscription]] is returned,
  5092. * it in not possible to unsubscribe from the sequence before it completes.
  5093. *
  5094. * @param t the source Try
  5095. * @tparam T the type of value in the Try, and the type of items to be emitted by the resulting Observable
  5096. * @return an Observable that either emits the value or the error in the Try.
  5097. */
  5098. def from[T](t: Try[T]): Observable[T] = {
  5099. t match {
  5100. case Success(s) => Observable.just(s)
  5101. case Failure(e) => Observable.error(e)
  5102. }
  5103. }
  5104.  
  5105. /**
  5106. * Returns an Observable that calls an Observable factory to create its Observable for each
  5107. * new Observer that subscribes. That is, for each subscriber, the actual Observable is determined
  5108. * by the factory function.
  5109. *
  5110. * <img width="640" height="340" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/defer.png" alt="" />
  5111. *
  5112. * The defer operator allows you to defer or delay emitting items from an Observable until such
  5113. * time as an Observer subscribes to the Observable. This allows an [[rx.lang.scala.Observer]] to easily
  5114. * obtain updates or a refreshed version of the sequence.
  5115. *
  5116. * @param observable
  5117. * the Observable factory function to invoke for each [[rx.lang.scala.Observer]] that
  5118. * subscribes to the resulting Observable
  5119. * @tparam T
  5120. * the type of the items emitted by the Observable
  5121. * @return an Observable whose [[rx.lang.scala.Observer]]s trigger an invocation of the given Observable
  5122. * factory function
  5123. */
  5124. def defer[T](observable: => Observable[T]): Observable[T] = {
  5125. toScalaObservable[T](rx.Observable.defer[T](() => observable.asJavaObservable.asInstanceOf[rx.Observable[T]]))
  5126. }
  5127.  
  5128. /**
  5129. * Returns an Observable that never sends any items or notifications to an [[rx.lang.scala.Observer]].
  5130. *
  5131. * <img width="640" height="185" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/never.png" alt="" />
  5132. *
  5133. * This Observable is useful primarily for testing purposes.
  5134. *
  5135. * @return an Observable that never sends any items or notifications to an [[rx.lang.scala.Observer]]
  5136. */
  5137. def never: Observable[Nothing] = {
  5138. toScalaObservable[Nothing](rx.Observable.never())
  5139. }
  5140.  
  5141. /**
  5142. * Given 3 observables, returns an observable that emits Tuples of 3 elements each.
  5143. * The first emitted Tuple will contain the first element of each source observable,
  5144. * the second Tuple the second element of each source observable, and so on.
  5145. *
  5146. * @return an Observable that emits the zipped Observables
  5147. */
  5148. def zip[A, B, C](obA: Observable[A], obB: Observable[B], obC: Observable[C]): Observable[(A, B, C)] = {
  5149. toScalaObservable[(A, B, C)](rx.Observable.zip[A, B, C, (A, B, C)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, (a: A, b: B, c: C) => (a, b, c)))
  5150. }
  5151.  
  5152. /**
  5153. * Given 4 observables, returns an observable that emits Tuples of 4 elements each.
  5154. * The first emitted Tuple will contain the first element of each source observable,
  5155. * the second Tuple the second element of each source observable, and so on.
  5156. *
  5157. * @return an Observable that emits the zipped Observables
  5158. */
  5159. def zip[A, B, C, D](obA: Observable[A], obB: Observable[B], obC: Observable[C], obD: Observable[D]): Observable[(A, B, C, D)] = {
  5160. toScalaObservable[(A, B, C, D)](rx.Observable.zip[A, B, C, D, (A, B, C, D)](obA.asJavaObservable, obB.asJavaObservable, obC.asJavaObservable, obD.asJavaObservable, (a: A, b: B, c: C, d: D) => (a, b, c, d)))
  5161. }
  5162.  
  5163. /**
  5164. * Given an Observable emitting `N` source observables, returns an observable that
  5165. * emits Seqs of `N` elements each.
  5166. * The first emitted Seq will contain the first element of each source observable,
  5167. * the second Seq the second element of each source observable, and so on.
  5168. *
  5169. * Note that the returned Observable will only start emitting items once the given
  5170. * `Observable[Observable[T]]` has completed, because otherwise it cannot know `N`.
  5171. *
  5172. * @param observables
  5173. * An Observable emitting N source Observables
  5174. * @return an Observable that emits the zipped Seqs
  5175. */
  5176. def zip[T](observables: Observable[Observable[T]]): Observable[Seq[T]] = {
  5177. val f: FuncN[Seq[T]] = (args: Seq[java.lang.Object]) => {
  5178. val asSeq: Seq[Object] = args.toSeq
  5179. asSeq.asInstanceOf[Seq[T]]
  5180. }
  5181. val list = observables.map(_.asJavaObservable).asJavaObservable
  5182. val o = rx.Observable.zip(list, f)
  5183. toScalaObservable[Seq[T]](o)
  5184. }
  5185.  
  5186. /**
  5187. * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers.
  5188. *
  5189. * <img width="640" height="195" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/interval.png" alt="" />
  5190. *
  5191. * @param duration
  5192. * duration between two consecutive numbers
  5193. * @return An Observable that emits a number each time interval.
  5194. */
  5195. def interval(duration: Duration): Observable[Long] = {
  5196. toScalaObservable[java.lang.Long](rx.Observable.interval(duration.length, duration.unit)).map(_.longValue())
  5197. }
  5198.  
  5199. /**
  5200. * Emits `0`, `1`, `2`, `...` with a delay of `duration` between consecutive numbers.
  5201. *
  5202. * <img width="640" height="195" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/interval.png" alt="" />
  5203. *
  5204. * @param period
  5205. * duration between two consecutive numbers
  5206. * @param scheduler
  5207. * the scheduler to use
  5208. * @return An Observable that emits a number each time interval.
  5209. */
  5210. def interval(period: Duration, scheduler: Scheduler): Observable[Long] = {
  5211. toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue())
  5212. }
  5213.  
  5214. /**
  5215. * Returns an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers
  5216. * after each `period` of time thereafter.
  5217. *
  5218. * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.p.png" alt="" />
  5219. *
  5220. * ===Backpressure Support:===
  5221. * This operator does not support backpressure as it uses time. If the downstream needs a slower rate
  5222. * it should slow the timer or use something like [[Observable.onBackpressureDrop:* onBackpressureDrop]].
  5223. *
  5224. * ===Scheduler:===
  5225. * `interval` operates by default on the `computation` [[Scheduler]].
  5226. *
  5227. * @param initialDelay the initial delay time to wait before emitting the first value of 0L
  5228. * @param period the period of time between emissions of the subsequent numbers
  5229. * @return an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers after
  5230. * each `period` of time thereafter
  5231. * @see <a href="http://reactivex.io/documentation/operators/interval.html">ReactiveX operators documentation: Interval</a>
  5232. */
  5233. def interval(initialDelay: Duration, period: Duration): Observable[Long] = {
  5234. toScalaObservable[java.lang.Long](rx.Observable.interval(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
  5235. }
  5236.  
  5237. /**
  5238. * Returns an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers
  5239. * after each `period` of time thereafter, on a specified [[Scheduler]].
  5240. *
  5241. * <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.ps.png" alt="" />
  5242. *
  5243. * ===Backpressure Support:===
  5244. * This operator does not support backpressure as it uses time. If the downstream needs a slower rate
  5245. * it should slow the timer or use something like [[Observable.onBackpressureDrop:* onBackpressureDrop]].
  5246. *
  5247. * ===Scheduler:===
  5248. * you specify which [[Scheduler]] this operator will use.
  5249. *
  5250. * @param initialDelay the initial delay time to wait before emitting the first value of `0L`
  5251. * @param period the period of time between emissions of the subsequent numbers
  5252. * @param scheduler the [[Scheduler]] on which the waiting happens and items are emitted
  5253. * @return an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers after
  5254. * each `period` of time thereafter, while running on the given [[Scheduler]]
  5255. * @see <a href="http://reactivex.io/documentation/operators/interval.html">ReactiveX operators documentation: Interval</a>
  5256. */
  5257. def interval(initialDelay: Duration, period: Duration, scheduler: Scheduler): Observable[Long] = {
  5258. toScalaObservable[java.lang.Long](rx.Observable.interval(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue())
  5259. }
  5260.  
  5261. /**
  5262. * Returns an Observable that emits `0L` after a specified delay, and then completes.
  5263. *
  5264. * <img width="640" height="200" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.png" alt="" />
  5265. *
  5266. * @param delay the initial delay before emitting a single `0L`
  5267. * @return Observable that emits `0L` after a specified delay, and then completes
  5268. */
  5269. def timer(delay: Duration): Observable[Long] = {
  5270. toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit)).map(_.longValue())
  5271. }
  5272.  
  5273. /**
  5274. * Returns an Observable that emits `0L` after a specified delay, on a specified Scheduler, and then
  5275. * completes.
  5276. *
  5277. * <img width="640" height="200" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/timer.s.png" alt="" />
  5278. *
  5279. * @param delay the initial delay before emitting a single `0L`
  5280. * @param scheduler the Scheduler to use for scheduling the item
  5281. * @return Observable that emits `0L` after a specified delay, on a specified Scheduler, and then completes
  5282. */
  5283. def timer(delay: Duration, scheduler: Scheduler): Observable[Long] = {
  5284. toScalaObservable[java.lang.Long](rx.Observable.timer(delay.length, delay.unit, scheduler)).map(_.longValue())
  5285. }
  5286.  
  5287. /**
  5288. * Constructs an Observable that creates a dependent resource object.
  5289. *
  5290. * <img width="640" height="400" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/using.png" alt="" />
  5291. *
  5292. * ===Scheduler:===
  5293. * `using` does not operate by default on a particular `Scheduler`.
  5294. *
  5295. * @param resourceFactory the factory function to create a resource object that depends on the Observable.
  5296. * Note: this is a by-name parameter.
  5297. * @param observableFactory the factory function to create an Observable
  5298. * @param dispose the function that will dispose of the resource
  5299. * @param disposeEagerly if `true` then disposal will happen either on unsubscription or just before emission of
  5300. * a terminal event (`onComplete` or `onError`).
  5301. * @return the Observable whose lifetime controls the lifetime of the dependent resource object
  5302. * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#using">RxJava wiki: using</a>
  5303. * @see <a href="http://msdn.microsoft.com/en-us/library/hh229585.aspx">MSDN: Observable.Using</a>
  5304. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
  5305. */
  5306. def using[T, Resource](resourceFactory: => Resource)(observableFactory: Resource => Observable[T], dispose: Resource => Unit, disposeEagerly: Boolean = false): Observable[T] = {
  5307. val jResourceFactory = new rx.functions.Func0[Resource] {
  5308. override def call: Resource = resourceFactory
  5309. }
  5310. val jObservableFactory = new rx.functions.Func1[Resource, rx.Observable[_ <: T]] {
  5311. override def call(r: Resource) = observableFactory(r).asJavaObservable
  5312. }
  5313. toScalaObservable[T](rx.Observable.using[T, Resource](jResourceFactory, jObservableFactory, dispose, disposeEagerly))
  5314. }
  5315.  
  5316. /**
  5317. * Mirror the one Observable in an Iterable of several Observables that first emits an item.
  5318. *
  5319. * <img width="640" height="385" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/amb.png" alt="" />
  5320. *
  5321. * @param sources an Iterable of Observable sources competing to react first
  5322. * @return an Observable that emits the same sequence of items as whichever of the source Observables
  5323. * first emitted an item
  5324. */
  5325. def amb[T](sources: Observable[T]*): Observable[T] = {
  5326. toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava))
  5327. }
  5328.  
  5329. /**
  5330. * Combines a list of source Observables by emitting an item that aggregates the latest values of each of
  5331. * the source Observables each time an item is received from any of the source Observables, where this
  5332. * aggregation is defined by a specified function.
  5333. *
  5334. * @tparam T the common base type of source values
  5335. * @tparam R the result type
  5336. * @param sources the list of source Observables
  5337. * @param combineFunction the aggregation function used to combine the items emitted by the source Observables
  5338. * @return an Observable that emits items that are the result of combining the items emitted by the source
  5339. * Observables by means of the given aggregation function
  5340. */
  5341. @deprecated("Use [[[Observable.combineLatest[T,R](sources:Iterable[rx\\.lang\\.scala\\.Observable[T]])(combineFunction:Seq[T]=>R):*]]] instead", "0.26.2")
  5342. def combineLatest[T, R](sources: Seq[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
  5343. val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava)
  5344. val jCombineFunction = new rx.functions.FuncN[R] {
  5345. override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
  5346. }
  5347. toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
  5348. }
  5349.  
  5350. /**
  5351. * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest
  5352. * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this
  5353. * aggregation is defined by a specified function.
  5354. *
  5355. * $supportBackpressure
  5356. *
  5357. * $noDefaultScheduler
  5358. *
  5359. * @tparam T the common base type of source values
  5360. * @tparam R the result type
  5361. * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s
  5362. * @param combineFunction the aggregation function used to combine the items emitted by the source [[Observable]]s
  5363. * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source
  5364. * [[Observable]]s by means of the given aggregation function
  5365. * @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
  5366. */
  5367. def combineLatest[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
  5368. val jSources = sources.map(_.asJavaObservable).asJava
  5369. val jCombineFunction = new rx.functions.FuncN[R] {
  5370. override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
  5371. }
  5372. toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction))
  5373. }
  5374.  
  5375. /**
  5376. * Combines an [[scala.collection.Iterable Iterable]] of source [[Observable]]s by emitting an item that aggregates the latest
  5377. * values of each of the source [[Observable]]s each time an item is received from any of the source [[Observable]]s, where this
  5378. * aggregation is defined by a specified function and delays any error from the sources until all source [[Observable]]s terminate.
  5379. *
  5380. * $supportBackpressure
  5381. *
  5382. * $noDefaultScheduler
  5383. *
  5384. * @tparam T the common base type of source values
  5385. * @tparam R the result type
  5386. * @param sources the [[scala.collection.Iterable Iterable]] of source [[Observable]]s
  5387. * @param combineFunction the aggregation function used to combine the items emitted by the source [Observable]]s
  5388. * @return an [[Observable]] that emits items that are the result of combining the items emitted by the source
  5389. * [[Observable]]s by means of the given aggregation function
  5390. * @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
  5391. */
  5392. def combineLatestDelayError[T, R](sources: Iterable[Observable[T]])(combineFunction: Seq[T] => R): Observable[R] = {
  5393. val jSources = sources.map(_.asJavaObservable).asJava
  5394. val jCombineFunction = new rx.functions.FuncN[R] {
  5395. override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T]))
  5396. }
  5397. toScalaObservable[R](rx.Observable.combineLatestDelayError[T, R](jSources, jCombineFunction))
  5398. }
  5399. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement