Advertisement
Guest User

Untitled

a guest
Jun 5th, 2019
165
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 10.41 KB | None | 0 0
  1. /**
  2.  * @author Dalton (Palidino)
  3.  */
  4. package com.palidino.nio;
  5.  
  6. import java.io.ByteArrayOutputStream;
  7. import java.io.IOException;
  8. import java.net.InetSocketAddress;
  9. import java.nio.ByteBuffer;
  10. import java.nio.channels.SelectionKey;
  11. import java.nio.channels.Selector;
  12. import java.nio.channels.ServerSocketChannel;
  13. import java.nio.channels.SocketChannel;
  14. import java.util.ArrayList;
  15. import java.util.HashMap;
  16. import java.util.Iterator;
  17. import java.util.List;
  18. import java.util.Map;
  19.  
  20. public class NioServer implements Runnable {
  21.     private List<Session> sessions = new ArrayList<>();
  22.     private Map<String, Integer> connectionCounts = new HashMap<>();
  23.     private InetSocketAddress hostAddress;
  24.     private ServerSocketChannel serverSocketChannel;
  25.     private Selector selector;
  26.     private SessionHandler sessionHandler;
  27.     private boolean running;
  28.  
  29.     private ByteBuffer readBuffer;
  30.     private byte[] readBytes;
  31.  
  32.     private int sessionIdleTimeout;
  33.     private int maxConnectionsPerIPAddress;
  34.     private int socketBufferSize = 16384;
  35.  
  36.     public NioServer() throws IOException {
  37.         selector = Selector.open();
  38.         serverSocketChannel = ServerSocketChannel.open();
  39.         serverSocketChannel.configureBlocking(false);
  40.     }
  41.  
  42.     public void start(String remoteAddress, int port) throws IOException {
  43.         if (hostAddress != null) {
  44.             throw new IllegalStateException("Server already started");
  45.         }
  46.         if (sessionHandler == null) {
  47.             throw new IllegalStateException("SsessionHandler can't be null");
  48.         }
  49.         readBuffer = ByteBuffer.allocateDirect(socketBufferSize);
  50.         readBytes = new byte[socketBufferSize];
  51.         hostAddress = new InetSocketAddress(remoteAddress, port);
  52.         serverSocketChannel.socket().setReceiveBufferSize(socketBufferSize);
  53.         serverSocketChannel.socket().bind(hostAddress);
  54.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  55.         System.out.println("Starting server on " + remoteAddress + ":" + port);
  56.         new Thread(this, "NioServer").start();
  57.     }
  58.  
  59.     public void stop() {
  60.         try {
  61.             if (serverSocketChannel != null) {
  62.                 serverSocketChannel.close();
  63.                 serverSocketChannel = null;
  64.             }
  65.         } catch (IOException ioe) {
  66.             ioe.printStackTrace();
  67.         }
  68.     }
  69.  
  70.     public void setSessionHandler(SessionHandler sessionHandler) {
  71.         this.sessionHandler = sessionHandler;
  72.     }
  73.  
  74.     public void setSessionIdleTimeout(int seconds) {
  75.         if (hostAddress != null) {
  76.             throw new IllegalStateException("Server already started");
  77.         }
  78.         if (seconds <= 0) {
  79.             throw new IllegalArgumentException("seconds must be greater than 0");
  80.         }
  81.         sessionIdleTimeout = seconds * 1000;
  82.     }
  83.  
  84.     public void setMaxConnectionsPerIPAddress(int maxConnectionsPerIPAddress) {
  85.         if (hostAddress != null) {
  86.             throw new IllegalStateException("Server already started");
  87.         }
  88.         if (maxConnectionsPerIPAddress <= 0) {
  89.             throw new IllegalArgumentException("maxConnectionsPerIPAddress must be greater than 0");
  90.         }
  91.         this.maxConnectionsPerIPAddress = maxConnectionsPerIPAddress;
  92.     }
  93.  
  94.     public void setSocketBufferSize(int socketBufferSize) throws IOException {
  95.         if (hostAddress != null) {
  96.             throw new IllegalStateException("Server already started");
  97.         }
  98.         if (socketBufferSize <= 0) {
  99.             throw new IllegalArgumentException("size must be greater than 0");
  100.         }
  101.         this.socketBufferSize = socketBufferSize;
  102.     }
  103.  
  104.     @Override
  105.     public void run() {
  106.         if (running) {
  107.             throw new IllegalStateException("Server is already running");
  108.         }
  109.         running = true;
  110.         while (serverSocketChannel.isOpen()) {
  111.             cycle();
  112.         }
  113.         running = false;
  114.     }
  115.  
  116.     private void cycle() {
  117.         try {
  118.             selector.select();
  119.             for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) {
  120.                 SelectionKey selectionKey = it.next();
  121.                 it.remove();
  122.                 Session session = null;
  123.                 try {
  124.                     if (serverSocketChannel == null || !serverSocketChannel.isOpen()) {
  125.                         break;
  126.                     }
  127.                     session = (Session) selectionKey.attachment();
  128.                     if (selectionKey.isValid() && selectionKey.isAcceptable()) {
  129.                         session = accept(selectionKey);
  130.                     }
  131.                     if (session == null) {
  132.                         continue;
  133.                     }
  134.                     if (selectionKey.isValid() && selectionKey.isReadable()) {
  135.                         read(selectionKey);
  136.                     }
  137.                     if (selectionKey.isValid() && selectionKey.isWritable()) {
  138.                         write(selectionKey);
  139.                     }
  140.                 } catch (Exception e2) {
  141.                     error(e2, session);
  142.                 }
  143.             }
  144.             checkSessions();
  145.         } catch (Exception e) {
  146.             e.printStackTrace();
  147.         }
  148.     }
  149.  
  150.     private Session accept(SelectionKey selectionKey) throws IOException {
  151.         Session session = null;
  152.         ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
  153.         SocketChannel socketChannel = serverSocketChannel.accept();
  154.         socketChannel.socket().setSendBufferSize(socketBufferSize);
  155.         socketChannel.configureBlocking(false);
  156.         String remoteAddress = socketChannel.socket().getInetAddress().getHostAddress();
  157.         int connectionCount = getConnectionCount(remoteAddress);
  158.         if (maxConnectionsPerIPAddress > 0 && connectionCount >= maxConnectionsPerIPAddress) {
  159.             socketChannel.close();
  160.         } else {
  161.             connectionCounts.put(remoteAddress, connectionCount + 1);
  162.             session = new Session(socketChannel, remoteAddress, socketChannel.register(selector, SelectionKey.OP_READ));
  163.             sessionHandler.accept(session);
  164.             sessions.add(session);
  165.         }
  166.         return session;
  167.     }
  168.  
  169.     private void read(SelectionKey selectionKey) throws IOException {
  170.         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
  171.         if (!socketChannel.isOpen()) {
  172.             return;
  173.         }
  174.         Session session = (Session) selectionKey.attachment();
  175.         readBuffer.clear();
  176.         int numberBytesRead;
  177.         ByteArrayOutputStream readStream = new ByteArrayOutputStream();
  178.         while ((numberBytesRead = socketChannel.read(readBuffer)) > 0) {
  179.             readBuffer.flip();
  180.             readBuffer.get(readBytes, 0, numberBytesRead);
  181.             readStream.write(readBytes, 0, numberBytesRead);
  182.             readBuffer.clear();
  183.             session.updateLastRead();
  184.         }
  185.         if (readStream.size() > 0) {
  186.             sessionHandler.read(session, readStream.toByteArray());
  187.         }
  188.         if (numberBytesRead == -1) {
  189.             session.close();
  190.         }
  191.     }
  192.  
  193.     private void write(SelectionKey selectionKey) throws IOException {
  194.         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
  195.         if (!socketChannel.isOpen()) {
  196.             return;
  197.         }
  198.         Session session = (Session) selectionKey.attachment();
  199.         if (session.getWriteEvents().isEmpty()) {
  200.             return;
  201.         }
  202.         try {
  203.             while (!session.getWriteEvents().isEmpty()) {
  204.                 WriteEvent writeEvent = session.getWriteEvents().peek();
  205.                 socketChannel.write(writeEvent.getBuffer());
  206.                 if (writeEvent.getBuffer().remaining() > 0) {
  207.                     break;
  208.                 }
  209.                 if (writeEvent.getHandler() != null) {
  210.                     writeEvent.getHandler().complete(session, true);
  211.                 }
  212.                 session.getWriteEvents().poll();
  213.             }
  214.         } catch (Exception e) {
  215.             error(e, session);
  216.         }
  217.         if (selectionKey.isValid() && session.getWriteEvents().isEmpty()) {
  218.             selectionKey.interestOps(SelectionKey.OP_READ);
  219.         }
  220.     }
  221.  
  222.     private void error(Exception exception, Session session) throws IOException {
  223.         try {
  224.             sessionHandler.error(exception, session);
  225.         } catch (Exception e) {
  226.             if (session != null) {
  227.                 session.close();
  228.             }
  229.             e.printStackTrace();
  230.         }
  231.     }
  232.  
  233.     private void checkSessions() {
  234.         if (sessions.isEmpty()) {
  235.             return;
  236.         }
  237.         for (Iterator<Session> it = sessions.iterator(); it.hasNext();) {
  238.             Session session = it.next();
  239.             SelectionKey selectionKey = session.getSelectionKey();
  240.             if (selectionKey.isValid() && !session.getWriteEvents().isEmpty()) {
  241.                 selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  242.             }
  243.             if (session.idleTimeout(sessionIdleTimeout)) {
  244.                 session.close();
  245.             }
  246.             if (session.isOpen()) {
  247.                 continue;
  248.             }
  249.             String remoteAddress = session.getRemoteAddress();
  250.             int connectionCount = getConnectionCount(remoteAddress);
  251.             if (connectionCount > 1) {
  252.                 connectionCounts.put(remoteAddress, connectionCount - 1);
  253.             } else {
  254.                 connectionCounts.remove(remoteAddress);
  255.             }
  256.             if (sessionHandler != null) {
  257.                 sessionHandler.closed(session);
  258.             }
  259.             if (selectionKey.isValid()) {
  260.                 selectionKey.cancel();
  261.             }
  262.             while (!session.getWriteEvents().isEmpty()) {
  263.                 WriteEvent writeEvent = session.getWriteEvents().poll();
  264.                 if (writeEvent.getHandler() != null) {
  265.                     writeEvent.getHandler().complete(session, false);
  266.                 }
  267.             }
  268.             it.remove();
  269.         }
  270.     }
  271.  
  272.     private int getConnectionCount(String remoteAddress) {
  273.         return connectionCounts.containsKey(remoteAddress) ? connectionCounts.get(remoteAddress) : 0;
  274.     }
  275.  
  276.     public void printStats() {
  277.         System.out
  278.                 .println("NIOServer: sessions: " + sessions.size() + "; connectionCounts: " + connectionCounts.size());
  279.     }
  280. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement