Guest User

Untitled

a guest
Jun 19th, 2018
89
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.38 KB | None | 0 0
  1. package com.dexels.kafka.streams.remotejoin.ranged;
  2.  
  3. import java.util.ArrayList;
  4. import java.util.List;
  5. import java.util.Optional;
  6. import java.util.function.BiFunction;
  7.  
  8. import org.apache.kafka.streams.KeyValue;
  9. import org.apache.kafka.streams.kstream.Predicate;
  10. import org.apache.kafka.streams.processor.AbstractProcessor;
  11. import org.apache.kafka.streams.processor.ProcessorContext;
  12. import org.apache.kafka.streams.state.KeyValueIterator;
  13. import org.apache.kafka.streams.state.KeyValueStore;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16.  
  17. import com.dexels.kafka.streams.base.StreamOperators;
  18. import com.dexels.kafka.streams.remotejoin.PreJoinProcessor;
  19. import com.dexels.replication.api.ReplicationMessage;
  20. import com.dexels.replication.api.ReplicationMessage.Operation;
  21.  
  22. public class OneToManyGroupedProcessor extends AbstractProcessor<String, ReplicationMessage> {
  23. private final static Logger logger = LoggerFactory.getLogger(OneToManyGroupedProcessor.class);
  24.  
  25. private String storeName;
  26. private String groupedStoreName;
  27.  
  28. private boolean optional;
  29.  
  30. private KeyValueStore<String, ReplicationMessage> groupedLookupStore;
  31. private KeyValueStore<String, ReplicationMessage> lookupStore;
  32. private BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction;
  33.  
  34. private Predicate<String, ReplicationMessage> filterPredicate;
  35.  
  36. public OneToManyGroupedProcessor(String storeName, String groupedStoreName, boolean optional,
  37. Optional<Predicate<String, ReplicationMessage>> filterPredicate,
  38. BiFunction<ReplicationMessage, List<ReplicationMessage>, ReplicationMessage> joinFunction) {
  39. this.storeName = storeName;
  40. this.groupedStoreName = groupedStoreName;
  41. this.optional = optional;
  42. this.joinFunction = joinFunction;
  43. this.filterPredicate = filterPredicate.orElse((k, v) -> true);
  44. }
  45.  
  46. @SuppressWarnings("unchecked")
  47. @Override
  48. public void init(ProcessorContext context) {
  49. this.lookupStore = (KeyValueStore<String, ReplicationMessage>) context.getStateStore(storeName);
  50. this.groupedLookupStore = (KeyValueStore<String, ReplicationMessage>) context.getStateStore(groupedStoreName);
  51. super.init(context);
  52. }
  53.  
  54. @Override
  55. public void process(String key, ReplicationMessage msg) {
  56. boolean reverse = false;
  57. if (key.endsWith(PreJoinProcessor.REVERSE_IDENTIFIER)) {
  58. reverse = true;
  59. key = key.substring(0, key.length() - PreJoinProcessor.REVERSE_IDENTIFIER.length());
  60. }
  61.  
  62. if (reverse) {
  63. reverseJoin(key, msg);
  64. } else {
  65. if (msg == null) {
  66. logger.debug("O2M Emitting null message with key: {}", key);
  67. context().forward(key, null);
  68. return;
  69. }
  70. forwardJoin(key, msg);
  71. }
  72.  
  73. }
  74.  
  75. private void forwardJoin(String key, ReplicationMessage msg) {
  76.  
  77. try {
  78. if (!filterPredicate.test(key, msg)) {
  79. // filter says no
  80. forwardMessage(key, msg.withOperation(Operation.DELETE));
  81. return;
  82. }
  83. } catch (Throwable t) {
  84. logger.error("Error on checking filter predicate: {}", t);
  85. }
  86.  
  87. List<ReplicationMessage> msgs = new ArrayList<>();
  88. try (KeyValueIterator<String, ReplicationMessage> it = groupedLookupStore.range(key + "|", key + "}")) {
  89. while (it.hasNext()) {
  90. KeyValue<String, ReplicationMessage> keyValue = it.next();
  91. msgs.add(keyValue.value);
  92. }
  93. }
  94.  
  95.  
  96. ReplicationMessage joined = msg;
  97. if (msgs.size() > 0) {
  98. joined = joinFunction.apply(msg, msgs);
  99. }
  100. if (optional || msgs.size() > 0) {
  101. forwardMessage(key, joined);
  102. } else {
  103. // We are not optional, and have not joined with any messages. Forward a delete
  104. forwardMessage(key, joined.withOperation(Operation.DELETE));
  105. }
  106.  
  107. }
  108.  
  109. private void reverseJoin(String key, ReplicationMessage msg) {
  110.  
  111. String actualKey = StreamOperators.ungroupKeyReverse(key);
  112. ReplicationMessage one = lookupStore.get(actualKey);
  113. if (one == null) {
  114. // We are doing a reverse join, but the original message isn't there.
  115. // Nothing to do for us here
  116. return;
  117. }
  118. // OneToMany, thus we need to find all the other messages that
  119. // we also have to join with us. Effectively the same as a
  120. // forward join.
  121. forwardJoin(actualKey, one);
  122. }
  123.  
  124. private void forwardMessage(String key, ReplicationMessage innerMessage) {
  125. context().forward(key, innerMessage);
  126. // flush downstream stores with null:
  127. if (innerMessage.operation() == Operation.DELETE) {
  128. logger.debug("Delete forwarded, appending null forward with key: {}", key);
  129. context().forward(key, null);
  130. }
  131. }
Add Comment
Please, Sign In to add comment