Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- DataStream<Document> streamSource = env
- .addSource(kafkaConsumer).setParallelism(4);
- public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Document> {
- private final long maxOutOfOrderness = 3500; // 3.5 seconds
- private long currentMaxTimestamp;
- @Override
- public long extractTimestamp(Document event, long previousElementTimestamp) {
- Map timeStamp = (Map) event.get("ts");
- this.currentMaxTimestamp = (long) timeStamp.get("value");
- return currentMaxTimestamp;
- }
- @Override
- public Watermark getCurrentWatermark() {
- // return the watermark as current highest timestamp minus the out-of-orderness bound
- return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
- }
- }
Add Comment
Please, Sign In to add comment