Advertisement
NLinker

Flow of scan results for DynamoDb

Oct 3rd, 2018
302
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.44 KB | None | 0 0
  1. package com.vertigo.user;
  2.  
  3. import java.util.Collections;
  4. import java.util.Map;
  5. import java.util.concurrent.atomic.AtomicReference;
  6. import com.amazonaws.auth.AWSStaticCredentialsProvider;
  7. import com.amazonaws.auth.BasicAWSCredentials;
  8. import com.amazonaws.regions.Regions;
  9. import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
  10. import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
  11. import com.amazonaws.services.dynamodbv2.model.AttributeValue;
  12. import com.amazonaws.services.dynamodbv2.model.ScanRequest;
  13. import com.amazonaws.services.dynamodbv2.model.ScanResult;
  14. import io.reactivex.BackpressureStrategy;
  15. import io.reactivex.Flowable;
  16. import lombok.val;
  17. import org.apache.commons.lang3.tuple.Pair;
  18.  
  19. public class DynaRun {
  20.  
  21.     public static void main(String[] args) {
  22.         val awsCredentials = new BasicAWSCredentials(
  23.             "access-key",
  24.             "secret-key"
  25.         );
  26.         val client = AmazonDynamoDBClientBuilder
  27.             .standard()
  28.             .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
  29.             .withRegion(Regions.US_WEST_2)
  30.             .build();
  31.  
  32.         val disposable = Flowable.range(0, Integer.MAX_VALUE)
  33.             .zipWith(flowableScan(client, "load-post", 1000), Pair::of)
  34.             // .map(ScanResult::getItems)
  35.             .subscribe(pair ->
  36.                 System.out.println(
  37.                     "page: " + pair.getLeft() +
  38.                     ", items: " + pair.getRight().getCount()
  39.                 )
  40.             );
  41.         disposable.dispose(); // not needed in this case, but in general we might want to do it
  42.  
  43.         // other example: the flow of batches of keys
  44.         // Flowable<Set<String>> itemsFlow =
  45.         //     flowableScan(client, "staging-post", 1000)
  46.         //     .flatMapIterable(ScanResult::getItems)
  47.         //     .map(Map::keySet);
  48.  
  49.     }
  50.  
  51.     public static Flowable<ScanResult> flowableScan(
  52.         AmazonDynamoDB client, String table, int batchSize
  53.     ) {
  54.         val ref = new AtomicReference<Map<String, AttributeValue>>();
  55.         return Flowable.create(e -> {
  56.             if (e.isCancelled()) {
  57.                 return;
  58.             }
  59.             try {
  60.                 do {
  61.                     ScanRequest scanRequest = new ScanRequest()
  62.                         .withTableName(table)
  63.                         .withAttributesToGet("postid", "body")
  64.                         .withLimit(batchSize)
  65.                         .withExclusiveStartKey(ref.get());
  66.                    
  67.                     val scanResult = client.scan(scanRequest);
  68.                     // NOTE: the last page returns getLastEvaluatedKey == null
  69.                     ref.set(scanResult.getLastEvaluatedKey());
  70.                     e.onNext(scanResult);
  71.                 } while (ref.get() != null && !ref.get().isEmpty());
  72.             } catch (Exception ex) {
  73.                 e.onError(ex);
  74.             }
  75.         }, BackpressureStrategy.BUFFER);
  76.     }
  77.  
  78.  
  79. }
  80.  
  81. // pom file
  82.     <dependency>
  83.       <groupId>com.amazonaws</groupId>
  84.       <artifactId>aws-java-sdk-core</artifactId>
  85.       <version>1.11.421</version>
  86.     </dependency>
  87.     <dependency>
  88.       <groupId>com.amazonaws</groupId>
  89.       <artifactId>aws-java-sdk-dynamodb</artifactId>
  90.       <version>1.11.421</version>
  91.     </dependency>
  92.     <dependency>
  93.       <groupId>io.reactivex.rxjava2</groupId>
  94.       <artifactId>rxjava</artifactId>
  95.       <version>2.2.2</version>
  96.     </dependency>
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement