Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package ru.geekbrains.netty.selector03.server.entities.jobpool;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.locks.ReentrantLock;
- public class BaseJobPool {
- private ReentrantLock watchdogLock = new ReentrantLock();
- private ThreadPoolExecutor threadPool;
- /**
- * Shutdown pool
- * <br>
- * Will wait all remaining threads to finish
- */
- public void close() {
- // waiting all pool threads to complete for 30 sec then terminate pool
- watchdogLock.lock();
- //watchdog
- int waitCount = 0;
- while (threadPool.getActiveCount() > 0) {
- try {
- waitCount++;
- watchdogLock.wait(1000);
- }
- catch (InterruptedException ignore) {}
- if (waitCount > 30)
- break;
- }
- watchdogLock.unlock();
- // terminate thread pool
- if (threadPool != null) {
- threadPool.shutdownNow();
- }
- }
- public void stop() {
- threadPool.shutdownNow();
- }
- }
- package ru.geekbrains.netty.selector03.server.entities.jobpool;
- import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.function.Consumer;
- import java.util.function.Supplier;
- // =========================================================================================
- /**
- * Async job pool dynamic size.
- * <br>
- * @param <T>
- */
- public class AsyncJobPool<T> extends BaseJobPool {
- private ThreadPoolExecutor threadPool;
- // On job done handler
- private Consumer<T> callback;
- /**
- * Pool of worker threads
- * @param callback handler onComplete event
- */
- public AsyncJobPool(Consumer<T> callback) {
- final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
- threadFactory.setDaemon(true);
- threadFactory.setThreadNamePrefix("AsyncPool-");
- threadPool = (ThreadPoolExecutor)Executors.newCachedThreadPool(threadFactory);
- this.callback = callback;
- }
- /**
- * Add job to execute
- * <br>
- * @param job Supplier
- */
- public void add(Supplier<T> job) {
- CompletableFuture.supplyAsync(job, threadPool)
- .handle(this::handle)
- .thenAccept(this::callback);
- }
- // ----------------------------------------------------------------------------------------------------
- /**
- * WorkProcessable.work error handler
- */
- // Never should be called
- private T handle(T result, Throwable e) {
- //System.out.println(e.getMessage());
- return result;
- }
- /**
- * On job done
- * @param msg callback message
- */
- private void callback(T msg) {
- //System.out.println("callback");
- callback.accept(msg);
- }
- // ----------------------------------------------------------------------------------------------------
- }
- package ru.geekbrains.netty.selector03.server.entities.jobpool;
- import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Consumer;
- import java.util.function.Supplier;
- // ----------------------------------------------------------------------
- // =========================================================================================
- /**
- * Job pool fixed size.
- * <br>
- * When busy threads exceeds pool size
- * Then thread that calls Blocking JobPool.ads(...) would be blocked until some jobs have been finished
- * @param <T>
- */
- public class BlockingJobPool<T> extends BaseJobPool {
- private Semaphore semaphore;
- private ThreadPoolExecutor threadPool;
- // ThreadPoolExecutor.getActiveCount() precious replacement
- private final AtomicInteger workingThreadCnt = new AtomicInteger(0);
- // On job done handler
- private Consumer<T> callback;
- private final AtomicInteger threadCount = new AtomicInteger(0);
- /**
- * Pool of worker threads
- * @param poolSize pool size (count of threads)
- * @param callback handler onComplete event
- */
- public BlockingJobPool(int poolSize,
- Consumer<T> callback) {
- final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
- threadFactory.setDaemon(true);
- threadFactory.setThreadNamePrefix("BlockingPool-");
- threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(poolSize, threadFactory);
- this.callback = callback;
- semaphore = new Semaphore(poolSize, true);
- }
- /**
- * Add job to execute
- * <br>
- * If pool have all it's threads busy then thread that called BlockingJobPool.add(...)
- * will wait until some thread in pool have finished it's job
- * @param job Supplier
- */
- public void add(Supplier<T> job) {
- try {
- semaphore.acquire();
- threadCount.getAndIncrement();
- CompletableFuture.supplyAsync(job, threadPool)
- .handle(this::handle)
- .thenAccept(this::callback);
- }
- // stop job evaluation if interrupted
- catch (InterruptedException e) {
- // if thread was interrupted inside job
- semaphore.release();
- }
- }
- // ----------------------------------------------------------------------------------------------------
- /**
- * WorkProcessable.work error handler
- * please handle you exeptions directly in job (lambda) - this.add(Supplier T job)
- */
- private T handle(T result, Throwable e) {
- if (e != null) {
- e.printStackTrace();
- }
- return result;
- }
- /**
- * On job done
- * @param msg T
- */
- private void callback(T msg) {
- // notify caller about job done
- callback.accept(msg);
- threadCount.getAndDecrement();
- semaphore.release();
- }
- public boolean isFull() {
- return threadCount.get() == threadPool.getMaximumPoolSize();
- }
- // ----------------------------------------------------------------------------------------------------
- }
- package ru.geekbrains.netty.selector03.server.entities;
- import java.nio.channels.SelectionKey;
- import java.time.Instant;
- import java.util.Iterator;
- import java.util.Map;
- import java.util.NavigableMap;
- import java.util.concurrent.ConcurrentSkipListMap;
- import ru.geekbrains.netty.selector03.common.entities.Connection;
- // ByteBuffer cache for clients
- public class ConnectionList implements Iterable<Map.Entry<Integer, Connection>>{
- private static final int ROTTEN_INTERVAL = 100000000; // sec
- private NavigableMap<Integer, Connection> connList = new ConcurrentSkipListMap<>();
- private NavigableMap<Instant, Integer> connTimeList = new ConcurrentSkipListMap<>();
- public void add(SelectionKey key) {
- Instant now = Instant.now();
- Connection connection = new Connection(key, now);
- int id = (int)key.attachment();
- connList.put(id, connection);
- connTimeList.put(now, id);
- }
- public Connection get(int id) {
- return connList.get(id);
- }
- public void update(int id) {
- Connection connection = connList.get(id);
- if (connection != null) {
- connTimeList.remove(connection.getTime());
- Instant now = Instant.now();
- connection.setTime(now);
- connTimeList.put(now, id);
- }
- }
- public void remove(int id) {
- Connection connection = connList.get(id);
- if (connection != null) {
- System.out.println("Removing connection #" + id);
- connTimeList.remove(connection.getTime());
- connection.close();
- }
- connList.remove(id);
- }
- /**
- * Удаляет протухшие ключи
- */
- public void removeRotten() {
- //System.out.println("Removing rotten comnnections ...");
- Instant label = Instant.now().minusSeconds(ROTTEN_INTERVAL);
- NavigableMap<Instant, Integer> rotten = connTimeList.headMap(label , true);
- Iterator<Map.Entry<Instant, Integer>> it = rotten.entrySet().iterator();
- while(it.hasNext()) {
- Map.Entry<Instant, Integer> entry = it.next();
- int id = entry.getValue();
- // remove from connList
- remove(id);
- // remove from connTimeList
- it.remove();
- }
- }
- @Override
- public Iterator<Map.Entry<Integer,Connection>> iterator() {
- return connList.entrySet().iterator();
- }
- }
- package ru.geekbrains.netty.selector03.server;
- import ru.geekbrains.netty.selector03.common.entities.Connection;
- import ru.geekbrains.netty.selector03.common.entities.MessageType;
- import ru.geekbrains.netty.selector03.server.entities.ConnectionList;
- import ru.geekbrains.netty.selector03.server.entities.jobpool.BlockingJobPool;
- import ru.geekbrains.netty.selector03.server.serverActions.DirectoryReader;
- import static ru.geekbrains.netty.selector03.common.entities.Utils.StringTochannel;
- import static ru.geekbrains.netty.selector03.common.entities.Utils.channelToString;
- import static ru.geekbrains.netty.selector03.common.entities.Utils.isNullOrEmpty;
- import java.io.*;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.*;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.util.Iterator;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.function.Function;
- import java.util.function.IntUnaryOperator;
- // https://www.programering.com/a/MTN1MDMwATk.html
- // https://www.ibm.com/developerworks/cn/java/l-niosvr/ => google-translate from china
- // SelectionKey.isWritable() - protect socket from flooding
- // https://stackoverflow.com/questions/11360374/when-a-selectionkey-turns-writable-in-java-nio
- public class FubarServer implements Runnable {
- private static final int ROTTEN_LATENCY = 1; //sec
- private ServerSocketChannel serverSocketChannel;
- private Selector selector;
- private final String welcomeString = "Fubar Transfer Protocol server приветствует вас.";
- // Non-negative AtomicInteger incrementator
- private static IntUnaryOperator AtomicNonNegativeIntIncrementator = (i) -> i == Integer.MAX_VALUE ? 0 : i + 1;
- // connection id generator
- private static final AtomicInteger connectionIdGen = new AtomicInteger();
- private ConnectionList connectionList = new ConnectionList();
- private BlockingJobPool<Void> jobPool = new BlockingJobPool<>(4, this::onDone);
- private static final int PORT_NUMBER = 8000;
- private static final String IP = "0.0.0.0";
- private String dataRoot;
- FubarServer() throws IOException {
- // Будут проблемы с путями
- dataRoot = System.getProperty("user.dir") + "/data/"; //(? File.separator)
- Files.createDirectories(Paths.get(dataRoot));
- serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.socket().bind(new InetSocketAddress(IP, PORT_NUMBER));
- serverSocketChannel.configureBlocking(false);
- selector = Selector.open();
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- // start rottening old connections
- scheduleDeleteRottenConnections();
- }
- @Override
- public void run() {
- try {
- System.out.println("Серверо запущено " + IP + ":" + PORT_NUMBER);
- Iterator<SelectionKey> it;
- SelectionKey key;
- // while true
- while (serverSocketChannel.isOpen()) {
- selector.select();
- it = selector.selectedKeys().iterator();
- // ...
- if (selector.selectedKeys().size() == 0) {
- continue;
- }
- //System.out.println("SELECTING: " + selector.selectedKeys().size());
- while (it.hasNext()) {
- key = it.next();
- //System.out.println("KEY INTERESTS: " + key.interestOps());
- //System.out.println("KEY READY : " + key.readyOps());
- it.remove();
- // skip invalid keys (disconnected channels)
- if (!key.isValid())
- continue;
- if (key.isAcceptable()) {
- acceptSocket(key);
- }
- if (key.isReadable()) {
- // Снипмаем подписку о получении новых данных в сокете
- // пока поток из пула читает данные из сокета
- // Чтобы не бегать бесконечно в цикле select
- removeInterest(key, SelectionKey.OP_READ);
- // Читаем данные в отдельном потоке
- // Если прочиталось все сообщение, то назначаем его на выполнение
- // (выполняться будет потом в другом потоке)
- // Если целиком не прочиталось - будет дочитываться в других циклах select
- SelectionKey finalKey = key;
- jobPool.add(() -> {
- try {
- //System.out.println("OP_READ job start");
- // Если сообщение принято целиком,
- // то обработать его
- if (readSocket(finalKey)) {
- processInput(finalKey);
- }
- // Возвращаем подписку на флаг чтения новых данных из сокета
- setInterest(finalKey, SelectionKey.OP_READ);
- // Будим селектор (могли подойти новые данные)
- selector.wakeup();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- });
- }
- // Интерес на запись выставляется отдельно
- // вручную при желании что-либо передать
- // либо внутри writeSocket(...) если затопился сокет и отправка не удалась
- if (key.isWritable()) {
- // Чтобы не бегать бесконечно в цикле select
- // Пока потоки из пула пишут в сокеты
- removeInterest(key, SelectionKey.OP_WRITE);
- // Пишем в отдельном потоке
- SelectionKey finalKey = key;
- jobPool.add(() -> {
- try {
- //System.out.println("OP_WRITE job start");
- writeSocket(finalKey);
- // Возвращаем подписку на флаг чтения новых данных из сокета
- //setInterest(finalKey, SelectionKey.OP_WRITE);
- // Будим селектор (могли подойти новые данные)
- //selector.wakeup();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- });
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- private void acceptSocket(SelectionKey key) {
- try {
- System.out.println("acceptSocket");
- //System.out.println(Thread.currentThread().toString());
- ServerSocketChannel serverSocket = (ServerSocketChannel)key.channel();
- //System.out.println("LOCAL: " + serverSocket.getLocalAddress());
- SocketChannel client = serverSocket.accept();
- System.out.println("REMOTE ENDPOINT: " + client.getRemoteAddress());
- // Нет свободных потоков - нечем обрабатывать клиента
- if (jobPool.isFull()) {
- System.out.println("No workers - disconnecting");
- client.close();
- return;
- }
- int id = connectionIdGen.getAndUpdate(AtomicNonNegativeIntIncrementator);
- client.configureBlocking(false);
- // регистрируемся на OP_READ на сокете клиента
- SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ, id);
- setInterest(clientKey, SelectionKey.OP_WRITE);
- connectionList.add(clientKey);
- //SeekableByteChannel data = new SeekableInMemoryByteChannel(welcomeString.getBytes());
- Connection connection = connectionList.get(id);
- SeekableByteChannel data = connection.getBufferedTransmitChannel();
- // will write greeting to data
- StringTochannel(welcomeString, data);
- // schedule sending greeting
- scheduleWrite(clientKey, data);
- System.out.println("Подключился новый клиент #" + id);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * Читаем из сокета данные, сколько их там накопилось
- * Учитывая длину сообщения из заголовка
- *
- * @return boolean сообщение принято целиком
- */
- private boolean readSocket(SelectionKey key) {
- boolean result = false;
- int id = -1;
- try {
- //System.out.println("readSocket");
- SocketChannel client = (SocketChannel)key.channel();
- id = (int)key.attachment();
- Connection connection = connectionList.get(id);
- // Изначально не знаем что приедет - текст или файл
- SeekableByteChannel data = connection.getReceiveChannel();
- ByteBuffer buffer = connection.getReadBuffer();
- boolean someDataHasReadied = false;
- int read; // сколько байт прочли из сокета
- // read > 0 - readied some data
- // read = 0 - no data available (end of stream)
- // read = -1 - closed connection
- // подготавливаем буфер для чтения
- buffer.clear();
- // Еще не читали заголовок сообщения из сокета
- // Устанавливаем limit буффера в размер заголовка
- // и будем читать только заголовок
- if(!connection.isReceiveHeaderPresent()) {
- buffer.limit(8 + 1); // length + type
- }
- else {
- // => проблема с ненужным вычитыванием начальных байтов следущего сообщения
- // если сообщения идут подряд -
- // то надо в начале вычитать только хедер, узнать сколько байт надо принять
- // а потом выставить limit буфера в
- // min(buffer.capacity, bytesRemaining)
- // Чтобы буфер не прочел данные за концом текущего сообщения
- // это будет начальные байты(заголовк) следущего сообщения
- buffer.limit((int)Math.min(
- (long)buffer.capacity(),
- connection.remainingBytesToRead()));
- }
- while ((read = client.read(buffer)) > 0) { // ----------------------------------------------
- buffer.flip();
- // устанавливаем флаг что что-то смогли прочитать из сокета
- someDataHasReadied = true;
- // Parse header if didn't do it before ---------------------------------------
- // Узнаем тип сообщения и его размер
- if (!connection.isReceiveHeaderPresent()) {
- MessageType messageType = connection.parseHeader();
- // Определяемся, куда сохранять данные
- if(messageType == MessageType.TEXT) {
- // берем из буферный канал для текста
- data = connection.getBufferedReceiveChannel();
- }
- else {
- // пишем в файл
- data = connection.createFileChannel(connection.getReceiveFilePath(), "rw");
- }
- // устанавливаем выбранный канал для connection в качестве канала-приемника
- connection.setReceiveChannel(data);
- } // -------------------------------------------------------------------------
- // уменьшаем количество оставшихся байт сообщения для чтения
- connection.decreaseRemainingBytesToRead(read);
- assert data != null;
- // пишем из буфера в канал
- data.write(buffer);
- // опять настраиваем буфер, чтоб жизнь медом не казалась
- buffer.rewind();
- buffer.limit((int)Math.min(
- (long)buffer.capacity(),
- connection.remainingBytesToRead()));
- } // ------------------------------------------------------------------------------------
- // -------------------------------------------------
- // Если хоть что-то передалось
- if (someDataHasReadied) {
- // refresh client TTL
- connectionList.update(id);
- }
- // -------------------------------------------------
- // Remote endpoint close connection
- if (read < 0) {
- System.out.println(key.attachment() + " отключился");
- connectionList.remove(id); // will close socket channel
- return false;
- }
- // Тут еще возможен вариант:
- // прочли не все, возможно оставшиеся байты сообщения
- // прижут позднее
- // Приняли все байты сообщения
- if(connection.remainingBytesToRead() == 0) {
- // Сообщение прочиталось целиком
- result = true;
- // возвращаем обратно возможность разбирать заголовок нового сообщения
- connection.setReceiveHeaderPresent(false);
- // очищаем буффер для следущего приема (не обязательно)
- buffer.clear();
- }
- // assert data != null;
- // long pos = data.position();
- // System.out.println("R: " + channelToString(data));
- // data.position(pos);
- }
- catch (Exception e) {
- if (e instanceof ClosedChannelException) {
- System.out.println(key.attachment() + " отключился");
- // Remote endpoint close connection
- // (maybe not handled in "if (read < 0)")
- connectionList.remove(id); // will close socket channel
- }
- e.printStackTrace();
- }
- //System.out.println("readSocket END");
- return result;
- }
- private void writeSocket(SelectionKey key) {
- int id = -1;
- try {
- //System.out.println("writeSocket");
- SocketChannel client = (SocketChannel)key.channel();
- id = (int)key.attachment();
- Connection connection = connectionList.get(id);
- ByteBuffer buffer = connection.getWriteBuffer();
- SeekableByteChannel data = connection.getTransmitChannel();
- boolean someDataHasSend = false;
- int wrote; // сколько байт записали в сокет
- int dataRead; // сколько байт прочли из канала
- // wrote > 0 - wrote some data
- // wrote = 0 - no data written // need register(selector, SelectionKey.OP_WRITE, id);
- int bufferRemaining = -1;
- // пишем в сокет, пока есть что передавать
- // и сокет принимает данные (не затопился)
- while (data.position() < data.size()) {
- // Add header if absent
- if (!connection.isTransmitHeaderPresent()) {
- connection.writeHeader();
- }
- // читаем из канала в буффер
- dataRead = data.read(buffer);
- buffer.flip();
- // пишем в сокет
- wrote = client.write(buffer);
- bufferRemaining = buffer.remaining();
- // if (buffer.remaining() > 0) {
- // System.out.println(buffer.remaining() );
- // }
- if (!someDataHasSend) {
- someDataHasSend = wrote > 0;
- }
- // что не залезло в сокет помещаем в начало буфера
- buffer.compact();
- // socket stall
- // оставляем сокет в покое
- if (bufferRemaining > 0) {
- //System.out.println("WR: " + wrote);
- break;
- }
- }
- // -------------------------------------------------
- // Если хоть что-то передалось - refresh client TTL
- if (someDataHasSend) {
- connectionList.update(id);
- }
- // -------------------------------------------------
- // причем, если не отправилось по сети, то в buffer будет лежать кусок
- // (скопированный из data), который так и не отправился
- // буффер нужно очистить, а transmitChannel отмотать назад на размер прочтенных байт из data
- //
- // Это все к тому, то нельзя начинать передавать новые данные, пока по сети не передалось
- // текущее сообщение
- // Не смогли передать все данные
- // Не прочитано целиком все из канала / остался последний неотправленный кусок в buffer
- // Флудим сокет данными
- // он не успевает передавать тут / принимать на удаленном конце
- // регистрируемся на флаг что удаленный сокет может принимать сообщения
- // чтобы возобновить передачу как сокет будет готов передавать
- if (data.position() < data.size() ||
- bufferRemaining > 0) {
- // Сохранить непереданный кусок данных для следущего цикла передачи
- // отмотаем transmitChannel назад на размер данных в буффере (которые не передались)
- //System.out.println(data.position() + " / " + data.size());
- // Выставляем бит OP_WRITE в 1 (подписываемся на флаг готовности сокета отправлять данные)
- setInterest(key, SelectionKey.OP_WRITE);
- // будим селектор (будем отправлять данные в следущем цикле)
- selector.wakeup();
- }
- // -------------------------------------------------------------------------
- // Все успешно записалось, data.position == data.size
- else {
- // возвращаем обратно возможность писать заголовок для нового сообщения
- // восстанавливаем новый цикл записи сообщений
- connection.setTransmitHeaderPresent(false);
- //System.out.println(data.position() + " / " + data.size());
- // Закрываем файловый канал (откуда писали в сокет)
- if (connection.getChannelType(data) == MessageType.BINARY) {
- data.close();
- }
- // Если это был текстовый канал, то ничего не делаем,
- // он там переиспользуется
- // обнуляем ссылку на transmitChannel
- // (Защита от записи в сокет нового сообщения,
- // если он еще не закончил передачу текущих данных)
- connection.setTransmitChannel(null);
- // очищаем буффер для следущей передачи
- buffer.clear();
- // отписываемся от оповещения что сокет готов передавать данные
- // Выставляем в ключе бит OP_WRITE в 0 (отписываемся)
- removeInterest(key, SelectionKey.OP_WRITE);
- }
- // -------------------------------------------------------------------------
- // long pos = data.position();
- // System.out.println("T: " + channelToString(data));
- // data.position(pos);
- }
- catch (Exception e) {
- if (e instanceof ClosedChannelException) {
- System.out.println(key.attachment() + " отключился");
- // Remote endpoint close connection
- connectionList.remove(id); // will close socket channel
- }
- e.printStackTrace();
- }
- //System.out.println("writeSocket END");
- }
- // -------------------------------------------------------------------------------
- private void setInterest(SelectionKey key, int interest) {
- //System.out.println("setInterest ON: " + interest);
- if (key.isValid() &&
- (key.interestOps() & interest) == 0) {
- int current = key.interestOps();
- key.interestOps(current | interest);
- }
- }
- private void removeInterest(SelectionKey key, int interest) {
- //System.out.println("setInterest OFF: " + interest);
- if (key.isValid() &&
- (key.interestOps() & interest) != 0) {
- int current = key.interestOps();
- key.interestOps(current & ~interest);
- }
- }
- // =============================================================================
- /**
- * Планрует запись в сокет
- * <br>
- * Подготавливает данные для записи в сокет,
- * подписывается(устанавливает) на флаг возможности записи в сокет
- */
- private void scheduleWrite(SelectionKey key, SeekableByteChannel data) {
- try {
- // Т.к. все асинхронное (несколько потоков)
- // То одному и тому же клиенту могут начать отправлять одновременно несколько сообщений -
- // Надо делать очередь сообщений (на отправку) для клиента.
- // (Если не охота потом принимать байты(куски байт) сообщений в перемешанном порядке)
- // (Люди говорят для TCP такое можно устроить, для UDP - нет)
- // (Потом, например, удалять только те сообщения, котороые удалось доставить, и т.д.)
- int id = (int)key.attachment();
- Connection connection = connectionList.get(id);
- // Проверить нет ли текущих данных на отправку (в connectionList)
- // и если есть, то не отправлять - просто потерять это данные (ибо нефиг)
- // Можно, конечно валить все в сокет, (и больше ~3 метров в неблокирующем режиме не залезет)
- // дальше данные начнут теряться уже в сетевой подсистеме ядра при переполнении буффера сокета
- if (connection.getTransmitChannel() != null) {
- System.out.println("Внимание - обнаружена попытка одновременной передачи, данные НЕ отправлены");
- return;
- }
- // Задаем сокету данные на передачу
- data.position(0);
- connection.setTransmitChannel(data);
- setInterest(key, SelectionKey.OP_WRITE);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * Process received channel (containing text command or file)
- */
- private void processInput(SelectionKey key) {
- try {
- int id = (int)key.attachment();
- Connection connection = connectionList.get(id);
- SeekableByteChannel receiveChannel = connection.getReceiveChannel();
- // Text message
- if (connection.getChannelType(connection.getReceiveChannel()) == MessageType.TEXT) {
- String command = channelToString(receiveChannel);
- processCommand(key, command);
- // будем использовать повторно, без создания нового channel
- // receiveChannel backed by bufferedReceiveChannel
- // поэтому не закрываем, а truncate до 0
- receiveChannel.position(0);
- receiveChannel.truncate(0);
- }
- // File message
- else {
- // там в прошлом через команду уже был настроен файл для приема
- // И в readSocket() файл уже записался.
- // Поэтому просто закрываем
- receiveChannel.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- /**
- * Parse and process user commands
- * Then reply to user in TEXT MODE
- * (hope text reply will not concat with subsequent binary stream)
- */
- private void processCommand(SelectionKey key, String command) {
- SeekableByteChannel data = null;
- String textResponse = null;
- int id = (int)key.attachment();
- Connection connection = connectionList.get(id);
- // // No input
- // if (isNullOrEmpty(command)) {
- // return;
- // }
- String[] parts = command.split(" ");
- switch (parts[0]) {
- case "ls":
- Function<String,String> dirNfo = new DirectoryReader();
- textResponse = dirNfo.apply(dataRoot);
- if (textResponse.equals("")) {
- textResponse = ".";
- }
- else {
- textResponse = ".\n" + textResponse;
- }
- break;// ---------------------------------------------------------
- case "get":
- // file name not specified
- if (parts.length < 2 ||
- isNullOrEmpty(parts[1])) {
- textResponse = "invalid command args";
- break;
- }
- Path filePath = Paths.get(dataRoot + parts[1]);
- // file not exists
- if (!Files.exists(filePath)) {
- textResponse = "file not exists";
- break;
- }
- // get file
- try {
- // будем отвечать клиенту файловым каналом
- data = connection.createFileChannel(filePath, "r");
- }
- catch (Exception e) {
- textResponse = "I/O error";
- e.printStackTrace();
- }
- break;// ---------------------------------------------------------
- case "put":
- // file name not specified
- if (parts.length < 2 ||
- isNullOrEmpty(parts[1])) {
- textResponse = "invalid command args";
- break;
- }
- // set file
- try {
- filePath = Paths.get(dataRoot + parts[1]);
- connection.setReceiveFilePath(filePath);
- textResponse = "ok"; // response OK
- }
- catch (Exception e) {
- textResponse = "I/O error";
- e.printStackTrace();
- }
- break;// ---------------------------------------------------------
- case "":
- textResponse = "nop";
- break;// ---------------------------------------------------------
- default:
- textResponse = "unknown command";
- break;// ---------------------------------------------------------
- }
- // Конвертируем текст - результат выполнения команды в channel
- if (textResponse != null) {
- data = connection.getBufferedTransmitChannel();
- // will write textResponse to data
- StringTochannel(textResponse, data);
- }
- assert data != null;
- // schedule sending response to command (text)
- scheduleWrite(key, data);
- }
- // =============================================================================
- public static void main(String[] args) throws IOException {
- Thread t = new Thread(new FubarServer());
- //t.setDaemon(false);
- t.start();
- }
- // =============================================================================
- // Schedule rottening old connections
- private void scheduleDeleteRottenConnections() {
- ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setDaemon(true);
- return t;
- }
- });
- service.scheduleAtFixedRate(
- () -> connectionList.removeRotten(), ROTTEN_LATENCY, ROTTEN_LATENCY, TimeUnit.SECONDS);
- }
- public void onDone(Void v) {
- //System.out.println("Done");
- }
- }
- package ru.geekbrains.netty.selector03.server.serverActions;
- import java.nio.file.DirectoryStream;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.util.function.Function;
- public class DirectoryReader implements Function<String,String> {
- @Override
- public String apply(String dir) {
- String result = null;
- try {
- StringBuilder sb = new StringBuilder();
- Path path = Paths.get(dir);
- try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
- for (Path entry : stream) {
- sb.append(entry.getFileName()).append("\n");
- }
- }
- result = sb.toString().trim();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return result;
- }
- }
- package ru.geekbrains.netty.selector03.server;
- import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
- import java.nio.channels.SeekableByteChannel;
- /**
- * Hello world!
- *
- */
- public class App
- {
- public static void main(String[] args) {
- new App();
- }
- App() {
- }
- }
- package ru.geekbrains.netty.selector03.common.entities;
- public enum MessageType {
- TEXT((byte)0),
- BINARY((byte)1);
- private byte value;
- MessageType(byte value) {
- this.value = value;
- }
- public byte getValue() {
- return value;
- }
- public static MessageType parse(byte value) {
- MessageType result = null;
- if (value == 0) {
- result = MessageType.TEXT;
- }
- else if (value == 1) {
- result = MessageType.BINARY;
- }
- return result;
- }
- }
- package ru.geekbrains.netty.selector03.common.entities;
- import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.nio.ByteBuffer;
- import java.nio.channels.*;
- import java.nio.file.Path;
- import java.time.Instant;
- public class Connection {
- public static final int BUFFER_SIZE = 1024*1024; // read and write buffer size
- private SelectionKey key;
- private SocketChannel channel; // сокет
- private ByteBuffer readBuffer; // промежуточный буффер на чтение
- private ByteBuffer writeBuffer; // промежуточный буффер на запись
- private Instant time;
- private SeekableByteChannel transmitChannel; // канал на передачу данных клиенту
- private SeekableByteChannel receiveChannel; // канал на прием данных от клиента
- // чтобы не плодить каналов через new()
- public SeekableByteChannel bufferedTransmitChannel; // Используется для передачи текстовых данных
- private SeekableByteChannel bufferedReceiveChannel; // Используется для приема текстовых данных
- private boolean transmitHeaderPresent; // заголовок принимаемого сообщения был прочитан
- private boolean receiveHeaderPresent; // в отправляемое сообщение был записан заголовок
- //private ByteBuffer data; // emulating data(file) needed to be transferred to client
- private Path receiveFilePath; // путь к файлу для приема
- private long remainingBytesToRead;
- //private ByteArrayOutputStream bufferStream; // работает с readBuffer при приеме текстового сообщения
- // хотя можно просто унаследоваться от SeekableByteChannel и сделать два Channel
- // TextChannel и BinaryChannel - в одном текст, в другом - файл
- //private MessageType receiveMessageType; // received message type
- public Connection() {
- bufferedTransmitChannel = new SeekableInMemoryByteChannel();
- bufferedReceiveChannel = new SeekableInMemoryByteChannel();
- this.readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
- this.writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
- }
- public Connection(SelectionKey key, Instant time) {
- this();
- this.key = key;
- this.channel = (SocketChannel)key.channel();
- this.time = time;
- }
- public Connection(SocketChannel channel) {
- this();
- this.channel = channel;
- }
- public ByteBuffer getReadBuffer() {
- return readBuffer;
- }
- public ByteBuffer getWriteBuffer() {
- return writeBuffer;
- }
- public SelectionKey getKey() {
- return key;
- }
- public SocketChannel getChannel() {
- return channel;
- }
- public Instant getTime() {return time;}
- public void setTime(Instant time) {this.time = time;}
- /* public RandomAccessFile getFile() {return file;}
- public void setFile(RandomAccessFile file) {this.file = file;}*/
- public SeekableByteChannel getTransmitChannel() {
- return transmitChannel;
- }
- public void setTransmitChannel(SeekableByteChannel transmitChannel) {
- this.transmitChannel = transmitChannel;
- }
- public SeekableByteChannel getReceiveChannel() {
- return receiveChannel;
- }
- public void setReceiveChannel(SeekableByteChannel receiveChannel) {
- this.receiveChannel = receiveChannel;
- }
- public boolean isTransmitHeaderPresent() {
- return transmitHeaderPresent;
- }
- public void setTransmitHeaderPresent(boolean transmitHeaderPresent) {
- this.transmitHeaderPresent = transmitHeaderPresent;
- }
- public boolean isReceiveHeaderPresent() {
- return receiveHeaderPresent;
- }
- public void setReceiveHeaderPresent(boolean receiveHeaderPresent) {
- this.receiveHeaderPresent = receiveHeaderPresent;
- }
- public long remainingBytesToRead() {
- return remainingBytesToRead;
- }
- public void remainingBytesToRead(long remainingBytesToRead) {
- this.remainingBytesToRead = remainingBytesToRead;
- }
- public void decreaseRemainingBytesToRead(long amount) {
- remainingBytesToRead-= amount;
- }
- public Path getReceiveFilePath() {
- return receiveFilePath;
- }
- public void setReceiveFilePath(Path receiveFilePath) {
- this.receiveFilePath = receiveFilePath;
- }
- public SeekableByteChannel getBufferedTransmitChannel() {
- try {
- bufferedTransmitChannel.position(0);
- bufferedTransmitChannel.truncate(0);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return bufferedTransmitChannel;
- }
- public SeekableByteChannel getBufferedReceiveChannel() {
- try {
- bufferedReceiveChannel.position(0);
- bufferedReceiveChannel.truncate(0);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return bufferedReceiveChannel;
- }
- // -----------------------------------------------------------------------------------
- public void close() {
- // close socket
- if (channel != null &&
- channel.isOpen()) {
- try {
- channel.close();
- } catch (IOException ignored) {}
- }
- // close transmitChannel
- if (transmitChannel != null) {
- try {
- transmitChannel.close();
- } catch (IOException ignored) {}
- }
- // close receiveChannel
- if (receiveChannel != null) {
- try {
- receiveChannel.close();
- } catch (IOException ignored) {}
- }
- }
- /**
- * Write header to connection.writeBuffer
- */
- public void writeHeader() {
- try {
- assert writeBuffer.position() == 0;
- // write message size
- writeBuffer.putLong(transmitChannel.size());
- // write message type
- writeBuffer.put(getChannelType(transmitChannel).getValue());
- setTransmitHeaderPresent(true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * Read header from connection.readBuffer
- */
- public MessageType parseHeader() {
- MessageType result = null;
- try {
- // return to beginning of buffer
- readBuffer.rewind();
- // get payload length
- remainingBytesToRead = readBuffer.getLong();
- // позволим дальше писать в буфер
- //readBuffer.limit(readBuffer.capacity());
- // get message type
- result = MessageType.parse(readBuffer.get());
- // увеличим количество байт для чтения на размер заголовка (8+1)
- // т.к. в цикле readSocket(..)
- // в количество прочитанных байт из сокета 'read' вошли как длина заголовока,
- // так и число прочтенных байт самого сообщения, поэтому возвращаем обратно
- // (помечаем ка кнепрочитанные)
- remainingBytesToRead += (8 + 1); // int64 - message size + 1 byte message type
- setReceiveHeaderPresent(true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- return result;
- }
- // =====================================================================
- public MessageType getChannelType(SeekableByteChannel channel) {
- MessageType result = null;
- if (channel instanceof SeekableInMemoryByteChannel) {
- result = MessageType.TEXT;
- }
- else if (channel instanceof FileChannel) {
- result = MessageType.BINARY;
- }
- return result;
- }
- public FileChannel createFileChannel(Path path, String mode) {
- FileChannel result = null;
- try {
- RandomAccessFile file = new RandomAccessFile(path.toString(), mode);
- result = file.getChannel();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- }
- return result;
- }
- }
- package ru.geekbrains.netty.selector03.common.entities;
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SeekableByteChannel;
- import java.nio.charset.StandardCharsets;
- public class Utils {
- public static String channelToString(SeekableByteChannel channel) {
- String result = null;
- try {
- ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
- channel.position(0);
- channel.read(buffer);
- buffer.flip();
- result = new String( buffer.array(), StandardCharsets.UTF_8);
- } catch (IOException e) {
- e.printStackTrace();
- }
- return result;
- }
- public static void StringTochannel(String text, SeekableByteChannel outChannel) {
- SeekableByteChannel result = null;
- try {
- ByteBuffer tmpBuffer = ByteBuffer.wrap(text.getBytes());
- outChannel.position(0);
- outChannel.truncate(tmpBuffer.capacity());
- outChannel.write(tmpBuffer);
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static void copyBuffer(ByteBuffer src, ByteBuffer dst) {
- int maxTransfer = Math.min(dst.remaining(), src.remaining());
- // use a duplicated(backed on original) buffer so we don't disrupt the limit of the original buffer
- ByteBuffer tmp = src.duplicate();
- tmp.limit(tmp.position() + maxTransfer);
- dst.put(tmp);
- // now discard the data we've copied from the original source (optional)
- src.position(src.position() + maxTransfer);
- }
- public static boolean isNullOrEmpty(Object object) {
- return object == null || object.getClass() == String.class && ((String)object).trim().isEmpty();
- }
- }
- package ru.geekbrains.netty.selector03.client;
- import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
- import ru.geekbrains.netty.selector03.common.entities.Connection;
- import ru.geekbrains.netty.selector03.common.entities.MessageType;
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.SeekableByteChannel;
- import java.nio.channels.SocketChannel;
- import java.nio.charset.StandardCharsets;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import static ru.geekbrains.netty.selector03.common.entities.Utils.channelToString;
- import static ru.geekbrains.netty.selector03.common.entities.Utils.isNullOrEmpty;
- public class FubarClient implements Runnable {
- private static final int PORT_NUMBER = 8000;
- private static final String SERVER_HOST = "127.0.0.1";
- private SocketChannel socketChannel;
- private String dataRoot;
- private Connection connection;
- private SeekableByteChannel pendingFileTransfer;
- public FubarClient() throws IOException {
- // Будут проблемы с путями
- dataRoot = System.getProperty("user.dir") + "/data/"; //(? File.separator)
- Files.createDirectories(Paths.get(dataRoot));
- // in blocking mode
- socketChannel = SocketChannel.open();
- socketChannel.connect((new InetSocketAddress(SERVER_HOST, PORT_NUMBER)));
- //noinspection ConstantConditions
- connection = new Connection(socketChannel);
- }
- @Override
- public void run() {
- try {
- // reading server greeting
- readSocket();
- // print greeting to user
- processResponse();
- BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
- //noinspection InfiniteLoopStatement
- while(true) {
- // get user input
- String input = br.readLine();
- String res = parseUserInput(input);
- // invalid user input
- if (!isNullOrEmpty(res)) {
- System.out.println(res);
- continue;
- }
- //send to server
- SeekableByteChannel data = new SeekableInMemoryByteChannel(input.getBytes());
- connection.setTransmitChannel(data);
- writeSocket();
- // reading response
- readSocket();
- processResponse();
- // transfer file to server (if scheduled one)
- if (pendingFileTransfer!= null) {
- connection.setTransmitChannel(pendingFileTransfer);
- writeSocket();
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * Читаем из сокета данные, сколько их там накопилось
- * Учитывая длину сообщения из заголовка
- */
- private void readSocket() {
- try {
- //System.out.println("readSocket");
- SocketChannel client = connection.getChannel();
- ByteBuffer buffer = connection.getReadBuffer();
- // Изначально не знаем что приедет - текст или файл
- SeekableByteChannel data = connection.getReceiveChannel();
- int read;
- // read > 0 - readied some data
- // read = 0 - no data available (end of stream)
- // read = -1 - closed connection
- // подготавливаем буфер для чтения
- buffer.clear();
- long totalToReceive = -1;
- // см FubarServer
- if(!connection.isReceiveHeaderPresent()) {
- buffer.limit(8 + 1);
- }
- else {
- buffer.limit((int)Math.min(
- (long)buffer.capacity(),
- connection.remainingBytesToRead()));
- }
- while ((read = client.read(buffer)) > 0) {
- buffer.flip();
- // Parse header if didn't do it before ---------------------------------------
- // Узнаем тип сообщения и его размер
- if (!connection.isReceiveHeaderPresent()) {
- MessageType messageType = connection.parseHeader();
- totalToReceive = connection.remainingBytesToRead() - (8 + 1);
- // Определяемся, куда сохранять данные
- if(messageType == MessageType.TEXT) {
- // берем из буферный канал для текста
- data = connection.getBufferedReceiveChannel();
- }
- else {
- // пишем в файл
- data = connection.createFileChannel(connection.getReceiveFilePath(), "rw");
- }
- // устанавливаем выбранный канал для connection в качестве канала-приемника
- connection.setReceiveChannel(data);
- } // -------------------------------------------------------------------------
- // уменьшаем количество оставшихся байт сообщения для чтения
- connection.decreaseRemainingBytesToRead(read);
- assert data != null;
- // пишем из буфера в канал
- data.write(buffer);
- // опять настраиваем буфер, чтоб жизнь медом не казалась
- buffer.rewind();
- buffer.limit((int)Math.min(
- (long)buffer.capacity(),
- connection.remainingBytesToRead()));
- System.out.println("Rx: " + data.position() + " / " + totalToReceive);
- //System.out.println(data.position() + " / " + data.size());
- // преодалеваем упирание в блокирующий сокет (на чтение)
- if (connection.remainingBytesToRead() == 0) {
- break;
- }
- }
- // возвращаем обратно возможность разбирать заголовок нового сообщения
- connection.setReceiveHeaderPresent(false);
- // Remote endpoint close connection
- if (read < 0) {
- System.out.println("потеряна связь с сервером");
- connection.close();
- return;
- }
- }
- // Remote endpoint close connection
- catch (Exception e) {
- if (e instanceof ClosedChannelException) {
- System.out.println("потеряна связь с сервером");
- // Remote endpoint close connection
- // (maybe not handled in "if (read < 0)")
- connection.close(); // will close socket channel
- }
- e.printStackTrace();
- }
- //System.out.println("readSocket end");
- }
- private void writeSocket() {
- try {
- //System.out.println("writeSocket");
- SocketChannel client = connection.getChannel();
- ByteBuffer buffer = connection.getWriteBuffer();
- SeekableByteChannel data = connection.getTransmitChannel();
- //int wrote;
- // wrote > 0 - wrote some data
- // wrote = 0 - no data written // socket stall
- do {
- buffer.clear();
- // Add header if absent
- if (!connection.isTransmitHeaderPresent()) {
- connection.writeHeader();
- }
- data.read(buffer);
- buffer.flip();
- // пишем до упора, флудим сокет, висим на client.write(...)
- // пока все не пролезет или не упадем
- client.write(buffer);
- System.out.println("Tx: " + data.position() + " / " + data.size());
- }
- while (data.position() < data.size());
- // -------------------------------------------------------------------------
- // Все успешно записалось, data.position == data.size
- // возвращаем обратно возможность писать заголовок для нового сообщения
- // восстанавливаем новый цикл записи сообщений
- connection.setTransmitHeaderPresent(false);
- // Закрываем файловый канал (откуда писали в сокет)
- if (connection.getChannelType(data) == MessageType.BINARY) {
- data.close();
- // помечаем, что мы закончили передачу файла на сервер
- pendingFileTransfer = null;
- System.out.println("transfer complete");
- }
- // Если это был текстовый канал, то ничего не делаем,
- // он там переиспользуется
- // очищаем буффер для следущей передачи
- buffer.clear();
- // -------------------------------------------------------------------------
- }
- // Remote endpoint close connection
- catch (Exception e) {
- if (e instanceof ClosedChannelException) {
- System.out.println("потеряна связь с сервером");
- // Remote endpoint close connection
- // (maybe not handled in "if (read < 0)")
- connection.close(); // will close socket channel
- }
- e.printStackTrace();
- }
- }
- private void processResponse() {
- try {
- SeekableByteChannel receiveChannel = connection.getReceiveChannel();
- // Text message
- if (connection.getChannelType(receiveChannel) == MessageType.TEXT) {
- // display to user
- String response = channelToString(receiveChannel);
- // При отправке сереверу команды на передачу файла
- // сервер ответит, что он готов и ожидает передачу файла
- // - не будем выводить это в консоль
- if (pendingFileTransfer == null) {
- System.out.println(response);
- }
- // будем использовать повторно, без создания нового channel
- // receiveChannel буфферезируется (backed by) bufferedReceiveChannel
- // поэтому не закрываем, а truncate до 0
- receiveChannel.position(0);
- receiveChannel.truncate(0);
- }
- else {
- // working with files
- // там в прошлом через команду уже был настроен файл для приема
- // И в readSocket() файл уже записался.
- // Поэтому просто закрываем
- receiveChannel.close();
- System.out.println("transfer complete");
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- // -----------------------------------------------------------------------------
- private String parseUserInput(String command) {
- String[] parts = command.split(" ");
- String preFilter = null;
- Path filePath;
- switch (parts[0]) {
- case "get":
- // file name not specified
- if (parts.length < 2 ||
- isNullOrEmpty(parts[1])) {
- preFilter = "invalid command args";
- break;
- }
- filePath = Paths.get(dataRoot + parts[1]);
- connection.setReceiveFilePath(filePath);
- break;// ---------------------------------------------------------
- case "put":
- // file name not specified
- if (parts.length < 2 ||
- isNullOrEmpty(parts[1])) {
- preFilter = "invalid command args";
- break;
- }
- filePath = Paths.get(dataRoot + parts[1]);
- // file not exists
- if (!Files.exists(filePath)) {
- preFilter = "file not exists";
- break;
- }
- // set file
- try {
- pendingFileTransfer = connection.createFileChannel(filePath, "r");
- }
- catch (Exception e) {
- preFilter = "I/O error";
- e.printStackTrace();
- }
- break;// ---------------------------------------------------------
- }
- return preFilter;
- }
- // ==============================================================
- public static void main(String[] args) throws IOException {
- Thread t = new Thread(new FubarClient());
- //t.setDaemon(false);
- t.start();
- }
- }
- package ru.geekbrains.netty.selector03.client;
- import java.io.BufferedReader;
- import java.io.InputStreamReader;
- /**
- * Hello world!
- *
- */
- public class App
- {
- public static void main(String[] args) {
- new App();
- }
- App() {
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment