Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.coingi.service.websockets.nettywebsocket;
- import com.coingi.service.websocket.listener.WebSocketListener;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.http.HttpClientCodec;
- import io.netty.handler.codec.http.HttpHeaders;
- import io.netty.handler.codec.http.HttpObjectAggregator;
- import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
- import io.netty.handler.codec.http.websocketx.WebSocketVersion;
- import io.netty.handler.ssl.SslContext;
- import io.netty.handler.ssl.SslContextBuilder;
- import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
- import org.apache.logging.log4j.LogManager;
- import org.apache.logging.log4j.Logger;
- import org.springframework.context.annotation.Scope;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.net.SocketAddress;
- import java.net.URI;
- import java.util.Timer;
- import java.util.TimerTask;
- import java.util.concurrent.TimeUnit;
- @Scope("prototype")
- @Component
- class NettyClient {
- private static Logger logger = LogManager.getLogger();
- private Bootstrap bootstrap;
- private Channel channel;
- private Timer timer;
- private SocketAddress socketAddress;
- private URI uri;
- public void connect(String uriString, WebSocketListener listener) throws IOException {
- bootstrap = new Bootstrap();
- timer = new Timer();
- uri = URI.create(uriString);
- InetAddress address = InetAddress.getByName(uri.getHost());
- // save the IP address and port of the host (to bypass possible DNS issues should they occur)
- socketAddress = new InetSocketAddress(address.getHostAddress(), uri.getPort() != -1 ? uri.getPort() : 443);
- final SslContext sslCtx = SslContextBuilder.forClient()
- .trustManager(InsecureTrustManagerFactory.INSTANCE)
- .build();
- bootstrap.group(new NioEventLoopGroup());
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- NettyWebSocketClientEndpoint handler = new NettyWebSocketClientEndpoint(
- WebSocketClientHandshakerFactory.newHandshaker(uri,
- WebSocketVersion.V13, null, false, HttpHeaders.EMPTY_HEADERS, 1280000));
- handler.adapt(listener);
- ch.pipeline().addLast(sslCtx.newHandler(ch.alloc(), uri.getHost(), uri.getPort() != -1 ? uri.getPort() : 443));
- ch.pipeline().addLast("http-codec", new HttpClientCodec());
- ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
- ch.pipeline().addLast("ws-handler", handler);
- }
- });
- scheduleConnect(10, listener);
- }
- public void close() {
- try {
- channel.close().sync();
- } catch (InterruptedException e) {
- logger.warn("Interrupted while closing the channel: ", e);
- }
- }
- private void doConnect(WebSocketListener listener) {
- try {
- ChannelFuture f = bootstrap.connect(socketAddress);
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {//if is not successful, reconnect
- logger.error("Cannot connect to {}: ", uri.getHost(), future.cause());
- future.channel().close();
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(1));
- } catch (InterruptedException e) {
- logger.error("Interrupted: ", e);
- return;
- }
- bootstrap.connect(socketAddress).addListener(this);
- } else {//good, the connection is ok
- channel = future.channel();
- //add a listener to detect the connection lost
- addCloseDetectListener(channel, listener);
- connectionEstablished();
- }
- }
- private void addCloseDetectListener(Channel channel, WebSocketListener listener) {
- //if the channel connection is lost, the ChannelFutureListener.operationComplete() will be called
- channel.closeFuture().addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- connectionLost(listener);
- scheduleConnect(5, listener);
- }
- });
- }
- });
- } catch (Exception e) {
- logger.error("Reconnecting in 1 second due to error: ", e);
- scheduleConnect(1000, listener);
- }
- }
- private void scheduleConnect(long millis, WebSocketListener listener) {
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- doConnect(listener);
- }
- }, millis);
- }
- public void connectionLost(WebSocketListener listener) {
- logger.info("connectionLost()");
- listener.onFailure(new NettyWebSocketListenerContext());
- }
- public void connectionEstablished() {
- logger.info("Connection established");
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement