Advertisement
Guest User

Untitled

a guest
Aug 20th, 2019
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.35 KB | None | 0 0
  1. Key::: abc
  2. Value::: {"customerId":"abc", "rollNo": 7, "subject": "English", "mark": 98}
  3.  
  4. Key::: abc
  5. Value::: {"customerId":"abc", "rollNo": "registerNo", "subject": "sub"}
  6.  
  7. Key::: abc
  8. Value::: {"customerId":"abc", "registerNo": 7, "sub": "English", "mark": 98}
  9.  
  10. @Component
  11.  
  12. public class JoinMsg {
  13.  
  14. msgLookup lookup;
  15.  
  16. public JoinMsg(msgLookup lookup) {
  17. this.lookup = lookup;
  18. }
  19.  
  20. interface dataProcessor {
  21.  
  22. @Input("inputStream")
  23. KStream<?, ?> input1();
  24.  
  25. @Input("globalTable")
  26. GlobalKTable<byte[], Object> input2();
  27.  
  28. @Output("OutputStream")
  29. KStream<?, ?> output3();
  30. }
  31.  
  32. @EnableBinding(dataProcessor.class)
  33. public static class msgLookup {
  34. @StreamListener
  35. @SendTo("OutputStream")
  36. public KStream<String, String> handle(@Input("inputStream") KStream<String, String> input, @Input("globalTable")GlobalKTable<String, String> table) {
  37.  
  38. input.peek((key, value) -> System.out.println("Incoming Stream -- key: " + key + " --value: " + value));
  39.  
  40. KStream<String, String> outStream = input.leftJoin(table, (k, v) -> k , (bTag, tTag) -> new tagLookup(tTag == null ? "customer" : tTag, bTag))
  41. .map((tag, taglookup) -> new KeyValue<>(taglookup.gettTag(), taglookup.getbTag()))
  42. .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
  43. .reduce((a,b)-> a+b)
  44. .toStream();
  45.  
  46. outStream.peek((key, value) -> System.out.println("Outgoing Stream from 2nd part -- key: " + key + " --value: " + value));
  47.  
  48. return outStream;
  49. }
  50.  
  51. private class tagLookup {
  52.  
  53. private final String tTag;
  54. private final String bTag;
  55.  
  56. public tagLookup(String tTag, String bTag) {
  57. if (tTag == null || tTag.isEmpty()) {
  58. throw new IllegalArgumentException("tTag must be set");
  59. }
  60. if (bTag == null || bTag.isEmpty()) {
  61. throw new IllegalArgumentException("bTag must be set");
  62. }
  63. this.tTag = tTag;
  64. this.bTag = bTag;
  65. }
  66. public String gettTag() {
  67. return tTag;
  68. }
  69.  
  70. public String getbTag() {
  71. return bTag;
  72. }
  73. }
  74. }
  75. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement