Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
package ass.sivakfil.hw8.server.impl;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

import ass.sivakfil.hw8.server.IResponse;
import ass.sivakfil.hw8.server.IResponseWriter;
- public class ResponseWriterTemplate implements IResponseWriter, Runnable {
- // elements are inserted from other thread
- protected ConcurrentLinkedQueue<SocketChannel> queue = new ConcurrentLinkedQueue<>();
- protected Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();
- protected final Selector selector;
- public ResponseWriterTemplate() {
- try {
- // prepare selector
- selector = Selector.open();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- // Start single thread that writes responses in non-blocking way using selector
- // TODO
- }
- @Override
- public void run() {
- // there could be better stopping condition
- // e.g.: IResponseWriter could inherit from subscriber, than this thread could be stopped on onComplete message
- System.out.println("Started writing thread .. ");
- while(true) {
- // register channels that were added to queue
- // TODO
- // call blocking selector.select()
- Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
- while(iterator.hasNext()) {
- SelectionKey selKey = iterator.next();
- SocketChannel client = (SocketChannel)selKey.channel();
- // TODO check that key is readable and valid and stuff
- // get buffer from client
- ByteBuffer bufferToWrite = bufferMap.get(client);
- // write what you can; bufferToWrite remembers position, no need to remember what was written
- // TODO
- // use bufferToWrite.hasRemaining to check, whether whole buffer was written
- // TODO
- // cleanup bufferMap of channels that were cancelled or closed by client
- // TODO
- iterator.remove();
- }
- }
- }
- public void registerChannels() throws IOException {
- while(!queue.isEmpty()) {
- SocketChannel toBeRegistered = queue.poll();
- // register channel for write operation
- // TODO
- }
- }
- @Override
- public void accept(IResponse t) throws Exception {
- System.out.println("Registering response for writing ..");
- ByteBuffer dataToWrite = ByteBuffer.wrap(t.getResponseData().getBytes());
- SocketChannel socketToWriteTo = t.getClient();
- bufferMap.put(socketToWriteTo, dataToWrite);
- queue.add(socketToWriteTo);
- // this wakes up blocking selector.select() so that loop can register sockets added to queue
- selector.wakeup();
- }
- }
Add Comment
Please, Sign In to add comment