Advertisement
Heruberuto

Untitled

May 23rd, 2017
81
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.63 KB | None | 0 0
  1. package ass.ullriher.hw8.server.impl;
  2.  
  3. import ass.sivakfil.hw8.server.*;
  4. import ass.ullriher.hw8.server.*;
  5. import io.reactivex.Observable;
  6. import io.reactivex.subjects.PublishSubject;
  7.  
  8. import java.io.IOException;
  9. import java.net.SocketAddress;
  10. import java.nio.ByteBuffer;
  11. import java.nio.channels.*;
  12. import java.util.Iterator;
  13.  
  14. public class Server implements IServer {
  15.  
  16.     /**
  17.      * Address that server will bind to
  18.      */
  19.     protected SocketAddress address;
  20.     protected String delimiter;
  21.     protected boolean exit = false;
  22.     protected Observable<IRequest> requests = PublishSubject.create();
  23.     protected ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
  24.     private StringBuilder stringBuilder = new StringBuilder();
  25.  
  26.     public Server(SocketAddress address) {
  27.         this.address = address;
  28.     }
  29.  
  30.     private void acceptChannel(ServerSocketChannel channel, Selector sel) throws IOException {
  31.         channel.accept();
  32.         channel.configureBlocking(false);
  33.         channel.register(sel, SelectionKey.OP_READ);
  34.     }
  35.  
  36.  
  37.     private void writeChannel(SocketChannel channel, Selector sel) throws IOException {
  38.         String data = ByteBufferUtil.readFromChannelBuffered(channel);
  39.         stringBuilder.append(data);
  40.         if(data.endsWith(delimiter)){
  41.  
  42.         }
  43.  
  44.         // 1) (SocketChannel)key.channel() on readable key
  45.         // 2) read from channel (you can use ByteBufferUtil)
  46.         // 3) append data to some kind of buffer (ByteBuffer, ByteArrayOutputStream, StringBuilder, ...)
  47.         //      this might have tremendous impact on performance, but beware of premature optimization
  48.         //      note that speed is not primary objective of this homework
  49.         // 4) construct request object when whole message is stored in buffer (message is terminated by terminator string)
  50.         // 5) emit request object into observable
  51.         //  (hint: using PublishSubject is convenient)
  52.         // TODO
  53.         channel.accept();
  54.         channel.configureBlocking(false);
  55.         channel.register(sel, SelectionKey.OP_READ);
  56.     }
  57.  
  58.     public void run(IRequestHandler handler, IResponseWriter writer, String delimiter) throws IOException {
  59.         this.delimiter = delimiter;
  60.  
  61.         ServerSocketChannel serverChannel = ServerSocketChannel.open();
  62.         serverChannel.bind(address);
  63.         serverChannel.configureBlocking(false);
  64.  
  65.         System.out.println("Server started on address:" + address);
  66.  
  67.         Selector sel = Selector.open();
  68.         serverChannel.register(sel, serverChannel.validOps());
  69.  
  70.         requests.compose(handler)
  71.                 .subscribe(writer);
  72.  
  73.         while (!exit) {
  74.             sel.select();
  75.             Iterator<SelectionKey> iterator = sel.selectedKeys().iterator();
  76.             Observable<SelectionKey> validKeys = Observable.<SelectionKey>create((emittor) -> {
  77.                 while (iterator.hasNext()) {
  78.                     SelectionKey next = iterator.next();
  79.                     emittor.onNext(next);
  80.                     iterator.remove();
  81.                 }
  82.                 emittor.onComplete();
  83.             })
  84.                     .filter(key -> key.isValid())
  85.                     .cache();
  86.  
  87.  
  88.             validKeys.filter(SelectionKey::isAcceptable)
  89.                     .map(SelectionKey::channel)
  90.                     .cast(ServerSocketChannel.class)
  91.                     .forEach(channel -> acceptChannel(channel,sel));
  92.  
  93.             validKeys.filter(SelectionKey::isReadable)
  94.                     .map(SelectionKey::channel)
  95.                     .cast(SocketChannel.class)
  96.                     .forEach(channel -> writeChannel(channel,sel));
  97.         }
  98.     }
  99.  
  100. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement