Advertisement
Alex_D73

Kafka TransformerSupplier

Sep 9th, 2020
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.08 KB | None | 0 0
  1. // Обогатитель
  2. public class EnrichmentProcessor implements Transformer<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> {
  3.  
  4.     private HashMap<Integer, DictRecord> dicts;
  5.  
  6.     private String redisAddress;
  7.     private Integer redisPort;
  8.     private RedisBackedCache cache;
  9.  
  10.     public EnrichmentProcessor(String redisAddress, Integer redisPort) {
  11.         this.redisAddress = redisAddress;
  12.         this.redisPort = redisPort;
  13.     }
  14.  
  15.     @Override
  16.     public void init(ProcessorContext context) {
  17.         Jedis jedis = new Jedis(redisAddress, redisPort);
  18.         cache = new RedisBackedCache(jedis, "test");
  19.     }
  20.  
  21.     @Override
  22.     public KeyValue<Integer, EnrichRecord> transform(Integer key, StreamRecord streamRecord) {
  23.         // непосредственно обогощение
  24.         return new KeyValue<>(key, enrichRecord);
  25.     }
  26.  
  27.     @Override
  28.     public void close() {}
  29. }
  30.  
  31. // передача параметров коннекта ко внешним источникам
  32. public class EnrichTransformerSupplier implements TransformerSupplier<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> {
  33.  
  34.     private final String redisAddress;
  35.     private final Integer redisPort;
  36.  
  37.     public EnrichTransformerSupplier(String redisAddress, Integer redisPort) {
  38.         this.redisAddress = redisAddress;
  39.         this.redisPort = redisPort;
  40.     }
  41.  
  42.     @Override
  43.     public Transformer<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> get() {
  44.         return new EnrichmentProcessor(redisAddress, redisPort);
  45.     }
  46. }
  47.  
  // Usage example: read IN_TOPIC, enrich every record, write the result to ENRICH_TOPIC.
  final StreamsBuilder builder = new StreamsBuilder();

  // Source stream with Integer keys and StreamRecord values.
  KStream<Integer, StreamRecord> streams = builder.stream(
          IN_TOPIC,
          Consumed.with(
                  Serdes.Integer(),
                  Serdes.serdeFrom(new StreamRecordSerializer(), new StreamRecordDeserializer())));

  // The supplier passes the Redis connection parameters to each transformer it creates.
  KStream<Integer, EnrichRecord> enriched =
          streams.transform(new EnrichTransformerSupplier(redisAddress, redisPort));

  enriched.to(
          ENRICH_TOPIC,
          Produced.with(
                  Serdes.Integer(),
                  Serdes.serdeFrom(new EnrichRecordSerializer(), new EnrichRecordDeserializer())));

  Topology t = builder.build();
  56.  
  57.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement