Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ass.ullriher.hw8.server.impl;
- import ass.ullriher.hw8.server.*;
- import io.reactivex.Observable;
- import io.reactivex.subjects.PublishSubject;
- import java.io.IOException;
- import java.net.SocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- public class Server implements IServer {
- /**
- * Address that server will bind to
- */
- protected SocketAddress address;
- protected boolean exit = false;
- protected PublishSubject<IRequest> requests = PublishSubject.create();
- protected ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
- protected String delimiter;
- public Server(SocketAddress address) {
- this.address = address;
- }
- private void acceptChannel(ServerSocketChannel channel, Selector sel) throws IOException {
- channel.accept();
- channel.configureBlocking(false);
- channel.register(sel, SelectionKey.OP_READ);
- }
- private void writeAssignedChannel(SelectionKey key, Selector sel) throws IOException {
- if (key.attachment() == null) {
- key.attach(new StringBuilder());
- }
- SocketChannel channel = (SocketChannel) key.channel();
- StringBuilder builder = (StringBuilder) key.attachment();
- String input = ByteBufferUtil.readFromChannel(inputBuffer, channel);
- builder.append(input);
- if (input.contains(delimiter)) {
- String data = builder.toString();
- data = data.split("\n")[0];
- requests.onNext(new ClientRequest(channel, data));
- }
- }
- public void run(IRequestHandler handler, IResponseWriter writer, String delimiter) throws IOException {
- this.delimiter = delimiter;
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
- serverChannel.bind(address);
- serverChannel.configureBlocking(false);
- System.out.println("Server started on address:" + address);
- Selector sel = Selector.open();
- serverChannel.register(sel, serverChannel.validOps());
- requests.compose(handler)
- .subscribe(writer);
- while (!exit) {
- sel.select();
- Iterator<SelectionKey> iterator = sel.selectedKeys().iterator();
- Observable<SelectionKey> validKeys = Observable.<SelectionKey>create((emittor) -> {
- while (iterator.hasNext()) {
- SelectionKey next = iterator.next();
- emittor.onNext(next);
- iterator.remove();
- }
- emittor.onComplete();
- })
- .filter(key -> key.isValid())
- .cache();
- validKeys.filter(SelectionKey::isAcceptable)
- .map(SelectionKey::channel)
- .cast(ServerSocketChannel.class)
- .forEach(channel -> acceptChannel(channel, sel));
- validKeys.filter(SelectionKey::isReadable)
- .map(SelectionKey::channel)
- .forEach(key -> writeAssignedChannel(key, sel));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement