Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Key::: abc
- Value::: {"customerId":"abc", "rollNo": 7, "subject": "English", "mark": 98}
- Key::: abc
- Value::: {"customerId":"abc", "rollNo": "registerNo", "subject": "sub"}
- Key::: abc
- Value::: {"customerId":"abc", "registerNo": 7, "sub": "English", "mark": 98}
- @Component
- public class JoinMsg {
- msgLookup lookup;
- public JoinMsg(msgLookup lookup) {
- this.lookup = lookup;
- }
- interface dataProcessor {
- @Input("inputStream")
- KStream<?, ?> input1();
- @Input("globalTable")
- GlobalKTable<byte[], Object> input2();
- @Output("OutputStream")
- KStream<?, ?> output3();
- }
- @EnableBinding(dataProcessor.class)
- public static class msgLookup {
- @StreamListener
- @SendTo("OutputStream")
- public KStream<String, String> handle(@Input("inputStream") KStream<String, String> input, @Input("globalTable")GlobalKTable<String, String> table) {
- input.peek((key, value) -> System.out.println("Incoming Stream -- key: " + key + " --value: " + value));
- KStream<String, String> outStream = input.leftJoin(table, (k, v) -> k , (bTag, tTag) -> new tagLookup(tTag == null ? "customer" : tTag, bTag))
- .map((tag, taglookup) -> new KeyValue<>(taglookup.gettTag(), taglookup.getbTag()))
- .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .reduce((a,b)-> a+b)
- .toStream();
- outStream.peek((key, value) -> System.out.println("Outgoing Stream from 2nd part -- key: " + key + " --value: " + value));
- return outStream;
- }
- private class tagLookup {
- private final String tTag;
- private final String bTag;
- public tagLookup(String tTag, String bTag) {
- if (tTag == null || tTag.isEmpty()) {
- throw new IllegalArgumentException("tTag must be set");
- }
- if (bTag == null || bTag.isEmpty()) {
- throw new IllegalArgumentException("bTag must be set");
- }
- this.tTag = tTag;
- this.bTag = bTag;
- }
- public String gettTag() {
- return tTag;
- }
- public String getbTag() {
- return bTag;
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement