Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * Creates a paginated observable over a potentially large result set. The purpose is to avoid the memory leak which
- * occurs on invocation of a huge view over a long period of time.
- * The created observable emits items in batches, hiding the details about batching & pagination. The consumer
- * does't see any difference between paginated and non-paginated observable.
- */
- public Observable<AsyncViewRow> observableRowsWithPagination(final ViewQuery query) {
- LOG.debug("query with pagination: {}", query);
- //TODO extract initial skip & limit from query and use skip as initial skip
- return Observable.create(emitter -> {
- final Pair<String, Object> empty = Pair.of(null, null);
- //The pair holds details about the last observed row (documentId & key)
- Pair<String, Object> pair = empty;
- int batch = 0;
- final AtomicBoolean cancelled = new AtomicBoolean();
- //Use cancellation to notify the producer about the downstream error.
- emitter.setCancellation(() -> cancelled.set(true));
- do {
- LOG.debug("batch: {}", ++batch);
- final ViewQuery batchQuery = query.limit(BATCH_SIZE);
- pair = batchQuery(batchQuery, pair)
- .flatMap(AsyncViewResult::rows)
- .doOnNext(emitter::onNext)
- .doOnError(t -> LOG.error("cancelled during pagination", t))
- .doOnError(emitter::onError)
- .onErrorResumeNext(empty())
- .map(row -> Pair.of(row.id(), row.key()))
- .reduce(empty, (a, b) -> b)
- .toBlocking()
- .single();
- } while (!cancelled.get() && pair.getKey() != null);
- emitter.onCompleted();
- }, BUFFER);
- }
- /**
- * Actual invocation of the paginated query. The original query is modified so that it changes the startKeyDocId in
- * order to make the query invocation efficient by avoiding large skip value
- * (@link https://docs.couchbase.com/server/5.1/views/views-querying.html#pagination).
- */
- private Observable<AsyncViewResult> batchQuery(final ViewQuery query, final Pair<String, Object> pair) {
- final ViewQuery batchQuery = ofNullable(pair.getKey())
- .map(id -> toBatchQuery(query, pair.getValue(), id))
- .orElse(query);
- LOG.debug("batch query: {}", query);
- return bucket.async().query(batchQuery);
- }
- private ViewQuery toBatchQuery(final ViewQuery query, final Object key, final String docId) {
- if (key instanceof String) {
- query.startKey(key.toString());
- } else if (key instanceof JsonArray) {
- query.startKey((JsonArray) key);
- }
- return query.startKeyDocId(docId).skip(1);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement