Advertisement
tlycken

Java NIO problem

Jan 10th, 2012
311
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 6.11 KB | None | 0 0
  1. package server.network.nio;
  2.  
  3. import java.io.IOException;
  4. import java.net.InetAddress;
  5. import java.net.InetSocketAddress;
  6. import java.net.NetworkInterface;
  7. import java.net.ServerSocket;
  8. import java.net.SocketException;
  9. import java.net.UnknownHostException;
  10. import java.nio.ByteBuffer;
  11. import java.nio.channels.ClosedChannelException;
  12. import java.nio.channels.SelectionKey;
  13. import java.nio.channels.Selector;
  14. import java.nio.channels.ServerSocketChannel;
  15. import java.nio.channels.SocketChannel;
  16. import java.util.ArrayList;
  17. import java.util.Enumeration;
  18. import java.util.Iterator;
  19. import java.util.Set;
  20.  
  21. import server.utils.Log;
  22.  
  23. public class Server implements Runnable {
  24.  
  25.     private ArrayList<ClientHandle> clients;
  26.     private Selector selector;
  27.     private Worker worker;
  28.  
  29.     public Server(Worker worker) {
  30.         this.clients = new ArrayList<ClientHandle>();
  31.         this.worker = new Worker();
  32.         try {
  33.             this.start();
  34.         } catch (IOException e) {
  35.             Log.e("An error occurred when trying to start the server.", e,
  36.                     this.getClass());
  37.         }
  38.     }
  39.  
  40.     public void start() throws IOException {
  41.         Log.i("Server starting...", this.getClass());
  42.  
  43.         // Instantiate a new selector
  44.         selector = Selector.open();
  45.  
  46.         // Create server socket and channel
  47.         ServerSocketChannel ssc = ServerSocketChannel.open();
  48.         ssc.configureBlocking(false);
  49.         ServerSocket ss = ssc.socket();
  50.         InetSocketAddress address = new InetSocketAddress(InetAddress
  51.                 .getLocalHost().getHostAddress(), 1337);
  52.         ss.bind(address);
  53.         Log.i("Opening port " + address.getPort() + " on address "
  54.                 + address.getAddress().toString().split("/")[1],
  55.                 this.getClass());
  56.  
  57.         // Register channel with selector
  58.         ssc.register(selector, SelectionKey.OP_ACCEPT);
  59.         Log.i("Start sequence complete.", this.getClass());
  60.     }
  61.  
  62.     @Override
  63.     public void run() {
  64.         Log.i("Server running.", this.getClass());
  65.         while (true) {
  66.             // Wait for new things to happen
  67.             try {
  68.                 selector.select();
  69.             } catch (IOException e) {
  70.                 Log.e("Could not select in server thread.", e, this.getClass());
  71.             }
  72.  
  73.             Set<SelectionKey> selectedKeys = selector.selectedKeys();
  74.             Iterator<SelectionKey> it = selectedKeys.iterator();
  75.  
  76.             while (it.hasNext()) {
  77.                 SelectionKey key = it.next();
  78.  
  79.                 if (!key.isValid()) {
  80.                     Log.w("Invalid key selected: " + key.toString(), this.getClass());
  81.                     continue;
  82.                 }
  83.  
  84.                 if (key.isAcceptable()) {
  85.                     ClientHandle c = this.accept(key);
  86.                     it.remove();
  87.                     if (c != null) {
  88.                         clients.add(c);
  89.                         Log.i("Client connected: " + c.id(), this.getClass());
  90.                     }
  91.                 } else if (key.isReadable()) {
  92.                     this.read(key);
  93.                     it.remove();
  94.                 }
  95.             }
  96.         }
  97.     }
  98.  
  99.     protected ClientHandle accept(SelectionKey key) {
  100.         ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  101.         SocketChannel sc;
  102.         try {
  103.             sc = ssc.accept();
  104.             sc.configureBlocking(false);
  105.         } catch (IOException e) {
  106.             Log.e("Could not accept socket connection.", e, this.getClass());
  107.             return null;
  108.         }
  109.  
  110.         // Register the socket with the selector
  111.         SelectionKey k;
  112.         try {
  113.             k = sc.register(selector, SelectionKey.OP_READ);
  114.         } catch (ClosedChannelException e) {
  115.             Log.e(e, this.getClass());
  116.             return null;
  117.         }
  118.  
  119.         // Add the client to the list of connected clients
  120.         return new ClientHandle(sc, k);
  121.     }
  122.  
  123.     private void read(SelectionKey key) {
  124.         SocketChannel sc = (SocketChannel) key.channel();
  125.  
  126.         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  127.  
  128.         int r;
  129.         try {
  130.             r = sc.read(readBuffer);
  131.         } catch (IOException e) {
  132.             // Remote end closed connection forcibly
  133.             closeConnection(key);
  134.             return;
  135.         }
  136.         if (r == -1) {
  137.             // Remote side closed connection cleanly
  138.             closeConnection(key);
  139.             return;
  140.         }
  141.         worker.processMessage(readBuffer.array(), r);
  142.     }
  143.  
  144.     private void closeConnection(SelectionKey key) {
  145.         SocketChannel sc = (SocketChannel) key.channel();
  146.         try {
  147.             sc.close();
  148.         } catch (IOException e) {
  149.             Log.e("Error occurred when closing socket channel.", e,
  150.                     this.getClass());
  151.         }
  152.         key.cancel();
  153.     }
  154.  
  155.     public void stop() {
  156.         Log.i("Server stopping...", this.getClass());
  157.  
  158.         Log.i("Server stopped.", this.getClass());
  159.     }
  160.  
  161.     public static String getLocalIpAddress() {
  162.         InetAddress localaddr = null;
  163.         try {
  164.             localaddr = InetAddress.getLocalHost();
  165.         } catch (UnknownHostException e) {
  166.             Log.e("Could not get local IP.", e, Server.class);
  167.         }
  168.         return localaddr.toString().split("/")[1];
  169.     }
  170.  
  171.     public static String getLocalExternalIpAddress() {
  172.         try {
  173.             for (Enumeration<NetworkInterface> en = NetworkInterface
  174.                     .getNetworkInterfaces(); en.hasMoreElements();) {
  175.                 NetworkInterface ni = en.nextElement();
  176.                 for (Enumeration<InetAddress> enumIpAddr = ni
  177.                         .getInetAddresses(); enumIpAddr.hasMoreElements();) {
  178.                     InetAddress inetAddress = enumIpAddr.nextElement();
  179.                     if (!inetAddress.isLoopbackAddress()) { // ignore 127.0.0.1
  180.                         return inetAddress.getHostAddress().toString();
  181.                     }
  182.                 }
  183.             }
  184.         } catch (SocketException ex) {
  185.         }
  186.         return null;
  187.     }
  188. }
  189.  
  190.  
  191. package server.network.nio;
  192.  
  193. import java.util.LinkedList;
  194. import java.util.Queue;
  195.  
  196. import server.utils.Log;
  197.  
  198. public class Worker implements Runnable {
  199.  
  200.     private Queue<String> queue = new LinkedList<String>();
  201.  
  202.     public void processMessage(byte[] data, int count) {
  203.         byte[] copiedData = new byte[count];
  204.         System.arraycopy(data, 0, copiedData, 0, count);
  205.         synchronized (queue) {
  206.             Log.d("Queue, in processMessage: " + System.identityHashCode(queue), this.getClass());
  207.             queue.add(new String(copiedData));
  208.             queue.notifyAll();
  209.         }
  210.     }
  211.  
  212.     @Override
  213.     public void run() {
  214.         Log.d("Worker running.", this.getClass());
  215.         while (true) {
  216.             Log.d("Worker starting new loop...", this.getClass());
  217.             String msg;
  218.  
  219.             // Wait for new data to become available
  220.             synchronized (queue) {
  221.                 Log.d("Queue, in run: " + System.identityHashCode(queue), this.getClass());
  222.                 while (queue.isEmpty()) {
  223.                     try {
  224.                         queue.wait();
  225.                     } catch (InterruptedException e) {
  226.                         // Swallow and wait some more...
  227.                     }
  228.                 }
  229.                 msg = queue.poll();
  230.             }
  231.  
  232.             Log.i("Processed message from client: " + msg, this.getClass());
  233.         }
  234.     }
  235. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement