Advertisement
Heruberuto

Untitled

May 23rd, 2017
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.20 KB | None | 0 0
  1. package ass.ullriher.hw8.server.impl;
  2.  
  3. import ass.ullriher.hw8.server.*;
  4. import io.reactivex.Observable;
  5. import io.reactivex.subjects.PublishSubject;
  6.  
  7. import java.io.IOException;
  8. import java.net.SocketAddress;
  9. import java.nio.ByteBuffer;
  10. import java.nio.channels.SelectionKey;
  11. import java.nio.channels.Selector;
  12. import java.nio.channels.ServerSocketChannel;
  13. import java.nio.channels.SocketChannel;
  14. import java.util.Iterator;
  15.  
  16. public class Server implements IServer {
  17.  
  18.     /**
  19.      * Address that server will bind to
  20.      */
  21.     protected SocketAddress address;
  22.     protected boolean exit = false;
  23.     protected PublishSubject<IRequest> requests = PublishSubject.create();
  24.     protected ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
  25.     protected String delimiter;
  26.  
  27.     public Server(SocketAddress address) {
  28.         this.address = address;
  29.     }
  30.  
  31.     private void acceptChannel(ServerSocketChannel channel, Selector sel) throws IOException {
  32.         channel.accept();
  33.         channel.configureBlocking(false);
  34.         channel.register(sel, SelectionKey.OP_READ);
  35.     }
  36.  
  37.     private void writeAssignedChannel(SelectionKey key) throws IOException {
  38.         if (key.attachment() == null) {
  39.             key.attach(new StringBuilder());
  40.         }
  41.  
  42.         SocketChannel channel = (SocketChannel) key.channel();
  43.         StringBuilder builder = (StringBuilder) key.attachment();
  44.         String input = ByteBufferUtil.readFromChannel(inputBuffer, channel);
  45.         builder.append(input);
  46.  
  47.         if (input.contains(delimiter)) {
  48.             String data = builder.toString();
  49.             data = data.substring(0, data.indexOf(delimiter));
  50.             requests.onNext(new ClientRequest(channel, data));
  51.         }
  52.     }
  53.  
  54.     public void run(IRequestHandler handler, IResponseWriter writer, String delimiter) throws IOException {
  55.         this.delimiter = delimiter;
  56.  
  57.         ServerSocketChannel serverChannel = ServerSocketChannel.open();
  58.         serverChannel.bind(address);
  59.         serverChannel.configureBlocking(false);
  60.  
  61.         System.out.println("Server started on address:" + address);
  62.  
  63.         Selector sel = Selector.open();
  64.         serverChannel.register(sel, serverChannel.validOps());
  65.  
  66.         requests.compose(handler)
  67.                 .subscribe(writer);
  68.  
  69.         while (!exit) {
  70.             sel.select();
  71.             Iterator<SelectionKey> iterator = sel.selectedKeys().iterator();
  72.             Observable<SelectionKey> validKeys = Observable.<SelectionKey>create((emittor) -> {
  73.                 while (iterator.hasNext()) {
  74.                     SelectionKey next = iterator.next();
  75.                     emittor.onNext(next);
  76.                     iterator.remove();
  77.                 }
  78.                 emittor.onComplete();
  79.             })
  80.                     .filter(SelectionKey::isValid)
  81.                     .cache();
  82.  
  83.  
  84.             validKeys.filter(SelectionKey::isAcceptable)
  85.                     .map(SelectionKey::channel)
  86.                     .cast(ServerSocketChannel.class)
  87.                     .forEach(channel -> acceptChannel(channel, sel));
  88.  
  89.             validKeys.filter(SelectionKey::isReadable)
  90.                     .forEach(this::writeAssignedChannel);
  91.         }
  92.     }
  93.  
  94. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement