Advertisement
Guest User

Untitled

a guest
Mar 23rd, 2017
108
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 20.50 KB | None | 0 0
  1. public class FlowableExample {
  2.  
  3. public static void main1(String[] args) {
  4. //reduce 操作符 结果为50+1+2
  5. Flowable<Integer> flowable = Flowable.just(1, 2);
  6. flowable.reduce(50, (t1, t2) -> {
  7. System.out.println("f1:" + t1);
  8. System.out.println("f2:" + t2);
  9. return t1 + t2;
  10. }).subscribeWith(new SingleObserver<Integer>() {
  11. @Override
  12. public void onSubscribe(Disposable d) {
  13. }
  14.  
  15. @Override
  16. public void onSuccess(Integer value) {
  17. System.out.println(value);
  18. }
  19.  
  20. @Override
  21. public void onError(Throwable e) {
  22. }
  23. });
  24. }
  25.  
  26. public static void main2(String[] args) {
  27. /*AsyncSubject只会发射最后一次onNext的值,并且只有在执行onComplete方法之后
  28. * */
  29. AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
  30. asyncSubject.subscribe(getFirstObserver());
  31. asyncSubject.onNext(1);
  32. asyncSubject.onNext(2);
  33. asyncSubject.subscribe(getSecondObserver());
  34. asyncSubject.onNext(3);
  35. asyncSubject.onNext(4);
  36. asyncSubject.onComplete();
  37. }
  38.  
  39. public static void main3(String[] args) {
  40. /*
  41. BehaviorSubject发射订阅前最近的和之后的
  42. */
  43. BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
  44. behaviorSubject.subscribe(getFirstObserver());//获得1 2 3 4 onComplete
  45. behaviorSubject.onNext(1);
  46. behaviorSubject.onNext(2);
  47. behaviorSubject.onNext(3);
  48. behaviorSubject.subscribe(getSecondObserver()); //获得 3 4 onComplete
  49. behaviorSubject.onNext(4);
  50. behaviorSubject.onComplete();
  51. }
  52.  
  53. public static void main333(String[] args) {
  54. PublishSubject<Integer> publishSubject = PublishSubject.create();
  55. publishSubject.subscribe(getFirstObserver());//onSubscribe 获得 1234onComplete
  56. publishSubject.onNext(1);
  57. publishSubject.onNext(2);
  58. publishSubject.onNext(3);
  59. publishSubject.subscribe(getSecondObserver()); //onSubscribe 获得 4 onComplete
  60. publishSubject.onNext(4);
  61. publishSubject.onComplete();
  62. }
  63.  
  64. public static void main4(String[] args) {
  65. /*
  66. buffer将要发射的值包含在一个List中
  67. */
  68. Observable<List<String>> listObservable = getStringObservable().buffer(3, 1);
  69. //3表示最大数量为3,1表示下次重新缓存后要跳过的个数
  70. listObservable.subscribe(getBufferObserver());
  71. }
  72.  
  73. public static Observer<Integer> getFirstObserver() {
  74. return new Observer<Integer>() {
  75. @Override
  76. public void onSubscribe(Disposable d) {
  77. System.out.println("first onSubscribe");
  78. }
  79.  
  80. @Override
  81. public void onNext(Integer value) {
  82. System.out.println("first onNext" + value);
  83. }
  84.  
  85. @Override
  86. public void onError(Throwable e) {
  87. e.printStackTrace();
  88. }
  89.  
  90. @Override
  91. public void onComplete() {
  92. System.out.println("first onComplete");
  93. }
  94. };
  95. }
  96.  
  97. public static Observer<Integer> getSecondObserver() {
  98. return new Observer<Integer>() {
  99. @Override
  100. public void onSubscribe(Disposable d) {
  101. System.out.println("Second onSubscribe");
  102. }
  103.  
  104. @Override
  105. public void onNext(Integer value) {
  106. System.out.println("Second onNext" + value);
  107. }
  108.  
  109. @Override
  110. public void onError(Throwable e) {
  111. e.printStackTrace();
  112. }
  113.  
  114. @Override
  115. public void onComplete() {
  116. System.out.println("Second onComplete");
  117. }
  118. };
  119. }
  120.  
  121. public static Observable<String> getStringObservable() {
  122. System.out.println("getStringObservable");
  123. return Observable.just("one", "two", "three", "four", "five", "one", "two", "three", "four2", "five");
  124. }
  125.  
  126. public static Observer<? super List<String>> getBufferObserver() {
  127. return new Observer<List<String>>() {
  128. @Override
  129. public void onSubscribe(Disposable d) {
  130.  
  131. }
  132.  
  133. @Override
  134. public void onNext(List<String> value) {
  135. System.out.println(value);
  136. }
  137.  
  138. @Override
  139. public void onError(Throwable e) {
  140.  
  141. }
  142.  
  143. @Override
  144. public void onComplete() {
  145.  
  146. }
  147. };
  148. }
  149.  
  150. public static Observer<String> getStringObserver() {
  151. return new Observer<String>() {
  152. @Override
  153. public void onSubscribe(Disposable d) {
  154.  
  155. }
  156.  
  157. @Override
  158. public void onNext(String value) {
  159. System.out.println(value);
  160. }
  161.  
  162. @Override
  163. public void onError(Throwable e) {
  164.  
  165. }
  166.  
  167. @Override
  168. public void onComplete() {
  169.  
  170. }
  171. };
  172. }
  173.  
  174. public static void main5(String[] args) {
  175. //执行耗时任务但不返回值 只会返回onComplete或者onError事件
  176. Completable completable = Completable.timer(2, TimeUnit.SECONDS);
  177. completable.subscribeOn(NewThreadScheduler.instance()).subscribeWith(new CompletableObserver() {
  178. @Override
  179. public void onSubscribe(Disposable d) {
  180. Logger.getGlobal().log(Level.INFO, " onSubscribe Disposable: " + d.isDisposed());
  181. }
  182.  
  183. @Override
  184. public void onComplete() {
  185. Logger.getGlobal().log(Level.INFO, "onComplete");
  186. }
  187.  
  188. @Override
  189. public void onError(Throwable e) {
  190. e.printStackTrace();
  191. }
  192. });
  193. try {
  194. Thread.sleep(Integer.MAX_VALUE);
  195. } catch (InterruptedException e) {
  196. e.printStackTrace();
  197. }
  198. }
  199.  
  200.  
  201. public static void main7(String[] args) {
  202. /*
  203. Using debounce() -> only emit an item from an Observable if a particular time-span has
  204. passed without it emitting another item, so it will emit 2, 4, 5 as we have simulated it.
  205. debounce发射一个值,这个值在某个时间范围内没有新的值时才起作用
  206. 1 过了500,没有新的值 发射
  207. */
  208. Observable observable = getDebounceObservable();
  209. observable.debounce(500, TimeUnit.MILLISECONDS).subscribe(getFirstObserver());
  210. }
  211.  
  212. private static Observable getDebounceObservable() {
  213. return Observable.create(new ObservableOnSubscribe<Integer>() {
  214. @Override
  215. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
  216. emitter.onNext(1); // skip
  217. Thread.sleep(500);
  218. emitter.onNext(2); // deliver
  219. Thread.sleep(505); //>500 发射
  220.  
  221. emitter.onNext(3); // skip
  222. Thread.sleep(100); // <500不发射
  223.  
  224. emitter.onNext(4); // deliver
  225. Thread.sleep(605); // >500发射
  226. emitter.onNext(5); // deliver
  227. Thread.sleep(510); // >500发射
  228. emitter.onComplete();
  229. }
  230. });
  231. }
  232.  
  233.  
  234. public static void main8(String[] args) {
  235. //defer延迟执行Observable的代码 到subscribe时执行
  236. Car car = new Car();
  237. Observable<String> defer = Observable.defer(new Callable<ObservableSource<String>>() {
  238. @Override
  239. public ObservableSource<String> call() throws Exception {
  240. return Observable.just(car.getBrand());
  241. }
  242. });
  243. car.setBrand("BMW");
  244. defer.subscribe(getStringObserver());
  245. }
  246.  
  247. public static void main9(String[] args) throws InterruptedException {
  248. //delay subscribe延迟订阅
  249. System.out.println("开始运行");
  250. getStringObservable().delay(3, TimeUnit.SECONDS).subscribeOn(NewThreadScheduler.instance()).subscribe(getStringObserver());
  251. Thread.sleep(5000);
  252. }
  253.  
  254. public static void main10(String[] args) throws InterruptedException {
  255. //CompositeDisposable 用于在某个时候取消注册的事件 可以用在onDestory中取消
  256. CompositeDisposable compositeDisposable = new CompositeDisposable();
  257. DisposableObserver<String> disposableObserver = getStringIntObservable().observeOn(NewThreadScheduler.instance()).subscribeOn(NewThreadScheduler.instance()).subscribeWith(new DisposableObserver<String>() {
  258. @Override
  259. public void onComplete() {
  260. System.out.println(" onComplete");
  261. }
  262.  
  263. @Override
  264. public void onError(Throwable e) {
  265. System.out.println(" onError");
  266. }
  267.  
  268. @Override
  269. public void onNext(String value) {
  270. System.out.println(" onNext" + value);
  271. }
  272. });
  273. compositeDisposable.add(disposableObserver);
  274. Thread.sleep(10000);
  275. //取消事件
  276. compositeDisposable.clear();
  277. System.out.println("继续睡眠");
  278. Thread.sleep(Integer.MAX_VALUE);
  279. }
  280.  
  281. private static Observable<String> getStringIntObservable() {
  282. return Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
  283. @Override
  284. public String apply(Long aLong) throws Exception {
  285. System.out.println("apply :" + aLong);
  286. return String.valueOf(aLong);
  287. }
  288. });
  289. }
  290.  
  291. public static void main11(String[] args) {
  292. // distinct 会过滤掉重复的值
  293. getStringObservable().distinct().subscribe(getStringObserver());
  294. }
  295.  
  296. public static void main12(String[] args) {
  297. //filter设置条件过滤
  298. Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
  299. @Override
  300. public boolean test(Integer integer) throws Exception {
  301. return integer % 2 == 1;
  302. }
  303. }).subscribe(getFirstObserver());
  304. }
  305.  
  306. public static void main13(String[] args) {
  307. //flowable reduce两个方法返回的值类型并不相同
  308. Flowable<Integer> flowable = Flowable.just(1, 2, 3);
  309. Maybe<Integer> reduce = flowable.reduce(new BiFunction<Integer, Integer, Integer>() {
  310. @Override
  311. public Integer apply(Integer integer, Integer integer2) throws Exception {
  312. return integer - integer2;
  313. }
  314. });
  315. reduce.subscribe(new MaybeObserver<Integer>() {
  316. @Override
  317. public void onSubscribe(Disposable d) {
  318. }
  319.  
  320. @Override
  321. public void onSuccess(Integer value) {
  322. }
  323.  
  324. @Override
  325. public void onError(Throwable e) {
  326. }
  327.  
  328. @Override
  329. public void onComplete() {
  330.  
  331. }
  332. });
  333.  
  334. Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
  335. observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
  336. @Override
  337. public Integer apply(Integer t1, Integer t2) {
  338. return t1 + t2;
  339. }
  340. }).subscribe(new SingleObserver<Integer>() {
  341. @Override
  342. public void onSubscribe(Disposable d) {
  343.  
  344. }
  345.  
  346. @Override
  347. public void onSuccess(Integer value) {
  348.  
  349. }
  350.  
  351. @Override
  352. public void onError(Throwable e) {
  353.  
  354. }
  355. });
  356.  
  357. }
  358.  
  359.  
  360. public static void main14(String[] args) throws InterruptedException {
  361. CompositeDisposable compositeDisposable = new CompositeDisposable();
  362. DisposableObserver<Long> disposableObserver = Observable.interval(1, TimeUnit.SECONDS).subscribeOn(NewThreadScheduler.instance()).subscribeWith(new DisposableObserver<Long>() {
  363. @Override
  364. public void onNext(Long value) {
  365. System.out.println(value);
  366. }
  367.  
  368. @Override
  369. public void onError(Throwable e) {
  370.  
  371. }
  372.  
  373. @Override
  374. public void onComplete() {
  375.  
  376. }
  377. });
  378. compositeDisposable.add(disposableObserver);
  379. Thread.sleep(10000);
  380. compositeDisposable.clear();
  381. System.out.println("已清除");
  382. Thread.sleep(10000);
  383. }
  384.  
  385. public static void main15(String[] args) {
  386. // last first 取最后或者最前的一个 default为Observable没有发射值时的默认值
  387. Observable.just("A", "B", "C", "D", "E").first("default").subscribe(new SingleObserver<String>() {
  388. @Override
  389. public void onSubscribe(Disposable d) {
  390.  
  391. }
  392.  
  393. @Override
  394. public void onSuccess(String value) {
  395. System.out.println(value);
  396. }
  397.  
  398. @Override
  399. public void onError(Throwable e) {
  400.  
  401. }
  402. });
  403. }
  404.  
  405. public static void main16(String[] args) {
  406. Observable.just(1, 2, 3, 4).map(new Function<Integer, String>() {
  407.  
  408. @Override
  409. public String apply(Integer integer) throws Exception {
  410. return String.valueOf(integer);
  411. }
  412. }).subscribe(getStringObserver());
  413. }
  414.  
  415. public static void main6(String[] args) {
  416. //concat关联两个observable,先执行完第一个Observable的值,再执行第二个的
  417. String[] arr1 = {"a1", "a2", "a3", "a4"};
  418. String[] arr2 = {"b1", "b2", "b3"};
  419. Observable.concat(Observable.fromArray(arr1), Observable.fromArray(arr2))
  420. .subscribe(getStringObserver());
  421. }
  422.  
  423. public static void main17(String[] args) {
  424. //合并两个Observable 但是不保证执行顺序
  425. String[] arr1 = {"a1", "a2", "a3", "a4"};
  426. String[] arr2 = {"b1", "b2", "b3"};
  427. Observable.merge(Observable.fromArray(arr1), Observable.fromArray(arr2)).subscribe(getStringObserver());
  428. }
  429.  
  430. public static void main18(String[] args) {
  431. //replay操作符用于保证多个Observer获取的相同的结果,及时订阅前事件已发送
  432. PublishSubject<Integer> publishSubject = PublishSubject.create();
  433. ConnectableObservable<Integer> connectableObservable = publishSubject.replay(3);
  434. connectableObservable.connect();
  435. Observer<Integer> firstObserver = getFirstObserver();
  436. //获得事件 1234
  437. connectableObservable.subscribe(firstObserver);
  438. publishSubject.onNext(1);
  439. publishSubject.onNext(2);
  440. publishSubject.onNext(3);
  441. publishSubject.onNext(4);
  442. publishSubject.onComplete();
  443. Observer<Integer> secondObserver = getSecondObserver();
  444. //获得事件 234 因为replay的buffer size = 3
  445. connectableObservable.subscribe(secondObserver);
  446. }
  447.  
  448. public static void main19(String[] args) {
  449. //ReplaySubject会让所有的observer获取到相同的结果,不管是什么时候订阅的
  450. ReplaySubject<Integer> replaySubject = ReplaySubject.create();
  451. replaySubject.onNext(1);
  452. replaySubject.onNext(2);
  453. replaySubject.subscribe(getFirstObserver());
  454. replaySubject.onNext(3);
  455. replaySubject.onNext(4);
  456. replaySubject.onComplete();
  457. replaySubject.subscribe(getSecondObserver());
  458. }
  459.  
  460. public static void main20(String[] args) {
  461. // scan 把之前的结果带过来
  462. Observable.just(1, 2, 3, 4).scan(new BiFunction<Integer, Integer, Integer>() {
  463. @Override
  464. public Integer apply(Integer integer, Integer integer2) throws Exception {
  465. return integer + integer2;
  466. }
  467. }).subscribe(getFirstObserver());
  468. //结果为 1 3 6 10
  469. }
  470.  
  471. public static void main21(String[] args) {
  472. Single.just("12").subscribe(new SingleObserver<String>() {
  473. @Override
  474. public void onSubscribe(Disposable d) {
  475.  
  476. }
  477.  
  478. @Override
  479. public void onSuccess(String value) {
  480. System.out.println(value);
  481. }
  482.  
  483. @Override
  484. public void onError(Throwable e) {
  485.  
  486. }
  487. });
  488. }
  489.  
  490. public static void main22(String[] args) {
  491. //skip会跳过某个数量的值
  492. Observable.just(1, 2, 3, 4, 5).skip(2).subscribe(getFirstObserver());
  493. }
  494.  
  495. public static void main23(String[] args) {
  496. //take 只会发射某数量个值
  497. Observable.just(1, 2, 3, 4, 5).take(3).subscribe(getFirstObserver());
  498. }
  499.  
  500. public static void main35(String[] args) throws InterruptedException {
  501. // throttleLast throttleFirst
  502. Observable.create(new ObservableOnSubscribe<Long>() {
  503. @Override
  504. public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
  505. Thread.sleep(0);
  506. emitter.onNext(1l); // skip
  507. emitter.onNext(2l); // deliver
  508. Thread.sleep(505);
  509. emitter.onNext(3l); // skip
  510. Thread.sleep(99);
  511. emitter.onNext(4l); // skip
  512. Thread.sleep(100);
  513. emitter.onNext(5l); // skip
  514. emitter.onNext(6l); // deliver
  515. Thread.sleep(305);
  516. emitter.onNext(7l); // deliver
  517. Thread.sleep(510);
  518. emitter.onComplete();
  519. }
  520. }).observeOn(NewThreadScheduler.instance()).throttleLast(500, TimeUnit.MILLISECONDS).subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Long>() {
  521. @Override
  522. public void onNext(Long value) {
  523. System.out.println(value);
  524. }
  525.  
  526. @Override
  527. public void onError(Throwable e) {
  528.  
  529. }
  530.  
  531. @Override
  532. public void onComplete() {
  533.  
  534. }
  535. });
  536. Thread.sleep(100000);
  537. }
  538.  
  539. public static void main36(String[] args) throws InterruptedException {
  540. //timer 过多长时间后执行发送事件
  541. Observable.timer(2000, TimeUnit.MILLISECONDS).observeOn(NewThreadScheduler.instance()).subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Long>() {
  542. @Override
  543. public void onNext(Long value) {
  544. System.out.println(value);
  545. }
  546.  
  547. @Override
  548. public void onError(Throwable e) {
  549.  
  550. }
  551.  
  552. @Override
  553. public void onComplete() {
  554. System.out.println("onComplete");
  555. }
  556. });
  557. Thread.sleep(100000);
  558. }
  559.  
  560. public static void main37(String[] args) throws InterruptedException {
  561. Observable.interval(200, TimeUnit.MILLISECONDS).take(12).window(6, TimeUnit.SECONDS).
  562. subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Observable<Long>>() {
  563. @Override
  564. public void onNext(Observable<Long> value) {
  565. System.out.println("onNext:" + value);
  566. value.subscribe(new DisposableObserver<Long>() {
  567. @Override
  568. public void onNext(Long value) {
  569. System.out.println(value);
  570. }
  571.  
  572. @Override
  573. public void onError(Throwable e) {
  574.  
  575. }
  576.  
  577. @Override
  578. public void onComplete() {
  579. System.out.println("value onComplete");
  580. }
  581. });
  582. }
  583.  
  584. @Override
  585. public void onError(Throwable e) {
  586.  
  587. }
  588.  
  589. @Override
  590. public void onComplete() {
  591. System.out.println("onComplete");
  592. }
  593. });
  594. Thread.sleep(100000);
  595. }
  596.  
  597. public static void main66(String[] args) {
  598. //zip 两个Observable 进行处理 得到一个结果
  599. Observable.zip(Observable.just(3, 4, 5, 6),Observable.just(1, 2, 0), new BiFunction<Integer, Integer, Integer>() {
  600.  
  601. @Override
  602. public Integer apply(Integer integer, Integer integer2) throws Exception {
  603. return integer;
  604. }
  605. }).subscribe(getFirstObserver());
  606. }
  607.  
  608. public static void main(String[] args) {
  609. Observable.just(1, 2).compose(new ObservableTransformer<Integer, String>() {
  610. @Override
  611. public ObservableSource<String> apply(Observable<Integer> upstream) {
  612. return upstream.map(new Function<Integer, String>() {
  613. @Override
  614. public String apply(Integer integer) throws Exception {
  615. return String.valueOf(integer);
  616. }
  617. });
  618. }
  619. });
  620. }
  621. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement