alexo123

Couchbase Memory Leak - Fix with pagination

Aug 12th, 2019
// Imports needed by the enclosing class. Pair is assumed to be the Apache Commons Lang 3 tuple,
// which offers Pair.of, getKey and getValue as used below.
// LOG, bucket and BATCH_SIZE are members of the enclosing class and are not part of this paste.
import static java.util.Optional.ofNullable;
import static rx.Emitter.BackpressureMode.BUFFER;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.tuple.Pair;

import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.view.AsyncViewResult;
import com.couchbase.client.java.view.AsyncViewRow;
import com.couchbase.client.java.view.ViewQuery;

import rx.Observable;

    /**
     * Creates a paginated observable over a potentially large result set. The purpose is to avoid the memory leak that
     * occurs when a huge view is consumed over a long period of time.
     * The created observable emits items in batches, hiding the details of batching and pagination. The consumer
     * doesn't see any difference between a paginated and a non-paginated observable.
     * Note that ViewQuery is mutable, so the query passed in is modified (limit, startKey, startKeyDocId, skip).
     */
    public Observable<AsyncViewRow> observableRowsWithPagination(final ViewQuery query) {
        LOG.debug("query with pagination: {}", query);
        //TODO extract initial skip & limit from query and use skip as initial skip
        return Observable.create(emitter -> {
            final Pair<String, Object> empty = Pair.of(null, null);
            //The pair holds details about the last observed row (documentId & key)
            Pair<String, Object> pair = empty;
            int batch = 0;
            final AtomicBoolean cancelled = new AtomicBoolean();
            //Cancellation tells this producer loop that the downstream has unsubscribed or failed.
            emitter.setCancellation(() -> cancelled.set(true));
            do {
                LOG.debug("batch: {}", ++batch);
                final ViewQuery batchQuery = query.limit(BATCH_SIZE);
                //Forward every row downstream, then block until the batch ends and keep only its last row.
                pair = batchQuery(batchQuery, pair)
                        .flatMap(AsyncViewResult::rows)
                        .doOnNext(emitter::onNext)
                        .doOnError(t -> LOG.error("error during pagination", t))
                        .doOnError(emitter::onError)
                        //After an error the batch ends empty, which terminates the loop below.
                        .onErrorResumeNext(Observable.empty())
                        .map(row -> Pair.of(row.id(), row.key()))
                        .reduce(empty, (a, b) -> b)
                        .toBlocking()
                        .single();
                //An empty batch leaves the document id null, which means there are no more rows.
            } while (!cancelled.get() && pair.getKey() != null);
            emitter.onCompleted();
        }, BUFFER);
    }

    /**
     * Actual invocation of the paginated query. The original query is modified so that it continues from the
     * startKey and startKeyDocId of the last observed row, which keeps the query efficient by avoiding a large
     * skip value.
     *
     * @see <a href="https://docs.couchbase.com/server/5.1/views/views-querying.html#pagination">Couchbase views: pagination</a>
     */
    private Observable<AsyncViewResult> batchQuery(final ViewQuery query, final Pair<String, Object> pair) {
        //pair.getKey() is the last document id, pair.getValue() is the last view key; the first batch has neither.
        final ViewQuery batchQuery = ofNullable(pair.getKey())
                .map(id -> toBatchQuery(query, pair.getValue(), id))
                .orElse(query);
        LOG.debug("batch query: {}", batchQuery);
        return bucket.async().query(batchQuery);
    }

    private ViewQuery toBatchQuery(final ViewQuery query, final Object key, final String docId) {
        //NOTE: only String and JsonArray keys are handled; any other key type leaves startKey unchanged.
        if (key instanceof String) {
            query.startKey(key.toString());
        } else if (key instanceof JsonArray) {
            query.startKey((JsonArray) key);
        }
        //The start row is inclusive, so skip(1) drops the row that was already emitted by the previous batch.
        return query.startKeyDocId(docId).skip(1);
    }
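
The pagination scheme used above (resume from the last row's key and document id, then skip that one row) can be tried without a Couchbase cluster. The following is a minimal sketch under stated assumptions: `KeysetPaginationSketch`, `Row`, `fetchBatch` and `readAll` are hypothetical names that do not exist in the paste or in the Couchbase SDK, the index is a plain in-memory list, and keys are compared with ordinary Java string ordering rather than Couchbase's view collation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** In-memory stand-in for a view index; every name in this class is hypothetical. */
final class KeysetPaginationSketch {

    /** One view row: the emitted key plus the id of the document that emitted it. */
    static final class Row {
        final String key;
        final String docId;

        Row(String key, String docId) {
            this.key = key;
            this.docId = docId;
        }
    }

    /** Rows are ordered by key first and by document id second (simplified collation). */
    static final Comparator<Row> VIEW_ORDER =
            Comparator.comparing((Row r) -> r.key).thenComparing(r -> r.docId);

    /**
     * Simulates one query against a sorted index: start at the given row (inclusive),
     * drop `skip` rows, then return at most `limit` rows. A null start means "from the beginning".
     */
    static List<Row> fetchBatch(List<Row> sortedIndex, Row start, int skip, int limit) {
        List<Row> batch = new ArrayList<>();
        int skipped = 0;
        for (Row row : sortedIndex) {
            if (start != null && VIEW_ORDER.compare(row, start) < 0) {
                continue; // before the start position
            }
            if (skipped < skip) {
                skipped++; // the row that was already delivered by the previous batch
                continue;
            }
            if (batch.size() == limit) {
                break;
            }
            batch.add(row);
        }
        return batch;
    }

    /** Reads the whole index in batches, resuming each batch from the last row seen. */
    static List<Row> readAll(List<Row> sortedIndex, int batchSize) {
        List<Row> all = new ArrayList<>();
        Row last = null;
        while (true) {
            // The first batch has no start row and skips nothing; later batches skip exactly one row.
            List<Row> batch = fetchBatch(sortedIndex, last, last == null ? 0 : 1, batchSize);
            if (batch.isEmpty()) {
                break; // an empty batch means the index is exhausted
            }
            all.addAll(batch);
            last = batch.get(batch.size() - 1);
        }
        return all;
    }
}
```

Because the skip is always 1, the cost of each batch does not grow with the position in the result set, and the document id disambiguates rows that share the same key across a batch boundary.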