Advertisement
Guest User

Untitled

a guest
Jun 24th, 2019
57
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.83 KB | None | 0 0
  1. public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
  2. @Override
  3. public Long create(PipelineOptions options) {
  4. return options.as(Options.class).getMinTimestampMillis()
  5. + Duration.standardHours(1).getMillis();
  6. }
  7. }
  8.  
  9. ...
  10.  
  11. static void runWindowedWordCount(Options options) throws IOException {
  12. final String output = options.getOutput();
  13. final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
  14. final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
  15.  
  16. Pipeline pipeline = Pipeline.create(options);
  17. Pipeline p = Pipeline.create(options);
  18.  
  19. pipeline
  20. .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)))
  21. .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
  22. .triggering(AfterWatermark.pastEndOfWindow()
  23. .withEarlyFirings(
  24. AfterProcessingTime.pastFirstElementInPane()
  25. .plusDelayOf(Duration.standardMinutes(1)))
  26. .withLateFirings(
  27. AfterProcessingTime.pastFirstElementInPane()
  28. .plusDelayOf(Duration.standardMinutes(2))))
  29. .withAllowedLateness(Duration.standardMinutes(1))
  30. .accumulatingFiredPanes())
  31. .apply(new WordCount.CountWords())
  32. .apply(MapElements.via(new WordCount.FormatAsTextFn()))
  33. .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
  34.  
  35. public static void main(String[] args) throws IOException {
  36. Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  37.  
  38. runWindowedWordCount(options);
  39. }
  40. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement