Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- {"identifier": "xxx", "value": 10.0, "ts":"2019-01-16T10:51:26.326242+0000"}
- import org.apache.beam.sdk.io.kafka.KafkaIO;
- // Read <Long, String> records from the "test" topic on kafka:9092 as consumer group "Consumer1".
- // Offsets are committed by Beam when a checkpoint is finalized (commitOffsetsInFinalize), so the
- // Kafka client's own periodic auto-commit is switched off: the two mechanisms are independent and
- // enabling both makes the committed position unpredictable after a restart.
- pipeline.apply(KafkaIO.<Long, String>read()
- .withBootstrapServers("kafka:9092")
- .withTopic("test")
- .withKeyDeserializer(LongDeserializer.class)
- .withValueDeserializer(StringDeserializer.class)
- // A single update call carries every consumer override, so the settings live in one place.
- .updateConsumerProperties(ImmutableMap.of(
- "group.id", "Consumer1",
- "enable.auto.commit", "false"))
- .commitOffsetsInFinalize()
- // Drop the Kafka metadata wrapper; downstream receives plain KV<Long, String> elements.
- .withoutMetadata());
- PCollection<Temperature> tempCollection = p.apply(new SetupKafka())
- .apply(ParDo.of(new ReadFromTopic()))
- .apply("ParseTemperature", ParDo.of(new ParseTemperature()));
- tempCollection.apply("AssignTimeStamps", WithTimestamps.of(us -> new Instant(us.getTimestamp())));
- // Windowing strategy: fixed 30-second windows that fire once when the watermark passes the end
- // of the window, then again 10 seconds (processing time) after the first late element of a pane.
- // Late data is accepted for up to one day, and each firing includes all elements seen so far.
- Window<Temperature> thirtySecondWindows = Window
- .<Temperature>into(FixedWindows.of(Duration.standardSeconds(30)))
- .triggering(AfterWatermark.pastEndOfWindow()
- .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
- .withAllowedLateness(Duration.standardDays(1))
- .accumulatingFiredPanes();
- // Window the temperatures, then hand each window to ComputeMax.
- PCollection<Output> output = tempCollection
- .apply(thirtySecondWindows)
- .apply(new ComputeMax());
Add Comment
Please, Sign In to add comment