Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import io.reactivex.Observable;
- import io.reactivex.disposables.Disposable;
- import io.reactivex.functions.Consumer;
- import io.reactivex.observers.DisposableObserver;
- import io.reactivex.schedulers.Schedulers;
- /**
- * Замена вызовов лапши из колбеков на RxJava.
- * <p>Задача - последовательное выполнение процессов.
- * <p>{@link B} подписан на {@link A}, <b>main</b> подписан на {@link C}. Все выполняется в фоне в main.
- * <ul>
- * <li>Выполняется {@link A}
- * <li>{@link B} ожидает завершения {@link A} и выполняется сам
- * <li>После завершения {@link B} выполняется {@link C}
- * <li>Результат выводится на консоль
- * </ul>
- * Фишка:
- * <ul>
- * <li>Все выполнение в фоне
- * <li>Результат в главном потоке
- * </ul>
- */
- public class WithRx2 {
- static class A {
- Observable<String> process(final String s) {
- // Иллюстрация выполнения в фоновом потоке
- // System.out.println("thread: " + Thread.currentThread().getName());
- return Observable.create(e -> {
- // System.out.println("thread: " + Thread.currentThread().getName());
- if (s == null) {
- e.onError(new Exception());
- }
- // Основной цикл
- String _s = s;
- for (int i = 0; i < 3; ++i) {
- Thread.sleep(100);
- _s += "A";
- e.onNext(_s);
- }
- // И завершение
- e.onComplete();
- });
- }
- }
- static class B {
- private A a = new A();
- Observable<String> process(final String s) {
- final String[] _s = {s};
- // Задача - последовательное выполнение процессов. Сначала A.process(), потом B.process()
- Observable<String> o = Observable.create(e -> {
- // Иллюстрация выполнения в фоновом потоке
- // System.out.println("thread: " + Thread.currentThread().getName());
- if (_s[0] == null) {
- e.onError(new Exception("String is null"));
- }
- // В первую очередь выполнится a.process().
- Disposable d = a.process(s)
- .subscribeWith(new DisposableObserver<String>() {
- @Override
- public void onNext(String _s1) {
- // При помощи .onNext каждый некст переправится к эмиттеру.
- e.onNext(_s1);
- _s[0] = _s1;
- }
- @Override
- public void onError(Throwable throwable) {
- }
- @Override
- public void onComplete() {
- }
- });
- d.dispose();
- // Затем основной цикл
- for (int i = 0; i < 3; ++i) {
- Thread.sleep(100);
- _s[0] += "B";
- e.onNext(_s[0]);
- }
- // И завершение
- e.onComplete();
- });
- return o;
- }
- }
- static class C {
- Observable<String> process(final String s) {
- return Observable.create(e -> {
- // Иллюстрация выполнения в фоновом потоке
- // System.out.println("thread: " + Thread.currentThread().getName());
- if (s == null) {
- e.onError(new Exception("String is null"));
- }
- String _s = s;
- for (int i = 0; i < 3; ++i) {
- Thread.sleep(100);
- _s += "C";
- e.onNext(_s);
- }
- e.onComplete();
- });
- }
- }
- // ~Мир Rx
- //------------------------------------------------------------------------------------------------------------------
- public static void main(String[] args) {
- final String[] s = {"string_"};
- System.out.println("started with: " + s[0]);
- // Задача - последовательное выполнение процессов.
- Observable<String> observable = Observable.create(e -> {
- // Метод выполняется первым и перебрасывает сообщения эмиттеру
- new B()
- .process(s[0])
- .subscribe(e::onNext, e::onError);
- // Метод выполняется вторым и перебрасывает сообщения эмиттеру.
- // Завершение метода завершает все.
- new C()
- .process(s[0])
- .subscribe(e::onNext, e::onError, e::onComplete);
- });
- observable
- // Выполняем в фоне
- .subscribeOn(Schedulers.io())
- // Подписываемся в блокирующем (основном) потоке
- .blockingSubscribe(new DisposableObserver<String>() {
- @Override
- public void onNext(String _s) {
- // Иллюстрация выполнения в основном потоке
- // System.out.println("thread: " + Thread.currentThread().getName());
- System.out.println("onNext: " + _s);
- s[0] = _s;
- }
- @Override
- public void onError(Throwable throwable) {
- System.out.println("onError: " + throwable.getLocalizedMessage());
- }
- @Override
- public void onComplete() {
- // Иллюстрация выполнения в основном потоке
- // System.out.println("thread: " + Thread.currentThread().getName());
- System.out.println("onComplete: " + s[0]);
- }
- });
- System.out.println("finished with: " + s[0]);
- // На выходе - ожидаемые сообщения
- // started with: string_
- // onNext: string_A
- // onNext: string_AA
- // onNext: string_AAA
- // onNext: string_AAAB
- // onNext: string_AAABB
- // onNext: string_AAABBB
- // onNext: string_AAABBBC
- // onNext: string_AAABBBCC
- // onNext: string_AAABBBCCC
- // onComplete: string_AAABBBCCC
- // finished with: string_AAABBBCCC
- }
- }
Add Comment
Please, Sign In to add comment