Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Paging loop driven by a subject: every timestamp pushed into `bs` is turned into one
// updateFromNetwork() request, and every non-empty page pushes the next timestamp back in.
final BehaviorSubject<Long> bs = BehaviorSubject.createDefault(1490756552L);

// Maps a timestamp emitted by the subject to the request for that page.
final Function<Long, ObservableSource<List<MessageRecordTypeList>>> pageRequest =
        new Function<Long, ObservableSource<List<MessageRecordTypeList>>>() {
            @Override
            public ObservableSource<List<MessageRecordTypeList>> apply(final Long pageTimestamp) throws Exception {
                return updateFromNetwork(pageTimestamp);
            }
        };

// Stop condition: a null or empty page. The outcome is also stored in mNoMoreMsg.
// Per the RxJava 2 docs, takeUntil(Predicate) still emits the matching item before completing.
final Predicate<List<MessageRecordTypeList>> lastPageReached =
        new Predicate<List<MessageRecordTypeList>>() {
            @Override
            public boolean test(List<MessageRecordTypeList> page) throws Exception {
                final boolean exhausted = page == null || page.isEmpty();
                mNoMoreMsg = exhausted;
                return exhausted;
            }
        };

// Side effect that starts the next round: re-feeds the subject with the receive time
// of the last record in the page, minus one.
final Consumer<List<MessageRecordTypeList>> requestNextPage =
        new Consumer<List<MessageRecordTypeList>>() {
            @Override
            public void accept(List<MessageRecordTypeList> page) throws Exception {
                if (page == null || page.isEmpty()) {
                    // Nothing to continue from.
                    return;
                }
                final int lastIndex = page.size() - 1;
                bs.onNext(page.get(lastIndex).receiveTime - 1);
            }
        };

// Terminal observer: forwards pages, prints errors, and signals completion.
final DisposableObserver<List<MessageRecordTypeList>> pageObserver =
        new DisposableObserver<List<MessageRecordTypeList>>() {
            @Override
            public void onNext(List<MessageRecordTypeList> page) {
                notifyOnSuccess(page);
            }

            @Override
            public void onError(Throwable error) {
                error.printStackTrace();
            }

            @Override
            public void onComplete() {
                notifyOnFinish();
            }
        };

mDisposable = bs
        .subscribeOn(Schedulers.io())
        .flatMap(pageRequest)
        .takeUntil(lastPageReached)
        .doOnNext(requestNextPage)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(pageObserver);
- private Observable<List<MessageRecordTypeList>> updateFromNetwork(final long timestamp) {
- return Observable.create(new ObservableOnSubscribe<List<MessageRecordTypeList>>() {
- @Override
- public void subscribe(final ObservableEmitter<List<MessageRecordTypeList>> e) throws Exception {
- // get data from network
- }
- });
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement