Advertisement
tterrag1098

Untitled

Nov 1st, 2019
138
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.72 KB | None | 0 0
  1. public Mono<Void> connect(@NonNull String url, FrameParser<I, O> handler) {
  2. return Mono.defer(() -> {
  3. // Subscribe each inbound GatewayPayload to the receiver sink
  4. Flux<I> inboundSub = handler.inbound()
  5. .doOnError(t -> log.debug("Inbound encountered an error", t))
  6. .doOnCancel(() -> log.debug("Inbound cancelled"))
  7. .doOnComplete(() -> log.debug("Inbound completed"))
  8. .doOnNext(receiverSink::next);
  9.  
  10. // Subscribe the receiver to process and transform the inbound payloads into Dispatch events
  11. Flux<I> receiverSub = receiver.log(log.getName()).doOnError(t -> log.error("Exception receiving websocket data", t));
  12.  
  13. // Subscribe the handler's outbound exchange with our outgoing signals
  14. // routing error and completion signals to close the gateway
  15. Flux<O> senderSub = sender.log(log.getName())
  16. .doOnError(t -> handler.close())
  17. .doOnComplete(handler::close);
  18.  
  19. Mono<Void> ws = HttpClient.create()
  20. .observe((connection, newState) -> log.debug("{} {}", newState, connection))
  21. .wiretap(true)
  22. .websocket()
  23. .uri(url)
  24. .handle(handler::handle)
  25. .doOnError(t -> log.error("Exception handling websocket data", t))
  26. .doOnTerminate(() -> {
  27. log.debug("Terminating websocket client, disposing subscriptions");
  28. })
  29. .then();
  30.  
  31. return Mono.when(inboundSub, receiverSub, senderSub, ws);
  32. });
  33. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement