Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- private HashSet<T> convert(ResultSet rs, Integer page) throws ExecutionException, InterruptedException {
- CassandraConverter converter = cassandra.getConverter();
- HashSet<T> elements = new LinkedHashSet<>();
- if(rs == null) {
- return elements;
- }
- // How far we can go without triggering the blocking fetch:
- int remainingInPage = rs.getAvailableWithoutFetching();
- LOG.info("Remaining rows in page: {}", remainingInPage);
- // fetch rows of the page
- for (Row row : rs) {
- T element = converter.read(getGenericTypeClass(), row);
- elements.add(element);
- if (--remainingInPage == 0) {
- break;
- }
- }
- boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
- if(!wasLastPage) {
- ListenableFuture<ResultSet> nextResult = rs.fetchMoreResults();
- HashSet<T> nextPage = convert(nextResult.get(), page + 1);
- elements.addAll(nextPage);
- }
- return elements;
- }
- protected ArrayList<T> fetchMerged(final List<Statement> statements) {
- HashSet<T> elements = new LinkedHashSet<>();
- List<ResultSetFuture> futures = new ArrayList<>();
- for(Statement statement : statements) {
- statement.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
- futures.add(session.executeAsync(statement));
- }
- Future<List<ResultSet>> listListenableFuture = Futures.successfulAsList(futures);
- try {
- for(ResultSet rs : listListenableFuture.get()) {
- HashSet<T> convert = convert(rs, 0);
- elements.addAll(convert);
- }
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to fetch data", e);
- Thread.currentThread().interrupt();
- }
- return new ArrayList<>(elements);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement