Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public Mono<Void> connect(@NonNull String url, FrameParser<I, O> handler) {
- return Mono.defer(() -> {
- // Subscribe each inbound GatewayPayload to the receiver sink
- Flux<I> inboundSub = handler.inbound()
- .doOnError(t -> log.debug("Inbound encountered an error", t))
- .doOnCancel(() -> log.debug("Inbound cancelled"))
- .doOnComplete(() -> log.debug("Inbound completed"))
- .doOnNext(receiverSink::next);
- // Subscribe the receiver to process and transform the inbound payloads into Dispatch events
- Flux<I> receiverSub = receiver.log(log.getName()).doOnError(t -> log.error("Exception receiving websocket data", t));
- // Subscribe the handler's outbound exchange with our outgoing signals
- // routing error and completion signals to close the gateway
- Flux<O> senderSub = sender.log(log.getName())
- .doOnError(t -> handler.close())
- .doOnComplete(handler::close);
- Mono<Void> ws = HttpClient.create()
- .observe((connection, newState) -> log.debug("{} {}", newState, connection))
- .wiretap(true)
- .websocket()
- .uri(url)
- .handle(handler::handle)
- .doOnError(t -> log.error("Exception handling websocket data", t))
- .doOnTerminate(() -> {
- log.debug("Terminating websocket client, disposing subscriptions");
- })
- .then();
- return Mono.when(inboundSub, receiverSub, senderSub, ws);
- });
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement