Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Обогатитель
- public class EnrichmentProcessor implements Transformer<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> {
- private HashMap<Integer, DictRecord> dicts;
- private String redisAddress;
- private Integer redisPort;
- private RedisBackedCache cache;
- public EnrichmentProcessor(String redisAddress, Integer redisPort) {
- this.redisAddress = redisAddress;
- this.redisPort = redisPort;
- }
- @Override
- public void init(ProcessorContext context) {
- Jedis jedis = new Jedis(redisAddress, redisPort);
- cache = new RedisBackedCache(jedis, "test");
- }
- @Override
- public KeyValue<Integer, EnrichRecord> transform(Integer key, StreamRecord streamRecord) {
- // непосредственно обогощение
- return new KeyValue<>(key, enrichRecord);
- }
- @Override
- public void close() {}
- }
- // передача параметров коннекта ко внешним источникам
- public class EnrichTransformerSupplier implements TransformerSupplier<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> {
- private final String redisAddress;
- private final Integer redisPort;
- public EnrichTransformerSupplier(String redisAddress, Integer redisPort) {
- this.redisAddress = redisAddress;
- this.redisPort = redisPort;
- }
- @Override
- public Transformer<Integer, StreamRecord, KeyValue<Integer, EnrichRecord>> get() {
- return new EnrichmentProcessor(redisAddress, redisPort);
- }
- }
// Usage example: read StreamRecords from IN_TOPIC, enrich them, write EnrichRecords to ENRICH_TOPIC.
final StreamsBuilder builder = new StreamsBuilder();

// Source: Integer keys, StreamRecord values with their custom serializer/deserializer pair.
KStream<Integer, StreamRecord> streams = builder.stream(
        IN_TOPIC,
        Consumed.with(
                Serdes.Integer(),
                Serdes.serdeFrom(new StreamRecordSerializer(), new StreamRecordDeserializer())));

// Enrichment step: the supplier hands the Redis parameters to each transformer it creates.
KStream<Integer, EnrichRecord> enriched =
        streams.transform(new EnrichTransformerSupplier(redisAddress, redisPort));

// Sink: Integer keys, EnrichRecord values with their custom serializer/deserializer pair.
enriched.to(
        ENRICH_TOPIC,
        Produced.with(
                Serdes.Integer(),
                Serdes.serdeFrom(new EnrichRecordSerializer(), new EnrichRecordDeserializer())));

Topology t = builder.build();
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement