Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class RHandler {
- private static final String TOPIC = "main";
- private final KafkaSender kafkaSender;
- private final KafkaReceiver<Integer, String> kafkaReceiver;
- private final ConnectableFlux<String> publish;
- public RHandler(KafkaSender kafkaSender, KafkaReceiver<Integer, String> kafkaReceiver) {
- this.kafkaSender = kafkaSender;
- this.kafkaReceiver = kafkaReceiver;
- publish = kafkaReceiver.receive().map(e -> e.value()).publish();
- this.publish.connect();
- this.publish.subscribe(s -> System.out.println("Kafka responce:" + s));
- }
- public Mono<ServerResponse> test(ServerRequest request) {
- Flux<String> fl = this.publish.refCount();
- return kafkaSender.<Integer>send(request.bodyToFlux(String.class)
- .map(s -> SenderRecord.create(new ProducerRecord<>(TOPIC, 1, s), s)))
- .map(e -> ((SenderResult) e).correlationMetadata())
- .map(s -> {return fl.filter(strPair -> s.equals(strPair)); })
- .flatMap(r -> r)
- .flatMap(r -> ServerResponse.ok())
- .single();
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement