Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /**
- * the function to handle {@link DynamodbEvent} from a DynamoDB Stream
- *
- * @param event the event object
- * @param context the context object
- */
- public void handle(DynamodbEvent event, Context context) {
- List<DynamodbEvent.DynamodbStreamRecord> records = event.getRecords();
- BulkRequest bulkRequest = new BulkRequest();
- for (DynamodbEvent.DynamodbStreamRecord record : records) {
- try {
- final JacksonConverterImpl converter = new JacksonConverterImpl();
- final String eventName = record.getEventName();
- final StreamRecord streamRecord = record.getDynamodb();
- /*
- create a primary id from record
- */
- System.out.println(streamRecord);
- String id = streamRecord.getKeys().entrySet().stream().map(entry -> entry.getValue().getS()).collect(joining(":"));
- if (EventType.valueOf(eventName) == EventType.INSERT || EventType.valueOf(eventName) == EventType.MODIFY) {
- final Map<String, AttributeValue> newImage = streamRecord.getNewImage();
- if (newImage == null) {
- throw new RuntimeException("NewImage cannot be null, sequenceNumber:" + streamRecord.getSequenceNumber());
- }
- JsonNode payload = converter.mapToJsonObject(newImage);
- final IndexRequest indexRequest = new IndexRequest(INDEX, "_doc", id).source(payload.toString(), XContentType.JSON);
- bulkRequest.add(indexRequest);
- System.out.println("IndexRequest:" + indexRequest.toString());
- } else if (EventType.valueOf(eventName) == EventType.REMOVE) {
- final DeleteRequest deleteRequest = new DeleteRequest(INDEX, "_doc", id);
- bulkRequest.add(deleteRequest);
- System.out.println("IndexRequest:" + deleteRequest.toString());
- }
- } catch (JacksonConverterException e) {
- e.printStackTrace();
- }
- }
- executeElasticsearchRESTRequest(bulkRequest);
- }
Add Comment
Please, Sign In to add comment