Advertisement
Guest User

Untitled

a guest
Mar 28th, 2017
56
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.08 KB | None | 0 0
  1. final BehaviorSubject<Long> bs = BehaviorSubject.createDefault(1490756552L);
  2. mDisposable = bs
  3. .subscribeOn(Schedulers.io())
  4. .flatMap(new Function<Long, ObservableSource<List<MessageRecordTypeList>>>() {
  5. @Override
  6. public ObservableSource<List<MessageRecordTypeList>> apply(final Long timestamp) throws Exception {
  7. return updateFromNetwork(timestamp);
  8. }
  9. })
  10. .takeUntil(new Predicate<List<MessageRecordTypeList>>() {
  11. @Override
  12. public boolean test(List<MessageRecordTypeList> msgList) throws Exception {
  13. return mNoMoreMsg = msgList == null || msgList.size() == 0;
  14. }
  15. })
  16. .doOnNext(new Consumer<List<MessageRecordTypeList>>() {
  17. @Override
  18. public void accept(List<MessageRecordTypeList> o) throws Exception {
  19. if(o != null && o.size() > 0){
  20. // that's where I want to trigger a new round
  21. bs.onNext(o.get(o.size() - 1).receiveTime - 1);
  22. }
  23. }
  24. })
  25. .observeOn(AndroidSchedulers.mainThread())
  26. .subscribeWith(new DisposableObserver<List<MessageRecordTypeList>>() {
  27. @Override
  28. public void onNext(List<MessageRecordTypeList> value) {
  29. notifyOnSuccess(value);
  30. }
  31. @Override
  32. public void onError(Throwable e) {
  33. e.printStackTrace();
  34. }
  35. @Override
  36. public void onComplete() {
  37. notifyOnFinish();
  38. }
  39. });
  40.  
  41. private Observable<List<MessageRecordTypeList>> updateFromNetwork(final long timestamp) {
  42. return Observable.create(new ObservableOnSubscribe<List<MessageRecordTypeList>>() {
  43. @Override
  44. public void subscribe(final ObservableEmitter<List<MessageRecordTypeList>> e) throws Exception {
  45. // get data from network
  46. }
  47. });
  48. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement