Guest User

Untitled

a guest
Jul 18th, 2018
88
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.18 KB | None | 0 0
  1. package com.example;
  2.  
  3. import java.util.Properties;
  4.  
  5. import org.apache.kafka.clients.admin.NewTopic;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.common.serialization.Serdes.StringSerde;
  8. import org.apache.kafka.streams.KafkaStreams;
  9. import org.apache.kafka.streams.KeyValue;
  10. import org.apache.kafka.streams.StreamsConfig;
  11. import org.apache.kafka.streams.kstream.KStreamBuilder;
  12. import org.apache.kafka.streams.kstream.KTable;
  13.  
  14. import org.springframework.boot.ApplicationRunner;
  15. import org.springframework.boot.SpringApplication;
  16. import org.springframework.boot.autoconfigure.SpringBootApplication;
  17. import org.springframework.context.annotation.Bean;
  18. import org.springframework.kafka.annotation.KafkaListener;
  19. import org.springframework.kafka.core.KafkaTemplate;
  20.  
  21. @SpringBootApplication
  22. public class So51407542Application {
  23.  
  24. private static final String SUFFIX = "c";
  25.  
  26. private static final String TOPIC_A1 = "topicA1" + SUFFIX;
  27.  
  28. private static final String TOPIC_A2 = "topicA2" + SUFFIX;
  29.  
  30. private static final String TOPIC_B1 = "topicB1" + SUFFIX;
  31.  
  32. private static final String TOPIC_B2 = "topicB2" + SUFFIX;
  33.  
  34. private static final String TOPIC_FINAL = "topicFinal." + SUFFIX;
  35.  
  36. public static void main(String[] args) {
  37. SpringApplication.run(So51407542Application.class, args).close();
  38. }
  39.  
  40. @Bean
  41. public ApplicationRunner runner(KafkaTemplate<String, String> template) {
  42. return args -> {
  43. template.send(TOPIC_A1, 0, "foo", "bar");
  44. template.send(TOPIC_B1, 0, "foo", "baz");
  45. KStreamBuilder kstreamBuilder = new KStreamBuilder();
  46. StringSerde serde = new StringSerde();
  47.  
  48. KTable<String, String> kTableA = kstreamBuilder.table(serde, serde, TOPIC_A2);
  49.  
  50. kstreamBuilder.stream(serde, serde, TOPIC_A1)
  51. .map((k, v) -> {
  52. System.out.println(k + ":" + v);
  53. return new KeyValue<>(k, v);
  54. })
  55. .to(serde, serde, TOPIC_A2);
  56.  
  57. kstreamBuilder.stream(serde, serde, TOPIC_B1)
  58. .map((k, v) -> {
  59. System.out.println(k + ":" + v);
  60. return new KeyValue<>(k, v);
  61. })
  62. .to(serde, serde, TOPIC_B2);
  63.  
  64. KTable<String, String> kTableB = kstreamBuilder.table(serde, serde, TOPIC_B2);
  65.  
  66. KTable<String, String> resultTable = kTableA.leftJoin(kTableB, (a, b) -> {
  67. System.out.println("a:" + a);
  68. System.out.println("b:" + b);
  69. return a + b;
  70. });
  71.  
  72. resultTable.to(serde, serde, TOPIC_FINAL);
  73.  
  74. Properties configs = new Properties();
  75. configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  76. configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "So51407542Application");
  77. KafkaStreams ks = new KafkaStreams(kstreamBuilder, configs);
  78. ks.start();
  79. Thread.sleep(10_000);
  80. };
  81. }
  82.  
  83. @KafkaListener(id = "foo", topics = TOPIC_FINAL)
  84. public void in(ConsumerRecord<?, ?> in) {
  85. System.out.println(in);
  86. }
  87.  
  88. @Bean
  89. public NewTopic topicA1() {
  90. return new NewTopic(TOPIC_A1, 1, (short) 1);
  91. }
  92.  
  93. @Bean
  94. public NewTopic topicA2() {
  95. return new NewTopic(TOPIC_A2, 1, (short) 1);
  96. }
  97.  
  98. @Bean
  99. public NewTopic topicB1() {
  100. return new NewTopic(TOPIC_B1, 1, (short) 1);
  101. }
  102.  
  103. @Bean
  104. public NewTopic topicB2() {
  105. return new NewTopic(TOPIC_B2, 1, (short) 1);
  106. }
  107.  
  108. @Bean
  109. public NewTopic topicFinal() {
  110. return new NewTopic(TOPIC_FINAL, 1, (short) 1);
  111. }
  112.  
  113. }
Add Comment
Please, Sign In to add comment