Guest User

Untitled

a guest
Mar 24th, 2018
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 7.11 KB | None | 0 0
  1. import io.reactivex.Observable;
  2. import io.reactivex.disposables.Disposable;
  3. import io.reactivex.functions.Consumer;
  4. import io.reactivex.observers.DisposableObserver;
  5. import io.reactivex.schedulers.Schedulers;
  6.  
  7.  
  8. /**
  9. * Замена вызовов лапши из колбеков на RxJava.
  10. * <p>Задача - последовательное выполнение процессов.
  11. * <p>{@link B} подписан на {@link A}, <b>main</b> подписан на {@link C}. Все выполняется в фоне в main.
  12. * <ul>
  13. * <li>Выполняется {@link A}
  14. * <li>{@link B} ожидает завершения {@link A} и выполняется сам
  15. * <li>После завершения {@link B} выполняется {@link C}
  16. * <li>Результат выводится на консоль
  17. * </ul>
  18. * Фишка:
  19. * <ul>
  20. * <li>Все выполнение в фоне
  21. * <li>Результат в главном потоке
  22. * </ul>
  23. */
  24. public class WithRx2 {
  25.  
  26. static class A {
  27.  
  28. Observable<String> process(final String s) {
  29.  
  30. // Иллюстрация выполнения в фоновом потоке
  31. // System.out.println("thread: " + Thread.currentThread().getName());
  32.  
  33. return Observable.create(e -> {
  34.  
  35. // System.out.println("thread: " + Thread.currentThread().getName());
  36.  
  37. if (s == null) {
  38. e.onError(new Exception());
  39. }
  40.  
  41. // Основной цикл
  42. String _s = s;
  43. for (int i = 0; i < 3; ++i) {
  44. Thread.sleep(100);
  45. _s += "A";
  46. e.onNext(_s);
  47. }
  48.  
  49. // И завершение
  50. e.onComplete();
  51. });
  52.  
  53. }
  54.  
  55.  
  56. }
  57.  
  58. static class B {
  59.  
  60. private A a = new A();
  61.  
  62. Observable<String> process(final String s) {
  63.  
  64. final String[] _s = {s};
  65.  
  66. // Задача - последовательное выполнение процессов. Сначала A.process(), потом B.process()
  67. Observable<String> o = Observable.create(e -> {
  68.  
  69. // Иллюстрация выполнения в фоновом потоке
  70. // System.out.println("thread: " + Thread.currentThread().getName());
  71.  
  72. if (_s[0] == null) {
  73. e.onError(new Exception("String is null"));
  74. }
  75.  
  76. // В первую очередь выполнится a.process().
  77. Disposable d = a.process(s)
  78. .subscribeWith(new DisposableObserver<String>() {
  79.  
  80. @Override
  81. public void onNext(String _s1) {
  82. // При помощи .onNext каждый некст переправится к эмиттеру.
  83. e.onNext(_s1);
  84. _s[0] = _s1;
  85. }
  86.  
  87. @Override
  88. public void onError(Throwable throwable) {
  89. }
  90.  
  91. @Override
  92. public void onComplete() {
  93. }
  94. });
  95. d.dispose();
  96.  
  97. // Затем основной цикл
  98. for (int i = 0; i < 3; ++i) {
  99. Thread.sleep(100);
  100. _s[0] += "B";
  101. e.onNext(_s[0]);
  102. }
  103.  
  104. // И завершение
  105. e.onComplete();
  106.  
  107. });
  108.  
  109. return o;
  110. }
  111.  
  112. }
  113.  
  114. static class C {
  115.  
  116. Observable<String> process(final String s) {
  117.  
  118. return Observable.create(e -> {
  119.  
  120. // Иллюстрация выполнения в фоновом потоке
  121. // System.out.println("thread: " + Thread.currentThread().getName());
  122.  
  123. if (s == null) {
  124. e.onError(new Exception("String is null"));
  125. }
  126.  
  127. String _s = s;
  128. for (int i = 0; i < 3; ++i) {
  129. Thread.sleep(100);
  130. _s += "C";
  131. e.onNext(_s);
  132. }
  133.  
  134. e.onComplete();
  135. });
  136.  
  137. }
  138.  
  139. }
  140.  
  141. // ~Мир Rx
  142. //------------------------------------------------------------------------------------------------------------------
  143.  
  144.  
  145. public static void main(String[] args) {
  146.  
  147. final String[] s = {"string_"};
  148.  
  149. System.out.println("started with: " + s[0]);
  150.  
  151. // Задача - последовательное выполнение процессов.
  152. Observable<String> observable = Observable.create(e -> {
  153.  
  154. // Метод выполняется первым и перебрасывает сообщения эмиттеру
  155. new B()
  156. .process(s[0])
  157. .subscribe(e::onNext, e::onError);
  158.  
  159. // Метод выполняется вторым и перебрасывает сообщения эмиттеру.
  160. // Завершение метода завершает все.
  161. new C()
  162. .process(s[0])
  163. .subscribe(e::onNext, e::onError, e::onComplete);
  164.  
  165. });
  166.  
  167. observable
  168. // Выполняем в фоне
  169. .subscribeOn(Schedulers.io())
  170. // Подписываемся в блокирующем (основном) потоке
  171. .blockingSubscribe(new DisposableObserver<String>() {
  172.  
  173. @Override
  174. public void onNext(String _s) {
  175. // Иллюстрация выполнения в основном потоке
  176. // System.out.println("thread: " + Thread.currentThread().getName());
  177. System.out.println("onNext: " + _s);
  178. s[0] = _s;
  179. }
  180.  
  181. @Override
  182. public void onError(Throwable throwable) {
  183. System.out.println("onError: " + throwable.getLocalizedMessage());
  184. }
  185.  
  186. @Override
  187. public void onComplete() {
  188. // Иллюстрация выполнения в основном потоке
  189. // System.out.println("thread: " + Thread.currentThread().getName());
  190. System.out.println("onComplete: " + s[0]);
  191. }
  192. });
  193.  
  194. System.out.println("finished with: " + s[0]);
  195.  
  196. // На выходе - ожидаемые сообщения
  197. // started with: string_
  198. // onNext: string_A
  199. // onNext: string_AA
  200. // onNext: string_AAA
  201. // onNext: string_AAAB
  202. // onNext: string_AAABB
  203. // onNext: string_AAABBB
  204. // onNext: string_AAABBBC
  205. // onNext: string_AAABBBCC
  206. // onNext: string_AAABBBCCC
  207. // onComplete: string_AAABBBCCC
  208. // finished with: string_AAABBBCCC
  209.  
  210.  
  211. }
  212.  
  213.  
  214. }
Add Comment
Please, Sign In to add comment