Advertisement
Alex_D73

Untitled

Oct 26th, 2020
494
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 1.14 KB | None | 0 0
  1. public class RHandler {
  2.  
  3.     private static final String TOPIC = "main";
  4.     private final KafkaSender kafkaSender;
  5.     private final KafkaReceiver<Integer, String> kafkaReceiver;
  6.     private final ConnectableFlux<String> publish;
  7.  
  8.     public RHandler(KafkaSender kafkaSender, KafkaReceiver<Integer, String> kafkaReceiver) {
  9.         this.kafkaSender = kafkaSender;
  10.         this.kafkaReceiver = kafkaReceiver;
  11.         publish = kafkaReceiver.receive().map(e -> e.value()).publish();
  12.         this.publish.connect();
  13.         this.publish.subscribe(s -> System.out.println("Kafka responce:" + s));
  14.     }
  15.  
  16.     public Mono<ServerResponse> test(ServerRequest request) {
  17.  
  18.         Flux<String> fl = this.publish.refCount();
  19.  
  20.         return kafkaSender.<Integer>send(request.bodyToFlux(String.class)
  21.                 .map(s -> SenderRecord.create(new ProducerRecord<>(TOPIC, 1, s), s)))
  22.                 .map(e -> ((SenderResult) e).correlationMetadata())
  23.                 .map(s -> {return fl.filter(strPair -> s.equals(strPair)); })
  24.                 .flatMap(r -> r)
  25.                 .flatMap(r -> ServerResponse.ok())
  26.                 .single();
  27.     }
  28. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement