Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public static class DefaultToMinTimestampPlusOneHour implements DefaultValueFactory<Long> {
- @Override
- public Long create(PipelineOptions options) {
- return options.as(Options.class).getMinTimestampMillis()
- + Duration.standardHours(1).getMillis();
- }
- }
- ...
- static void runWindowedWordCount(Options options) throws IOException {
- final String output = options.getOutput();
- final Instant minTimestamp = new Instant(options.getMinTimestampMillis());
- final Instant maxTimestamp = new Instant(options.getMaxTimestampMillis());
- Pipeline pipeline = Pipeline.create(options);
- Pipeline p = Pipeline.create(options);
- pipeline
- .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)))
- .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
- .triggering(AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(1)))
- .withLateFirings(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardMinutes(2))))
- .withAllowedLateness(Duration.standardMinutes(1))
- .accumulatingFiredPanes())
- .apply(new WordCount.CountWords())
- .apply(MapElements.via(new WordCount.FormatAsTextFn()))
- .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
- public static void main(String[] args) throws IOException {
- Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
- runWindowedWordCount(options);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement