Guest User

Untitled

a guest
Dec 6th, 2018
105
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.09 KB | None | 0 0
  1. // imports and license left out for clarity
  2. public class OptimizedStreams {
  3.  
  4. public static void main(String[] args) {
  5. // Details left out for clarity
  6. final StreamsBuilder builder = new StreamsBuilder();
  7. final KStream<String, String> inputStream = builder.stream("inputTopic");
  8.  
  9. final KStream<String, String> changedKeyStream = inputStream.selectKey((k, v) -> v.substring(0,5));
  10.  
  11. // first repartition
  12. changedKeyStream.groupByKey(Grouped.as("count-repartition"))
  13. .count(Materialized.as("count-store"))
  14. .toStream().to("count-topic");
  15.  
  16. // second repartition
  17. changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
  18. .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
  19. .count(Materialized.as("windowed-count-store"))
  20. .toStream().to("windowed-count");
  21.  
  22. final Topology topology = builder.build(properties);
  23. final KafkaStreams kafkaStreams = new KafkaStreams(topology, properties);
  24. kafkaStreams.start();
  25. }
  26. }
Add Comment
Please, Sign In to add comment