Guest User

Untitled

a guest
Jun 19th, 2018
66
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.78 KB | None | 0 0
  1. DataStream<Document> streamSource = env
  2. .addSource(kafkaConsumer).setParallelism(4);
  3.  
  4. public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Document> {
  5.  
  6. private final long maxOutOfOrderness = 3500; // 3.5 seconds
  7.  
  8. private long currentMaxTimestamp;
  9.  
  10. @Override
  11. public long extractTimestamp(Document event, long previousElementTimestamp) {
  12. Map timeStamp = (Map) event.get("ts");
  13. this.currentMaxTimestamp = (long) timeStamp.get("value");
  14. return currentMaxTimestamp;
  15. }
  16.  
  17. @Override
  18. public Watermark getCurrentWatermark() {
  19. // return the watermark as current highest timestamp minus the out-of-orderness bound
  20. return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  21. }
  22. }
Add Comment
Please sign in to add a comment.