package com.vertigo.user;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ScanRequest;
import com.amazonaws.services.dynamodbv2.model.ScanResult;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import lombok.val;
import org.apache.commons.lang3.tuple.Pair;

/**
 * Example of exposing a paginated DynamoDB table scan as an RxJava {@link Flowable}
 * that emits one {@link ScanResult} per page.
 */
public class DynaRun {

    /**
     * Scans the {@code load-post} table in pages of up to 1000 items and prints the
     * page index together with the item count of every page.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): placeholder credentials; replace with real credential
        // configuration before running, and never commit real keys to source control.
        val awsCredentials = new BasicAWSCredentials(
                "access-key",
                "secret-key"
        );

        val client = AmazonDynamoDBClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
                .withRegion(Regions.US_WEST_2)
                .build();

        // Pair every page with its zero-based index. The zip terminates as soon as
        // the scan flow completes, i.e. after the last page has been emitted.
        val disposable = Flowable.range(0, Integer.MAX_VALUE)
                .zipWith(flowableScan(client, "load-post", 1000), Pair::of)
                // .map(ScanResult::getItems)
                .subscribe(pair -> System.out.println(
                        "page: " + pair.getLeft() + ", items: " + pair.getRight().getCount()
                ));

        disposable.dispose(); // not needed in this case, but in general we might want to do it

        // other example: the flow of batches of keys
        // Flowable<Set<String>> itemsFlow =
        //         flowableScan(client, "staging-post", 1000)
        //                 .flatMapIterable(ScanResult::getItems)
        //                 .map(Map::keySet);
    }

    /**
     * Creates a cold {@link Flowable} that scans {@code table} page by page and emits
     * each page's {@link ScanResult}.
     *
     * <p>Each subscription starts its own scan from the beginning of the table. The
     * scan runs synchronously on the subscribing thread and stops when the last page
     * has been emitted (the flow then completes), when the subscriber cancels, or when
     * a scan call fails (the flow then terminates with that error). Pages are produced
     * regardless of downstream demand and buffered
     * ({@link BackpressureStrategy#BUFFER}).
     *
     * @param client    DynamoDB client used to issue the scan requests
     * @param table     name of the table to scan
     * @param batchSize maximum number of items evaluated per page
     * @return a flow of scan result pages, in scan order
     */
    public static Flowable<ScanResult> flowableScan(
            AmazonDynamoDB client,
            String table,
            int batchSize
    ) {
        return Flowable.create(emitter -> {
            // The cursor lives inside the subscription so that every subscriber
            // pages through the table independently.
            Map<String, AttributeValue> startKey = null;
            try {
                // Re-check cancellation before every request so that a disposed
                // subscriber does not trigger further scans.
                while (!emitter.isCancelled()) {
                    ScanRequest scanRequest = new ScanRequest()
                            .withTableName(table)
                            .withAttributesToGet("postid", "body")
                            .withLimit(batchSize)
                            .withExclusiveStartKey(startKey);

                    ScanResult scanResult = client.scan(scanRequest);
                    emitter.onNext(scanResult);

                    // NOTE: the last page returns getLastEvaluatedKey == null
                    startKey = scanResult.getLastEvaluatedKey();
                    if (startKey == null || startKey.isEmpty()) {
                        emitter.onComplete();
                        return;
                    }
                }
            } catch (Exception ex) {
                // Signalling an error after cancellation would be undeliverable.
                if (!emitter.isCancelled()) {
                    emitter.onError(ex);
                }
            }
        }, BackpressureStrategy.BUFFER);
    }
}

// pom file dependencies (groupId:artifactId:version):
//   com.amazonaws:aws-java-sdk-core:1.11.421
//   com.amazonaws:aws-java-sdk-dynamodb:1.11.421
//   io.reactivex.rxjava2:rxjava:2.2.2