Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.example;
- import java.util.Properties;
- import org.apache.kafka.clients.admin.NewTopic;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.common.serialization.Serdes.StringSerde;
- import org.apache.kafka.streams.KafkaStreams;
- import org.apache.kafka.streams.KeyValue;
- import org.apache.kafka.streams.StreamsConfig;
- import org.apache.kafka.streams.kstream.KStreamBuilder;
- import org.apache.kafka.streams.kstream.KTable;
- import org.springframework.boot.ApplicationRunner;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.core.KafkaTemplate;
- @SpringBootApplication
- public class So51407542Application {
- private static final String SUFFIX = "c";
- private static final String TOPIC_A1 = "topicA1" + SUFFIX;
- private static final String TOPIC_A2 = "topicA2" + SUFFIX;
- private static final String TOPIC_B1 = "topicB1" + SUFFIX;
- private static final String TOPIC_B2 = "topicB2" + SUFFIX;
- private static final String TOPIC_FINAL = "topicFinal." + SUFFIX;
- public static void main(String[] args) {
- SpringApplication.run(So51407542Application.class, args).close();
- }
- @Bean
- public ApplicationRunner runner(KafkaTemplate<String, String> template) {
- return args -> {
- template.send(TOPIC_A1, 0, "foo", "bar");
- template.send(TOPIC_B1, 0, "foo", "baz");
- KStreamBuilder kstreamBuilder = new KStreamBuilder();
- StringSerde serde = new StringSerde();
- KTable<String, String> kTableA = kstreamBuilder.table(serde, serde, TOPIC_A2);
- kstreamBuilder.stream(serde, serde, TOPIC_A1)
- .map((k, v) -> {
- System.out.println(k + ":" + v);
- return new KeyValue<>(k, v);
- })
- .to(serde, serde, TOPIC_A2);
- kstreamBuilder.stream(serde, serde, TOPIC_B1)
- .map((k, v) -> {
- System.out.println(k + ":" + v);
- return new KeyValue<>(k, v);
- })
- .to(serde, serde, TOPIC_B2);
- KTable<String, String> kTableB = kstreamBuilder.table(serde, serde, TOPIC_B2);
- KTable<String, String> resultTable = kTableA.leftJoin(kTableB, (a, b) -> {
- System.out.println("a:" + a);
- System.out.println("b:" + b);
- return a + b;
- });
- resultTable.to(serde, serde, TOPIC_FINAL);
- Properties configs = new Properties();
- configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "So51407542Application");
- KafkaStreams ks = new KafkaStreams(kstreamBuilder, configs);
- ks.start();
- Thread.sleep(10_000);
- };
- }
- @KafkaListener(id = "foo", topics = TOPIC_FINAL)
- public void in(ConsumerRecord<?, ?> in) {
- System.out.println(in);
- }
- @Bean
- public NewTopic topicA1() {
- return new NewTopic(TOPIC_A1, 1, (short) 1);
- }
- @Bean
- public NewTopic topicA2() {
- return new NewTopic(TOPIC_A2, 1, (short) 1);
- }
- @Bean
- public NewTopic topicB1() {
- return new NewTopic(TOPIC_B1, 1, (short) 1);
- }
- @Bean
- public NewTopic topicB2() {
- return new NewTopic(TOPIC_B2, 1, (short) 1);
- }
- @Bean
- public NewTopic topicFinal() {
- return new NewTopic(TOPIC_FINAL, 1, (short) 1);
- }
- }
Add Comment
Please, Sign In to add comment