Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ass.ullriher.hw8.server.impl;
- import ass.sivakfil.hw8.server.*;
- import ass.ullriher.hw8.server.*;
- import io.reactivex.Observable;
- import io.reactivex.subjects.PublishSubject;
- import java.io.IOException;
- import java.net.SocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.*;
- import java.util.Iterator;
- public class Server implements IServer {
- /**
- * Address that server will bind to
- */
- protected SocketAddress address;
- protected String delimiter;
- protected boolean exit = false;
- protected Observable<IRequest> requests = PublishSubject.create();
- protected ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
- private StringBuilder stringBuilder = new StringBuilder();
- public Server(SocketAddress address) {
- this.address = address;
- }
- private void acceptChannel(ServerSocketChannel channel, Selector sel) throws IOException {
- channel.accept();
- channel.configureBlocking(false);
- channel.register(sel, SelectionKey.OP_READ);
- }
- private void writeChannel(SocketChannel channel, Selector sel) throws IOException {
- String data = ByteBufferUtil.readFromChannelBuffered(channel);
- stringBuilder.append(data);
- if(data.endsWith(delimiter)){
- }
- // 1) (SocketChannel)key.channel() on readable key
- // 2) read from channel (you can use ByteBufferUtil)
- // 3) append data to some kind of buffer (ByteBuffer, ByteArrayOutputStream, StringBuilder, ...)
- // this might have tremendous impact on performance, but beware of premature optimization
- // note that speed is not primary objective of this homework
- // 4) construct request object when whole message is stored in buffer (message is terminated by terminator string)
- // 5) emit request object into observable
- // (hint: using PublishSubject is convenient)
- // TODO
- channel.accept();
- channel.configureBlocking(false);
- channel.register(sel, SelectionKey.OP_READ);
- }
- public void run(IRequestHandler handler, IResponseWriter writer, String delimiter) throws IOException {
- this.delimiter = delimiter;
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- serverChannel.bind(address);
- serverChannel.configureBlocking(false);
- System.out.println("Server started on address:" + address);
- Selector sel = Selector.open();
- serverChannel.register(sel, serverChannel.validOps());
- requests.compose(handler)
- .subscribe(writer);
- while (!exit) {
- sel.select();
- Iterator<SelectionKey> iterator = sel.selectedKeys().iterator();
- Observable<SelectionKey> validKeys = Observable.<SelectionKey>create((emittor) -> {
- while (iterator.hasNext()) {
- SelectionKey next = iterator.next();
- emittor.onNext(next);
- iterator.remove();
- }
- emittor.onComplete();
- })
- .filter(key -> key.isValid())
- .cache();
- validKeys.filter(SelectionKey::isAcceptable)
- .map(SelectionKey::channel)
- .cast(ServerSocketChannel.class)
- .forEach(channel -> acceptChannel(channel,sel));
- validKeys.filter(SelectionKey::isReadable)
- .map(SelectionKey::channel)
- .cast(SocketChannel.class)
- .forEach(channel -> writeChannel(channel,sel));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement