Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // imports and license left out for clarity
- public class OptimizedStreams {
- public static void main(String[] args) {
- // Details left out for clarity
- final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> inputStream = builder.stream("inputTopic");
- final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0,5));
- // first repartition
- changedKeyStream.groupByKey(Grouped.as("count-repartition"))
- .count(Materialized.as("count-store"))
- .toStream().to("count-topic");
- // second repartition
- changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
- .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
- .count(Materialized.as("windowed-count-store"))
- .toStream().to("windowed-count");
- final Topology topology = builder.build(properties);
- final KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
- kafkaStreams.start();
- }
- }
Add Comment
Please sign in to add a comment