Guest User

Untitled

a guest
Jun 19th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.73 KB | None | 0 0
  1. /**
  2. * the function to handle {@link DynamodbEvent} from a DynamoDB Stream
  3. *
  4. * @param event the event object
  5. * @param context the context object
  6. */
  7. public void handle(DynamodbEvent event, Context context) {
  8. List<DynamodbEvent.DynamodbStreamRecord> records = event.getRecords();
  9. BulkRequest bulkRequest = new BulkRequest();
  10. for (DynamodbEvent.DynamodbStreamRecord record : records) {
  11. try {
  12. final JacksonConverterImpl converter = new JacksonConverterImpl();
  13. final String eventName = record.getEventName();
  14. final StreamRecord streamRecord = record.getDynamodb();
  15. /*
  16. create a primary id from record
  17. */
  18. System.out.println(streamRecord);
  19. String id = streamRecord.getKeys().entrySet().stream().map(entry -> entry.getValue().getS()).collect(joining(":"));
  20.  
  21. if (EventType.valueOf(eventName) == EventType.INSERT || EventType.valueOf(eventName) == EventType.MODIFY) {
  22. final Map<String, AttributeValue> newImage = streamRecord.getNewImage();
  23. if (newImage == null) {
  24. throw new RuntimeException("NewImage cannot be null, sequenceNumber:" + streamRecord.getSequenceNumber());
  25. }
  26. JsonNode payload = converter.mapToJsonObject(newImage);
  27. final IndexRequest indexRequest = new IndexRequest(INDEX, "_doc", id).source(payload.toString(), XContentType.JSON);
  28. bulkRequest.add(indexRequest);
  29. System.out.println("IndexRequest:" + indexRequest.toString());
  30. } else if (EventType.valueOf(eventName) == EventType.REMOVE) {
  31. final DeleteRequest deleteRequest = new DeleteRequest(INDEX, "_doc", id);
  32. bulkRequest.add(deleteRequest);
  33. System.out.println("IndexRequest:" + deleteRequest.toString());
  34. }
  35. } catch (JacksonConverterException e) {
  36. e.printStackTrace();
  37. }
  38. }
  39. executeElasticsearchRESTRequest(bulkRequest);
  40. }
Add Comment
Please, Sign In to add comment