Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class FlowableExample {
- public static void main1(String[] args) {
- //reduce 操作符 结果为50+1+2
- Flowable<Integer> flowable = Flowable.just(1, 2);
- flowable.reduce(50, (t1, t2) -> {
- System.out.println("f1:" + t1);
- System.out.println("f2:" + t2);
- return t1 + t2;
- }).subscribeWith(new SingleObserver<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(Integer value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- });
- }
- public static void main2(String[] args) {
- /*AsyncSubject只会发射最后一次onNext的值,并且只有在执行onComplete方法之后
- * */
- AsyncSubject<Integer> asyncSubject = AsyncSubject.create();
- asyncSubject.subscribe(getFirstObserver());
- asyncSubject.onNext(1);
- asyncSubject.onNext(2);
- asyncSubject.subscribe(getSecondObserver());
- asyncSubject.onNext(3);
- asyncSubject.onNext(4);
- asyncSubject.onComplete();
- }
- public static void main3(String[] args) {
- /*
- BehaviorSubject发射订阅前最近的和之后的
- */
- BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
- behaviorSubject.subscribe(getFirstObserver());//获得1 2 3 4 onComplete
- behaviorSubject.onNext(1);
- behaviorSubject.onNext(2);
- behaviorSubject.onNext(3);
- behaviorSubject.subscribe(getSecondObserver()); //获得 3 4 onComplete
- behaviorSubject.onNext(4);
- behaviorSubject.onComplete();
- }
- public static void main333(String[] args) {
- PublishSubject<Integer> publishSubject = PublishSubject.create();
- publishSubject.subscribe(getFirstObserver());//onSubscribe 获得 1234onComplete
- publishSubject.onNext(1);
- publishSubject.onNext(2);
- publishSubject.onNext(3);
- publishSubject.subscribe(getSecondObserver()); //onSubscribe 获得 4 onComplete
- publishSubject.onNext(4);
- publishSubject.onComplete();
- }
- public static void main4(String[] args) {
- /*
- buffer将要发射的值包含在一个List中
- */
- Observable<List<String>> listObservable = getStringObservable().buffer(3, 1);
- //3表示最大数量为3,1表示下次重新缓存后要跳过的个数
- listObservable.subscribe(getBufferObserver());
- }
- public static Observer<Integer> getFirstObserver() {
- return new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- System.out.println("first onSubscribe");
- }
- @Override
- public void onNext(Integer value) {
- System.out.println("first onNext" + value);
- }
- @Override
- public void onError(Throwable e) {
- e.printStackTrace();
- }
- @Override
- public void onComplete() {
- System.out.println("first onComplete");
- }
- };
- }
- public static Observer<Integer> getSecondObserver() {
- return new Observer<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- System.out.println("Second onSubscribe");
- }
- @Override
- public void onNext(Integer value) {
- System.out.println("Second onNext" + value);
- }
- @Override
- public void onError(Throwable e) {
- e.printStackTrace();
- }
- @Override
- public void onComplete() {
- System.out.println("Second onComplete");
- }
- };
- }
- public static Observable<String> getStringObservable() {
- System.out.println("getStringObservable");
- return Observable.just("one", "two", "three", "four", "five", "one", "two", "three", "four2", "five");
- }
- public static Observer<? super List<String>> getBufferObserver() {
- return new Observer<List<String>>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(List<String> value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- };
- }
- public static Observer<String> getStringObserver() {
- return new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onNext(String value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- };
- }
- public static void main5(String[] args) {
- //执行耗时任务但不返回值 只会返回onComplete或者onError事件
- Completable completable = Completable.timer(2, TimeUnit.SECONDS);
- completable.subscribeOn(NewThreadScheduler.instance()).subscribeWith(new CompletableObserver() {
- @Override
- public void onSubscribe(Disposable d) {
- Logger.getGlobal().log(Level.INFO, " onSubscribe Disposable: " + d.isDisposed());
- }
- @Override
- public void onComplete() {
- Logger.getGlobal().log(Level.INFO, "onComplete");
- }
- @Override
- public void onError(Throwable e) {
- e.printStackTrace();
- }
- });
- try {
- Thread.sleep(Integer.MAX_VALUE);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- public static void main7(String[] args) {
- /*
- Using debounce() -> only emit an item from an Observable if a particular time-span has
- passed without it emitting another item, so it will emit 2, 4, 5 as we have simulated it.
- debounce发射一个值,这个值在某个时间范围内没有新的值时才起作用
- 1 过了500,没有新的值 发射
- */
- Observable observable = getDebounceObservable();
- observable.debounce(500, TimeUnit.MILLISECONDS).subscribe(getFirstObserver());
- }
- private static Observable getDebounceObservable() {
- return Observable.create(new ObservableOnSubscribe<Integer>() {
- @Override
- public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
- emitter.onNext(1); // skip
- Thread.sleep(500);
- emitter.onNext(2); // deliver
- Thread.sleep(505); //>500 发射
- emitter.onNext(3); // skip
- Thread.sleep(100); // <500不发射
- emitter.onNext(4); // deliver
- Thread.sleep(605); // >500发射
- emitter.onNext(5); // deliver
- Thread.sleep(510); // >500发射
- emitter.onComplete();
- }
- });
- }
- public static void main8(String[] args) {
- //defer延迟执行Observable的代码 到subscribe时执行
- Car car = new Car();
- Observable<String> defer = Observable.defer(new Callable<ObservableSource<String>>() {
- @Override
- public ObservableSource<String> call() throws Exception {
- return Observable.just(car.getBrand());
- }
- });
- car.setBrand("BMW");
- defer.subscribe(getStringObserver());
- }
- public static void main9(String[] args) throws InterruptedException {
- //delay subscribe延迟订阅
- System.out.println("开始运行");
- getStringObservable().delay(3, TimeUnit.SECONDS).subscribeOn(NewThreadScheduler.instance()).subscribe(getStringObserver());
- Thread.sleep(5000);
- }
- public static void main10(String[] args) throws InterruptedException {
- //CompositeDisposable 用于在某个时候取消注册的事件 可以用在onDestory中取消
- CompositeDisposable compositeDisposable = new CompositeDisposable();
- DisposableObserver<String> disposableObserver = getStringIntObservable().observeOn(NewThreadScheduler.instance()).subscribeOn(NewThreadScheduler.instance()).subscribeWith(new DisposableObserver<String>() {
- @Override
- public void onComplete() {
- System.out.println(" onComplete");
- }
- @Override
- public void onError(Throwable e) {
- System.out.println(" onError");
- }
- @Override
- public void onNext(String value) {
- System.out.println(" onNext" + value);
- }
- });
- compositeDisposable.add(disposableObserver);
- Thread.sleep(10000);
- //取消事件
- compositeDisposable.clear();
- System.out.println("继续睡眠");
- Thread.sleep(Integer.MAX_VALUE);
- }
- private static Observable<String> getStringIntObservable() {
- return Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
- @Override
- public String apply(Long aLong) throws Exception {
- System.out.println("apply :" + aLong);
- return String.valueOf(aLong);
- }
- });
- }
- public static void main11(String[] args) {
- // distinct 会过滤掉重复的值
- getStringObservable().distinct().subscribe(getStringObserver());
- }
- public static void main12(String[] args) {
- //filter设置条件过滤
- Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
- @Override
- public boolean test(Integer integer) throws Exception {
- return integer % 2 == 1;
- }
- }).subscribe(getFirstObserver());
- }
- public static void main13(String[] args) {
- //flowable reduce两个方法返回的值类型并不相同
- Flowable<Integer> flowable = Flowable.just(1, 2, 3);
- Maybe<Integer> reduce = flowable.reduce(new BiFunction<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer integer, Integer integer2) throws Exception {
- return integer - integer2;
- }
- });
- reduce.subscribe(new MaybeObserver<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(Integer value) {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
- Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);
- observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer t1, Integer t2) {
- return t1 + t2;
- }
- }).subscribe(new SingleObserver<Integer>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(Integer value) {
- }
- @Override
- public void onError(Throwable e) {
- }
- });
- }
- public static void main14(String[] args) throws InterruptedException {
- CompositeDisposable compositeDisposable = new CompositeDisposable();
- DisposableObserver<Long> disposableObserver = Observable.interval(1, TimeUnit.SECONDS).subscribeOn(NewThreadScheduler.instance()).subscribeWith(new DisposableObserver<Long>() {
- @Override
- public void onNext(Long value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
- compositeDisposable.add(disposableObserver);
- Thread.sleep(10000);
- compositeDisposable.clear();
- System.out.println("已清除");
- Thread.sleep(10000);
- }
- public static void main15(String[] args) {
- // last first 取最后或者最前的一个 default为Observable没有发射值时的默认值
- Observable.just("A", "B", "C", "D", "E").first("default").subscribe(new SingleObserver<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(String value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- });
- }
- public static void main16(String[] args) {
- Observable.just(1, 2, 3, 4).map(new Function<Integer, String>() {
- @Override
- public String apply(Integer integer) throws Exception {
- return String.valueOf(integer);
- }
- }).subscribe(getStringObserver());
- }
- public static void main6(String[] args) {
- //concat关联两个observable,先执行完第一个Observable的值,再执行第二个的
- String[] arr1 = {"a1", "a2", "a3", "a4"};
- String[] arr2 = {"b1", "b2", "b3"};
- Observable.concat(Observable.fromArray(arr1), Observable.fromArray(arr2))
- .subscribe(getStringObserver());
- }
- public static void main17(String[] args) {
- //合并两个Observable 但是不保证执行顺序
- String[] arr1 = {"a1", "a2", "a3", "a4"};
- String[] arr2 = {"b1", "b2", "b3"};
- Observable.merge(Observable.fromArray(arr1), Observable.fromArray(arr2)).subscribe(getStringObserver());
- }
- public static void main18(String[] args) {
- //replay操作符用于保证多个Observer获取的相同的结果,及时订阅前事件已发送
- PublishSubject<Integer> publishSubject = PublishSubject.create();
- ConnectableObservable<Integer> connectableObservable = publishSubject.replay(3);
- connectableObservable.connect();
- Observer<Integer> firstObserver = getFirstObserver();
- //获得事件 1234
- connectableObservable.subscribe(firstObserver);
- publishSubject.onNext(1);
- publishSubject.onNext(2);
- publishSubject.onNext(3);
- publishSubject.onNext(4);
- publishSubject.onComplete();
- Observer<Integer> secondObserver = getSecondObserver();
- //获得事件 234 因为replay的buffer size = 3
- connectableObservable.subscribe(secondObserver);
- }
- public static void main19(String[] args) {
- //ReplaySubject会让所有的observer获取到相同的结果,不管是什么时候订阅的
- ReplaySubject<Integer> replaySubject = ReplaySubject.create();
- replaySubject.onNext(1);
- replaySubject.onNext(2);
- replaySubject.subscribe(getFirstObserver());
- replaySubject.onNext(3);
- replaySubject.onNext(4);
- replaySubject.onComplete();
- replaySubject.subscribe(getSecondObserver());
- }
- public static void main20(String[] args) {
- // scan 把之前的结果带过来
- Observable.just(1, 2, 3, 4).scan(new BiFunction<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer integer, Integer integer2) throws Exception {
- return integer + integer2;
- }
- }).subscribe(getFirstObserver());
- //结果为 1 3 6 10
- }
- public static void main21(String[] args) {
- Single.just("12").subscribe(new SingleObserver<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- }
- @Override
- public void onSuccess(String value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- });
- }
- public static void main22(String[] args) {
- //skip会跳过某个数量的值
- Observable.just(1, 2, 3, 4, 5).skip(2).subscribe(getFirstObserver());
- }
- public static void main23(String[] args) {
- //take 只会发射某数量个值
- Observable.just(1, 2, 3, 4, 5).take(3).subscribe(getFirstObserver());
- }
- public static void main35(String[] args) throws InterruptedException {
- // throttleLast throttleFirst
- Observable.create(new ObservableOnSubscribe<Long>() {
- @Override
- public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
- Thread.sleep(0);
- emitter.onNext(1l); // skip
- emitter.onNext(2l); // deliver
- Thread.sleep(505);
- emitter.onNext(3l); // skip
- Thread.sleep(99);
- emitter.onNext(4l); // skip
- Thread.sleep(100);
- emitter.onNext(5l); // skip
- emitter.onNext(6l); // deliver
- Thread.sleep(305);
- emitter.onNext(7l); // deliver
- Thread.sleep(510);
- emitter.onComplete();
- }
- }).observeOn(NewThreadScheduler.instance()).throttleLast(500, TimeUnit.MILLISECONDS).subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Long>() {
- @Override
- public void onNext(Long value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- }
- });
- Thread.sleep(100000);
- }
- public static void main36(String[] args) throws InterruptedException {
- //timer 过多长时间后执行发送事件
- Observable.timer(2000, TimeUnit.MILLISECONDS).observeOn(NewThreadScheduler.instance()).subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Long>() {
- @Override
- public void onNext(Long value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- System.out.println("onComplete");
- }
- });
- Thread.sleep(100000);
- }
- public static void main37(String[] args) throws InterruptedException {
- Observable.interval(200, TimeUnit.MILLISECONDS).take(12).window(6, TimeUnit.SECONDS).
- subscribeOn(NewThreadScheduler.instance()).subscribe(new DisposableObserver<Observable<Long>>() {
- @Override
- public void onNext(Observable<Long> value) {
- System.out.println("onNext:" + value);
- value.subscribe(new DisposableObserver<Long>() {
- @Override
- public void onNext(Long value) {
- System.out.println(value);
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- System.out.println("value onComplete");
- }
- });
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onComplete() {
- System.out.println("onComplete");
- }
- });
- Thread.sleep(100000);
- }
- public static void main66(String[] args) {
- //zip 两个Observable 进行处理 得到一个结果
- Observable.zip(Observable.just(3, 4, 5, 6),Observable.just(1, 2, 0), new BiFunction<Integer, Integer, Integer>() {
- @Override
- public Integer apply(Integer integer, Integer integer2) throws Exception {
- return integer;
- }
- }).subscribe(getFirstObserver());
- }
- public static void main(String[] args) {
- Observable.just(1, 2).compose(new ObservableTransformer<Integer, String>() {
- @Override
- public ObservableSource<String> apply(Observable<Integer> upstream) {
- return upstream.map(new Function<Integer, String>() {
- @Override
- public String apply(Integer integer) throws Exception {
- return String.valueOf(integer);
- }
- });
- }
- });
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement