Advertisement
Guest User

Untitled

a guest
Oct 17th, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 5.91 KB | None | 0 0
  1. package com.coingi.service.websockets.nettywebsocket;
  2.  
  3. import com.coingi.service.websocket.listener.WebSocketListener;
  4. import io.netty.bootstrap.Bootstrap;
  5. import io.netty.channel.*;
  6. import io.netty.channel.nio.NioEventLoopGroup;
  7. import io.netty.channel.socket.SocketChannel;
  8. import io.netty.channel.socket.nio.NioSocketChannel;
  9. import io.netty.handler.codec.http.HttpClientCodec;
  10. import io.netty.handler.codec.http.HttpHeaders;
  11. import io.netty.handler.codec.http.HttpObjectAggregator;
  12. import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
  13. import io.netty.handler.codec.http.websocketx.WebSocketVersion;
  14. import io.netty.handler.ssl.SslContext;
  15. import io.netty.handler.ssl.SslContextBuilder;
  16. import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
  17. import org.apache.logging.log4j.LogManager;
  18. import org.apache.logging.log4j.Logger;
  19. import org.springframework.context.annotation.Scope;
  20. import org.springframework.stereotype.Component;
  21.  
  22. import java.io.IOException;
  23. import java.net.InetAddress;
  24. import java.net.InetSocketAddress;
  25. import java.net.SocketAddress;
  26. import java.net.URI;
  27. import java.util.Timer;
  28. import java.util.TimerTask;
  29. import java.util.concurrent.TimeUnit;
  30.  
  31. @Scope("prototype")
  32. @Component
  33. class NettyClient {
  34.     private static Logger logger = LogManager.getLogger();
  35.     private Bootstrap bootstrap;
  36.     private Channel channel;
  37.     private Timer timer;
  38.     private SocketAddress socketAddress;
  39.     private URI uri;
  40.  
  41.     public void connect(String uriString, WebSocketListener listener) throws IOException {
  42.         bootstrap = new Bootstrap();
  43.         timer = new Timer();
  44.         uri = URI.create(uriString);
  45.         InetAddress address = InetAddress.getByName(uri.getHost());
  46.         // save the IP address and port of the host (to bypass possible DNS issues should they occur)
  47.         socketAddress = new InetSocketAddress(address.getHostAddress(), uri.getPort() != -1 ? uri.getPort() : 443);
  48.  
  49.         final SslContext sslCtx = SslContextBuilder.forClient()
  50.                 .trustManager(InsecureTrustManagerFactory.INSTANCE)
  51.                 .build();
  52.  
  53.         bootstrap.group(new NioEventLoopGroup());
  54.         bootstrap.channel(NioSocketChannel.class);
  55.         bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
  56.         bootstrap.handler(new ChannelInitializer<SocketChannel>() {
  57.             @Override
  58.             public void initChannel(SocketChannel ch) throws Exception {
  59.                 NettyWebSocketClientEndpoint handler = new NettyWebSocketClientEndpoint(
  60.                         WebSocketClientHandshakerFactory.newHandshaker(uri,
  61.                                 WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, 1280000));
  62.                 handler.adapt(listener);
  63.  
  64.                 ch.pipeline().addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443));
  65.                 ch.pipeline().addLast("http-codec", new HttpClientCodec());
  66.                 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
  67.                 ch.pipeline().addLast("ws-handler", handler);
  68.             }
  69.         });
  70.  
  71.         scheduleConnect(10, listener);
  72.     }
  73.  
  74.     public void close() {
  75.         try {
  76.             channel.close().sync();
  77.         } catch (InterruptedException e) {
  78.             logger.warn("Interrupted while closing the channel: ", e);
  79.         }
  80.     }
  81.  
  82.     private void doConnect(WebSocketListener listener) {
  83.         try {
  84.             ChannelFuture f = bootstrap.connect(socketAddress);
  85.             f.addListener(new ChannelFutureListener() {
  86.                 @Override
  87.                 public void operationComplete(ChannelFuture future) throws Exception {
  88.                     if (!future.isSuccess()) {//if is not successful, reconnect
  89.                         logger.error("Cannot connect to {}: ", uri.getHost(), future.cause());
  90.                         future.channel().close();
  91.                         try {
  92.                             Thread.sleep(TimeUnit.SECONDS.toMillis(1));
  93.                         } catch (InterruptedException e) {
  94.                             logger.error("Interrupted: ", e);
  95.                             return;
  96.                         }
  97.  
  98.                         bootstrap.connect(socketAddress).addListener(this);
  99.                     } else {//good, the connection is ok
  100.                         channel = future.channel();
  101.                         //add a listener to detect the connection lost
  102.                         addCloseDetectListener(channel, listener);
  103.                         connectionEstablished();
  104.                     }
  105.                 }
  106.  
  107.                 private void addCloseDetectListener(Channel channel, WebSocketListener listener) {
  108.                     //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
  109.                     channel.closeFuture().addListener(new ChannelFutureListener() {
  110.                         @Override
  111.                         public void operationComplete(ChannelFuture future) throws Exception {
  112.                             connectionLost(listener);
  113.                             scheduleConnect(5, listener);
  114.                         }
  115.                     });
  116.                 }
  117.             });
  118.         } catch (Exception e) {
  119.             logger.error("Reconnecting in 1 second due to error: ", e);
  120.             scheduleConnect(1000, listener);
  121.         }
  122.     }
  123.  
  124.     private void scheduleConnect(long millis, WebSocketListener listener) {
  125.         timer.schedule(new TimerTask() {
  126.             @Override
  127.             public void run() {
  128.                 doConnect(listener);
  129.             }
  130.         }, millis);
  131.     }
  132.  
  133.     public void connectionLost(WebSocketListener listener) {
  134.         logger.info("connectionLost()");
  135.         listener.onFailure(new NettyWebSocketListenerContext());
  136.     }
  137.  
  138.     public void connectionEstablished() {
  139.         logger.info("Connection established");
  140.     }
  141. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement