dreamworker

merged.java

Jul 15th, 2019
280
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 68.57 KB | None | 0 0
  1. package ru.geekbrains.netty.selector03.server.entities.jobpool;
  2.  
  3. import java.util.concurrent.ThreadPoolExecutor;
  4. import java.util.concurrent.locks.ReentrantLock;
  5.  
  6. public class BaseJobPool {
  7.  
  8.     private ReentrantLock watchdogLock = new ReentrantLock();
  9.     private ThreadPoolExecutor threadPool;
  10.  
  11.  
  12.     /**
  13.      * Shutdown pool
  14.      * <br>
  15.      * Will wait all remaining threads to finish
  16.      */
  17.     public void close() {
  18.  
  19.         // waiting all pool threads to complete for 30 sec then terminate pool
  20.  
  21.         watchdogLock.lock();
  22.  
  23.         //watchdog
  24.         int waitCount = 0;
  25.         while (threadPool.getActiveCount() > 0) {
  26.             try {
  27.                 waitCount++;
  28.                 watchdogLock.wait(1000);
  29.             }
  30.             catch (InterruptedException ignore) {}
  31.  
  32.             if (waitCount > 30)
  33.                 break;
  34.         }
  35.  
  36.         watchdogLock.unlock();
  37.  
  38.         // terminate thread pool
  39.         if (threadPool != null) {
  40.             threadPool.shutdownNow();
  41.         }
  42.     }
  43.  
  44.     public void stop() {
  45.  
  46.         threadPool.shutdownNow();
  47.     }
  48. }
  49.  
  50.  
  51.  
  52.  
  53.  
  54. package ru.geekbrains.netty.selector03.server.entities.jobpool;
  55.  
  56. import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
  57.  
  58. import java.util.concurrent.CompletableFuture;
  59. import java.util.concurrent.Executors;
  60. import java.util.concurrent.ThreadPoolExecutor;
  61. import java.util.function.Consumer;
  62. import java.util.function.Supplier;
  63.  
  64.  
  65. // =========================================================================================
  66.  
  67.  
  68. /**
  69.  * Async job pool dynamic size.
  70.  * <br>
  71.  * @param <T>
  72.  */
  73. public class AsyncJobPool<T> extends BaseJobPool {
  74.  
  75.  
  76.     private ThreadPoolExecutor threadPool;
  77.  
  78.     // On job done handler
  79.     private Consumer<T> callback;
  80.  
  81.  
  82.  
  83.     /**
  84.      * Pool of worker threads
  85.      * @param callback handler onComplete event
  86.      */
  87.     public AsyncJobPool(Consumer<T> callback) {
  88.  
  89.         final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
  90.  
  91.         threadFactory.setDaemon(true);
  92.  
  93.         threadFactory.setThreadNamePrefix("AsyncPool-");
  94.  
  95.         threadPool = (ThreadPoolExecutor)Executors.newCachedThreadPool(threadFactory);
  96.         this.callback = callback;
  97.     }
  98.  
  99.     /**
  100.      * Add job to execute
  101.      * <br>
  102.      * @param job Supplier
  103.      */
  104.     public void add(Supplier<T> job) {
  105.  
  106.         CompletableFuture.supplyAsync(job, threadPool)
  107.                 .handle(this::handle)
  108.                 .thenAccept(this::callback);
  109.     }
  110.  
  111.  
  112.     // ----------------------------------------------------------------------------------------------------
  113.  
  114.  
  115.     /**
  116.      * WorkProcessable.work error handler
  117.      */
  118.     // Never should be called
  119.     private T handle(T result, Throwable e) {
  120.  
  121.         //System.out.println(e.getMessage());
  122.         return result;
  123.     }
  124.  
  125.  
  126.     /**
  127.      * On job done
  128.      * @param msg callback message
  129.      */
  130.     private void callback(T msg) {
  131.  
  132.         //System.out.println("callback");
  133.  
  134.         callback.accept(msg);
  135.     }
  136.  
  137.  
  138.  
  139.     // ----------------------------------------------------------------------------------------------------
  140. }
  141.  
  142.  
  143.  
  144. package ru.geekbrains.netty.selector03.server.entities.jobpool;
  145.  
  146. import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
  147.  
  148. import java.util.concurrent.*;
  149. import java.util.concurrent.atomic.AtomicInteger;
  150. import java.util.function.Consumer;
  151. import java.util.function.Supplier;
  152.  
  153.  
  154.  
  155. // ----------------------------------------------------------------------
  156.  
  157.  
  158.  
  159.  
  160. // =========================================================================================
  161.  
  162.  
  163. /**
  164.  * Job pool fixed size.
  165.  * <br>
  166.  * When busy threads exceeds pool size
  167.  * Then thread that calls Blocking JobPool.ads(...) would be blocked until some jobs have been finished
  168.  * @param <T>
  169.  */
  170. public class BlockingJobPool<T> extends BaseJobPool {
  171.  
  172.  
  173.  
  174.     private Semaphore semaphore;
  175.  
  176.     private ThreadPoolExecutor threadPool;
  177.  
  178.     // ThreadPoolExecutor.getActiveCount() precious replacement
  179.     private final AtomicInteger workingThreadCnt = new AtomicInteger(0);
  180.  
  181.     // On job done handler
  182.     private Consumer<T> callback;
  183.  
  184.     private final AtomicInteger threadCount = new AtomicInteger(0);
  185.  
  186.     /**
  187.      * Pool of worker threads
  188.      * @param poolSize pool size (count of threads)
  189.      * @param callback handler onComplete event
  190.      */
  191.     public BlockingJobPool(int poolSize,
  192.                            Consumer<T> callback) {
  193.  
  194.         final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
  195.         threadFactory.setDaemon(true);
  196.         threadFactory.setThreadNamePrefix("BlockingPool-");
  197.  
  198.         threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(poolSize, threadFactory);
  199.         this.callback = callback;
  200.  
  201.         semaphore = new Semaphore(poolSize, true);
  202.     }
  203.  
  204.     /**
  205.      * Add job to execute
  206.      * <br>
  207.      * If pool have all it's threads busy then thread that called BlockingJobPool.add(...)
  208.      * will wait until some thread in pool have finished it's job
  209.      * @param job Supplier
  210.      */
  211.     public void add(Supplier<T> job) {
  212.  
  213.         try {
  214.             semaphore.acquire();
  215.  
  216.             threadCount.getAndIncrement();
  217.  
  218.             CompletableFuture.supplyAsync(job, threadPool)
  219.                     .handle(this::handle)
  220.                     .thenAccept(this::callback);
  221.  
  222.         }
  223.         // stop job evaluation if interrupted
  224.         catch (InterruptedException e) {
  225.  
  226.             // if thread was interrupted inside job
  227.             semaphore.release();
  228.         }
  229.     }
  230.  
  231.  
  232.     // ----------------------------------------------------------------------------------------------------
  233.  
  234.  
  235.     /**
  236.      * WorkProcessable.work error handler
  237.      * please handle you exeptions directly in job (lambda) - this.add(Supplier T  job)
  238.      */
  239.     private T handle(T result, Throwable e) {
  240.  
  241.         if (e != null) {
  242.             e.printStackTrace();
  243.         }
  244.  
  245.         return result;
  246.     }
  247.  
  248.  
  249.     /**
  250.      * On job done
  251.      * @param msg T
  252.      */
  253.     private void callback(T msg) {
  254.  
  255.         // notify caller about job done
  256.         callback.accept(msg);
  257.  
  258.         threadCount.getAndDecrement();
  259.         semaphore.release();
  260.     }
  261.  
  262.  
  263.     public boolean isFull() {
  264.         return threadCount.get() == threadPool.getMaximumPoolSize();
  265.     }
  266.  
  267.  
  268.  
  269.     // ----------------------------------------------------------------------------------------------------
  270.  
  271. }
  272.  
  273.  
  274. package ru.geekbrains.netty.selector03.server.entities;
  275.  
  276. import java.nio.channels.SelectionKey;
  277. import java.time.Instant;
  278. import java.util.Iterator;
  279. import java.util.Map;
  280. import java.util.NavigableMap;
  281. import java.util.concurrent.ConcurrentSkipListMap;
  282. import ru.geekbrains.netty.selector03.common.entities.Connection;
  283.  
  284.  
  285. // ByteBuffer cache for clients
  286. public class ConnectionList implements Iterable<Map.Entry<Integer, Connection>>{
  287.  
  288.     private static final int ROTTEN_INTERVAL = 100000000; // sec
  289.  
  290.  
  291.     private NavigableMap<Integer, Connection> connList = new ConcurrentSkipListMap<>();
  292.     private NavigableMap<Instant, Integer> connTimeList = new ConcurrentSkipListMap<>();
  293.  
  294.     public void add(SelectionKey key) {
  295.  
  296.         Instant now = Instant.now();
  297.         Connection connection = new Connection(key, now);
  298.  
  299.         int id = (int)key.attachment();
  300.         connList.put(id, connection);
  301.         connTimeList.put(now, id);
  302.     }
  303.  
  304.  
  305.     public Connection get(int id) {
  306.  
  307.         return connList.get(id);
  308.     }
  309.  
  310.  
  311.     public void update(int id) {
  312.  
  313.         Connection connection = connList.get(id);
  314.  
  315.         if (connection != null) {
  316.  
  317.  
  318.             connTimeList.remove(connection.getTime());
  319.  
  320.             Instant now = Instant.now();
  321.             connection.setTime(now);
  322.  
  323.             connTimeList.put(now, id);
  324.         }
  325.     }
  326.  
  327.     public void remove(int id) {
  328.  
  329.         Connection connection = connList.get(id);
  330.  
  331.         if (connection != null) {
  332.  
  333.             System.out.println("Removing connection #" + id);
  334.             connTimeList.remove(connection.getTime());
  335.  
  336.             connection.close();
  337.         }
  338.         connList.remove(id);
  339.     }
  340.  
  341.     /**
  342.      * Удаляет протухшие ключи
  343.      */
  344.     public void removeRotten() {
  345.  
  346.         //System.out.println("Removing rotten comnnections ...");
  347.  
  348.         Instant label = Instant.now().minusSeconds(ROTTEN_INTERVAL);
  349.  
  350.         NavigableMap<Instant, Integer> rotten = connTimeList.headMap(label , true);
  351.  
  352.         Iterator<Map.Entry<Instant, Integer>> it = rotten.entrySet().iterator();
  353.  
  354.         while(it.hasNext()) {
  355.  
  356.             Map.Entry<Instant, Integer> entry = it.next();
  357.             int id = entry.getValue();
  358.  
  359.             // remove from connList
  360.             remove(id);
  361.  
  362.             // remove from connTimeList
  363.             it.remove();
  364.  
  365.  
  366.         }
  367.     }
  368.  
  369.  
  370.  
  371.  
  372.  
  373.     @Override
  374.     public Iterator<Map.Entry<Integer,Connection>> iterator() {
  375.  
  376.         return connList.entrySet().iterator();
  377.     }
  378.  
  379. }
  380.  
  381.  
  382.  
  383.  
  384.  
  385.  
  386.  
  387. package ru.geekbrains.netty.selector03.server;
  388.  
  389. import ru.geekbrains.netty.selector03.common.entities.Connection;
  390. import ru.geekbrains.netty.selector03.common.entities.MessageType;
  391. import ru.geekbrains.netty.selector03.server.entities.ConnectionList;
  392. import ru.geekbrains.netty.selector03.server.entities.jobpool.BlockingJobPool;
  393. import ru.geekbrains.netty.selector03.server.serverActions.DirectoryReader;
  394.  
  395. import static ru.geekbrains.netty.selector03.common.entities.Utils.StringTochannel;
  396. import static ru.geekbrains.netty.selector03.common.entities.Utils.channelToString;
  397. import static ru.geekbrains.netty.selector03.common.entities.Utils.isNullOrEmpty;
  398.  
  399. import java.io.*;
  400. import java.net.InetSocketAddress;
  401. import java.nio.ByteBuffer;
  402. import java.nio.channels.*;
  403. import java.nio.file.Files;
  404. import java.nio.file.Path;
  405. import java.nio.file.Paths;
  406. import java.util.Iterator;
  407. import java.util.concurrent.*;
  408. import java.util.concurrent.atomic.AtomicInteger;
  409. import java.util.function.Function;
  410. import java.util.function.IntUnaryOperator;
  411.  
  412.  
  413. // https://www.programering.com/a/MTN1MDMwATk.html
  414. // https://www.ibm.com/developerworks/cn/java/l-niosvr/ => google-translate from china
  415.  
  416. // SelectionKey.isWritable() - protect socket from flooding
  417. // https://stackoverflow.com/questions/11360374/when-a-selectionkey-turns-writable-in-java-nio
  418.  
  419. public class FubarServer implements Runnable {
  420.  
  421.     private static final int ROTTEN_LATENCY = 1; //sec
  422.  
  423.     private ServerSocketChannel serverSocketChannel;
  424.     private Selector selector;
  425.     private final String welcomeString = "Fubar Transfer Protocol server приветствует вас.";
  426.  
  427.     // Non-negative AtomicInteger incrementator
  428.     private static IntUnaryOperator AtomicNonNegativeIntIncrementator = (i) -> i == Integer.MAX_VALUE ? 0 : i + 1;
  429.     // connection id generator
  430.     private static final AtomicInteger connectionIdGen =  new AtomicInteger();
  431.  
  432.     private ConnectionList connectionList = new ConnectionList();
  433.  
  434.     private BlockingJobPool<Void> jobPool =  new BlockingJobPool<>(4, this::onDone);
  435.  
  436.     private static final int PORT_NUMBER = 8000;
  437.     private static final String IP = "0.0.0.0";
  438.     private String dataRoot;
  439.  
  440.  
  441.     FubarServer() throws IOException {
  442.  
  443.         // Будут проблемы с путями
  444.         dataRoot = System.getProperty("user.dir") + "/data/";  //(? File.separator)
  445.         Files.createDirectories(Paths.get(dataRoot));
  446.  
  447.         serverSocketChannel = ServerSocketChannel.open();
  448.         serverSocketChannel.socket().bind(new InetSocketAddress(IP, PORT_NUMBER));
  449.         serverSocketChannel.configureBlocking(false);
  450.  
  451.         selector = Selector.open();
  452.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  453.  
  454.         // start rottening old connections
  455.         scheduleDeleteRottenConnections();
  456.  
  457.     }
  458.  
  459.     @Override
  460.     public void run() {
  461.  
  462.         try {
  463.  
  464.             System.out.println("Серверо запущено " + IP + ":" + PORT_NUMBER);
  465.             Iterator<SelectionKey> it;
  466.             SelectionKey key;
  467.             // while true
  468.             while (serverSocketChannel.isOpen()) {
  469.  
  470.                 selector.select();
  471.                 it = selector.selectedKeys().iterator();
  472.  
  473.                 // ...
  474.                 if (selector.selectedKeys().size() == 0) {
  475.                     continue;
  476.                 }
  477.  
  478.                 //System.out.println("SELECTING: " + selector.selectedKeys().size());
  479.  
  480.                 while (it.hasNext()) {
  481.  
  482.                     key = it.next();
  483.  
  484.                     //System.out.println("KEY INTERESTS: " + key.interestOps());
  485.                     //System.out.println("KEY READY    : " + key.readyOps());
  486.  
  487.  
  488.                     it.remove();
  489.  
  490.                     // skip invalid keys (disconnected channels)
  491.                     if (!key.isValid())
  492.                         continue;
  493.  
  494.                     if (key.isAcceptable()) {
  495.                         acceptSocket(key);
  496.                     }
  497.  
  498.                     if (key.isReadable()) {
  499.  
  500.                         // Снипмаем подписку о получении новых данных в сокете
  501.                         // пока поток из пула читает данные из сокета
  502.                         // Чтобы не бегать бесконечно в цикле select
  503.                         removeInterest(key, SelectionKey.OP_READ);
  504.  
  505.                         // Читаем данные в отдельном потоке
  506.                         // Если прочиталось все сообщение, то назначаем его на выполнение
  507.                         // (выполняться будет потом в другом потоке)
  508.                         // Если целиком не прочиталось - будет дочитываться в других циклах select
  509.                         SelectionKey finalKey = key;
  510.                         jobPool.add(() -> {
  511.  
  512.                             try {
  513.  
  514.                                 //System.out.println("OP_READ job start");
  515.  
  516.                                 // Если сообщение принято целиком,
  517.                                 // то обработать его
  518.                                 if (readSocket(finalKey)) {
  519.                                     processInput(finalKey);
  520.                                 }
  521.  
  522.                                 // Возвращаем подписку на флаг чтения новых данных из сокета
  523.                                 setInterest(finalKey, SelectionKey.OP_READ);
  524.                                 // Будим селектор (могли подойти новые данные)
  525.                                 selector.wakeup();
  526.                             }
  527.                             catch (Exception e) {
  528.                                 e.printStackTrace();
  529.                             }
  530.                             return null;
  531.                         });
  532.                     }
  533.  
  534.  
  535.  
  536.                     // Интерес на запись выставляется отдельно
  537.                     // вручную при желании что-либо передать
  538.                     // либо внутри writeSocket(...) если затопился сокет и отправка не удалась
  539.                     if (key.isWritable()) {
  540.  
  541.                         // Чтобы не бегать бесконечно в цикле select
  542.                         // Пока потоки из пула пишут в сокеты
  543.                         removeInterest(key, SelectionKey.OP_WRITE);
  544.  
  545.                         // Пишем в отдельном потоке
  546.                         SelectionKey finalKey = key;
  547.                         jobPool.add(() -> {
  548.  
  549.                             try {
  550.  
  551.                                 //System.out.println("OP_WRITE job start");
  552.  
  553.                                 writeSocket(finalKey);
  554.  
  555.  
  556.                                 // Возвращаем подписку на флаг чтения новых данных из сокета
  557.                                 //setInterest(finalKey, SelectionKey.OP_WRITE);
  558.                                 // Будим селектор (могли подойти новые данные)
  559.                                 //selector.wakeup();
  560.  
  561.  
  562.                             }
  563.                             catch (Exception e) {
  564.                                 e.printStackTrace();
  565.                             }
  566.                             return null;
  567.                         });
  568.                     }
  569.                 }
  570.             }
  571.         } catch (Exception e) {
  572.             e.printStackTrace();
  573.         }
  574.     }
  575.  
  576.  
  577.     private void acceptSocket(SelectionKey key) {
  578.  
  579.         try {
  580.  
  581.             System.out.println("acceptSocket");
  582.  
  583.             //System.out.println(Thread.currentThread().toString());
  584.  
  585.             ServerSocketChannel serverSocket = (ServerSocketChannel)key.channel();
  586.             //System.out.println("LOCAL: " + serverSocket.getLocalAddress());
  587.  
  588.             SocketChannel client = serverSocket.accept();
  589.             System.out.println("REMOTE ENDPOINT: " + client.getRemoteAddress());
  590.  
  591.             // Нет свободных потоков - нечем обрабатывать клиента
  592.             if (jobPool.isFull()) {
  593.  
  594.                 System.out.println("No workers - disconnecting");
  595.                 client.close();
  596.                 return;
  597.             }
  598.  
  599.             int id = connectionIdGen.getAndUpdate(AtomicNonNegativeIntIncrementator);
  600.             client.configureBlocking(false);
  601.  
  602.             // регистрируемся на OP_READ на сокете клиента
  603.             SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ, id);
  604.             setInterest(clientKey, SelectionKey.OP_WRITE);
  605.  
  606.             connectionList.add(clientKey);
  607.  
  608.             //SeekableByteChannel data = new SeekableInMemoryByteChannel(welcomeString.getBytes());
  609.  
  610.             Connection connection = connectionList.get(id);
  611.             SeekableByteChannel data = connection.getBufferedTransmitChannel();
  612.             // will write greeting to data
  613.             StringTochannel(welcomeString, data);
  614.  
  615.             // schedule sending greeting
  616.             scheduleWrite(clientKey, data);
  617.  
  618.             System.out.println("Подключился новый клиент #" + id);
  619.  
  620.         } catch (Exception e) {
  621.             e.printStackTrace();
  622.         }
  623.     }
  624.  
  625.  
  626.  
  627.  
  628.     /**
  629.      * Читаем из сокета данные, сколько их там накопилось
  630.      * Учитывая длину сообщения из заголовка
  631.      *
  632.      * @return boolean сообщение принято целиком
  633.      */
  634.     private boolean readSocket(SelectionKey key)  {
  635.  
  636.         boolean result = false;
  637.         int id = -1;
  638.  
  639.         try {
  640.  
  641.             //System.out.println("readSocket");
  642.  
  643.             SocketChannel client = (SocketChannel)key.channel();
  644.             id = (int)key.attachment();
  645.             Connection connection = connectionList.get(id);
  646.  
  647.             // Изначально не знаем что приедет - текст или файл
  648.             SeekableByteChannel data = connection.getReceiveChannel();
  649.  
  650.             ByteBuffer buffer = connection.getReadBuffer();
  651.  
  652.             boolean someDataHasReadied = false;
  653.  
  654.             int read; // сколько байт прочли из сокета
  655.  
  656.             // read >  0  - readied some data
  657.             // read =  0  - no data available (end of stream)
  658.             // read = -1  - closed connection
  659.  
  660.  
  661.             // подготавливаем буфер для чтения
  662.             buffer.clear();
  663.  
  664.             // Еще не читали заголовок сообщения из сокета
  665.             // Устанавливаем limit буффера в размер заголовка
  666.             // и будем читать только заголовок
  667.             if(!connection.isReceiveHeaderPresent()) {
  668.                 buffer.limit(8 + 1); // length + type
  669.             }
  670.             else {
  671.  
  672.                 // => проблема с ненужным вычитыванием начальных байтов следущего сообщения
  673.                 // если сообщения идут подряд -
  674.                 // то надо в начале вычитать только хедер, узнать сколько байт надо принять
  675.                 // а потом выставить limit буфера в
  676.                 // min(buffer.capacity, bytesRemaining)
  677.                 // Чтобы буфер не прочел данные за концом текущего сообщения
  678.                 // это будет начальные байты(заголовк) следущего сообщения
  679.                 buffer.limit((int)Math.min(
  680.                         (long)buffer.capacity(),
  681.                         connection.remainingBytesToRead()));
  682.             }
  683.  
  684.             while ((read = client.read(buffer)) > 0) { // ----------------------------------------------
  685.  
  686.                 buffer.flip();
  687.  
  688.                 // устанавливаем флаг что что-то смогли прочитать из сокета
  689.                 someDataHasReadied = true;
  690.  
  691.                 // Parse header if didn't do it before ---------------------------------------
  692.                 // Узнаем тип сообщения и его размер
  693.                 if (!connection.isReceiveHeaderPresent()) {
  694.                     MessageType messageType = connection.parseHeader();
  695.  
  696.                     // Определяемся, куда сохранять данные
  697.                     if(messageType == MessageType.TEXT) {
  698.  
  699.                         // берем из буферный канал для текста
  700.                         data = connection.getBufferedReceiveChannel();
  701.                     }
  702.                     else {
  703.                         // пишем в файл
  704.                         data = connection.createFileChannel(connection.getReceiveFilePath(), "rw");
  705.                     }
  706.                     // устанавливаем выбранный канал для connection в качестве канала-приемника
  707.                     connection.setReceiveChannel(data);
  708.  
  709.                 } // -------------------------------------------------------------------------
  710.  
  711.                 // уменьшаем количество оставшихся байт сообщения для чтения
  712.                 connection.decreaseRemainingBytesToRead(read);
  713.  
  714.  
  715.                 assert data != null;
  716.                 // пишем из буфера в канал
  717.                 data.write(buffer);
  718.  
  719.                 // опять настраиваем буфер, чтоб жизнь медом не казалась
  720.                 buffer.rewind();
  721.                 buffer.limit((int)Math.min(
  722.                         (long)buffer.capacity(),
  723.                         connection.remainingBytesToRead()));
  724.             } // ------------------------------------------------------------------------------------
  725.  
  726.             // -------------------------------------------------
  727.             // Если хоть что-то передалось
  728.             if (someDataHasReadied) {
  729.                 // refresh client TTL
  730.                 connectionList.update(id);
  731.             }
  732.             // -------------------------------------------------
  733.  
  734.             // Remote endpoint close connection
  735.             if (read < 0) {
  736.                 System.out.println(key.attachment() + " отключился");
  737.                 connectionList.remove(id); // will close socket channel
  738.                 return false;
  739.             }
  740.  
  741.             // Тут еще возможен вариант:
  742.             // прочли не все, возможно оставшиеся байты сообщения
  743.             // прижут позднее
  744.  
  745.             // Приняли все байты сообщения
  746.             if(connection.remainingBytesToRead() == 0)  {
  747.  
  748.                 // Сообщение прочиталось целиком
  749.                 result = true;
  750.  
  751.                 // возвращаем обратно возможность разбирать заголовок нового сообщения
  752.                 connection.setReceiveHeaderPresent(false);
  753.  
  754.                 // очищаем буффер для следущего приема (не обязательно)
  755.                 buffer.clear();
  756.  
  757.             }
  758.  
  759. //            assert data != null;
  760. //            long pos = data.position();
  761. //            System.out.println("R: " + channelToString(data));
  762. //            data.position(pos);
  763.  
  764.  
  765.         }
  766.         catch (Exception e) {
  767.             if (e instanceof ClosedChannelException) {
  768.                 System.out.println(key.attachment() + " отключился");
  769.                 // Remote endpoint close connection
  770.                 // (maybe not handled in "if (read < 0)")
  771.                 connectionList.remove(id); // will close socket channel
  772.             }
  773.             e.printStackTrace();
  774.         }
  775.  
  776.  
  777.  
  778.         //System.out.println("readSocket END");
  779.  
  780.         return result;
  781.     }
  782.  
  783.  
  784.  
  785.  
  786.  
  787.     private void writeSocket(SelectionKey key)  {
  788.  
  789.         int id = -1;
  790.  
  791.         try {
  792.  
  793.             //System.out.println("writeSocket");
  794.  
  795.             SocketChannel client = (SocketChannel)key.channel();
  796.             id = (int)key.attachment();
  797.             Connection connection = connectionList.get(id);
  798.  
  799.             ByteBuffer buffer = connection.getWriteBuffer();
  800.             SeekableByteChannel data = connection.getTransmitChannel();
  801.  
  802.             boolean someDataHasSend = false;
  803.  
  804.             int wrote;    // сколько байт записали в сокет
  805.             int dataRead; // сколько байт прочли из канала
  806.  
  807.             // wrote  >  0  - wrote some data
  808.             // wrote  =  0  - no data written    // need register(selector, SelectionKey.OP_WRITE, id);
  809.  
  810.             int bufferRemaining = -1;
  811.  
  812.             // пишем в сокет, пока есть что передавать
  813.             // и сокет принимает данные (не затопился)
  814.             while (data.position() < data.size()) {
  815.  
  816.                 // Add header if absent
  817.                 if (!connection.isTransmitHeaderPresent()) {
  818.                     connection.writeHeader();
  819.                 }
  820.  
  821.                 //  читаем из канала в буффер
  822.                 dataRead = data.read(buffer);
  823.                 buffer.flip();
  824.  
  825.                 // пишем в сокет
  826.                 wrote = client.write(buffer);
  827.  
  828.                 bufferRemaining = buffer.remaining();
  829.  
  830. //                if (buffer.remaining() > 0) {
  831. //                    System.out.println(buffer.remaining() );
  832. //                }
  833.  
  834.  
  835.                 if (!someDataHasSend) {
  836.                     someDataHasSend = wrote > 0;
  837.                 }
  838.  
  839.                 // что не залезло в сокет помещаем в начало буфера
  840.                 buffer.compact();
  841.  
  842.                 // socket stall
  843.                 // оставляем сокет в покое
  844.                 if (bufferRemaining > 0) {
  845.                     //System.out.println("WR: " + wrote);
  846.                     break;
  847.                 }
  848.  
  849.             }
  850.  
  851.             // -------------------------------------------------
  852.             // Если хоть что-то передалось - refresh client TTL
  853.             if (someDataHasSend) {
  854.                 connectionList.update(id);
  855.             }
  856.             // -------------------------------------------------
  857.  
  858.             // причем, если не отправилось по сети, то в buffer будет лежать кусок
  859.             // (скопированный из data), который так и не отправился
  860.             // буффер нужно очистить, а transmitChannel отмотать назад на размер прочтенных байт из data
  861.             //
  862.             // Это все к тому, то нельзя начинать передавать новые данные, пока по сети не передалось
  863.             // текущее сообщение
  864.  
  865.  
  866.             // Не смогли передать все данные
  867.             // Не прочитано целиком все из канала / остался последний неотправленный кусок в buffer
  868.             // Флудим сокет данными
  869.             // он не успевает передавать тут / принимать на удаленном конце
  870.             // регистрируемся на флаг что удаленный сокет может принимать сообщения
  871.             // чтобы возобновить передачу как сокет будет готов передавать
  872.             if (data.position() < data.size() ||
  873.                 bufferRemaining > 0) {
  874.  
  875.                 // Сохранить непереданный кусок данных для следущего цикла передачи
  876.                 // отмотаем transmitChannel назад на размер данных в буффере (которые не передались)
  877.  
  878.                 //System.out.println(data.position() + " / " + data.size());
  879.  
  880.                 // Выставляем бит OP_WRITE в 1 (подписываемся на флаг готовности сокета отправлять данные)
  881.                 setInterest(key, SelectionKey.OP_WRITE);
  882.  
  883.                 // будим селектор (будем отправлять данные в следущем цикле)
  884.                 selector.wakeup();
  885.  
  886.             }
  887.             // -------------------------------------------------------------------------
  888.             // Все успешно записалось, data.position == data.size
  889.             else {
  890.  
  891.                 // возвращаем обратно возможность писать заголовок для нового сообщения
  892.                 // восстанавливаем новый цикл записи сообщений
  893.                 connection.setTransmitHeaderPresent(false);
  894.  
  895.                 //System.out.println(data.position() + " / " + data.size());
  896.  
  897.                 // Закрываем файловый канал (откуда писали в сокет)
  898.                 if (connection.getChannelType(data) == MessageType.BINARY) {
  899.                     data.close();
  900.                 }
  901.                 // Если это был текстовый канал, то ничего не делаем,
  902.                 // он там переиспользуется
  903.  
  904.  
  905.                 // обнуляем ссылку на transmitChannel
  906.                 // (Защита от записи в сокет нового сообщения,
  907.                 // если он еще не закончил передачу текущих данных)
  908.                 connection.setTransmitChannel(null);
  909.  
  910.                 // очищаем буффер для следущей передачи
  911.                 buffer.clear();
  912.  
  913.                 // отписываемся от оповещения что сокет готов передавать данные
  914.                 // Выставляем в ключе бит OP_WRITE в 0 (отписываемся)
  915.                 removeInterest(key, SelectionKey.OP_WRITE);
  916.             }
  917.             // -------------------------------------------------------------------------
  918.  
  919.  
  920. //            long pos = data.position();
  921. //            System.out.println("T: " + channelToString(data));
  922. //            data.position(pos);
  923.  
  924.         }
  925.         catch (Exception e) {
  926.             if (e instanceof ClosedChannelException) {
  927.                 System.out.println(key.attachment() + " отключился");
  928.                 // Remote endpoint close connection
  929.                 connectionList.remove(id); // will close socket channel
  930.             }
  931.             e.printStackTrace();
  932.         }
  933.  
  934.         //System.out.println("writeSocket END");
  935.     }
  936.  
  937.     // -------------------------------------------------------------------------------
  938.  
  939.  
  940.     private void setInterest(SelectionKey key, int interest) {
  941.  
  942.         //System.out.println("setInterest ON: " + interest);
  943.  
  944.         if (key.isValid() &&
  945.             (key.interestOps() & interest) == 0) {
  946.  
  947.             int current = key.interestOps();
  948.             key.interestOps(current | interest);
  949.         }
  950.     }
  951.  
  952.  
  953.     private void removeInterest(SelectionKey key, int interest) {
  954.  
  955.         //System.out.println("setInterest OFF: " + interest);
  956.  
  957.         if (key.isValid() &&
  958.             (key.interestOps() & interest) != 0) {
  959.  
  960.             int current = key.interestOps();
  961.             key.interestOps(current & ~interest);
  962.         }
  963.     }
  964.  
  965.  
  966.  
  967.  
  968.     // =============================================================================
  969.  
  970.  
  971.  
  972.  
  973.     /**
  974.      * Планрует запись в сокет
  975.      * <br>
  976.      * Подготавливает данные для записи в сокет,
  977.      * подписывается(устанавливает) на флаг возможности записи в сокет
  978.      */
  979.     private void scheduleWrite(SelectionKey key, SeekableByteChannel data) {
  980.  
  981.         try {
  982.             // Т.к. все асинхронное (несколько потоков)
  983.             // То одному и тому же клиенту могут начать отправлять одновременно несколько сообщений -
  984.             // Надо делать очередь сообщений (на отправку) для клиента.
  985.             // (Если не охота потом принимать байты(куски байт) сообщений в перемешанном порядке)
  986.             // (Люди говорят для TCP такое можно устроить, для UDP - нет)
  987.             // (Потом, например, удалять только те сообщения, котороые удалось доставить, и т.д.)
  988.  
  989.             int id = (int)key.attachment();
  990.             Connection connection = connectionList.get(id);
  991.  
  992.             // Проверить нет ли текущих данных на отправку (в connectionList)
  993.             // и если есть, то не отправлять - просто потерять это данные (ибо нефиг)
  994.  
  995.             // Можно, конечно валить все в сокет, (и больше ~3 метров в неблокирующем режиме не залезет)
  996.             // дальше данные начнут теряться уже в сетевой подсистеме ядра при переполнении буффера сокета
  997.             if (connection.getTransmitChannel() != null) {
  998.                 System.out.println("Внимание - обнаружена попытка одновременной передачи, данные НЕ отправлены");
  999.                 return;
  1000.             }
  1001.  
  1002.             // Задаем сокету данные на передачу
  1003.             data.position(0);
  1004.             connection.setTransmitChannel(data);
  1005.  
  1006.             setInterest(key, SelectionKey.OP_WRITE);
  1007.  
  1008.         } catch (Exception e) {
  1009.             e.printStackTrace();
  1010.         }
  1011.     }
  1012.  
  1013.  
  1014.  
  1015.  
  1016.  
  1017.  
  1018.     /**
  1019.      * Process received channel (containing text command or file)
  1020.      */
  1021.     private void processInput(SelectionKey key) {
  1022.  
  1023.         try {
  1024.             int id = (int)key.attachment();
  1025.             Connection connection = connectionList.get(id);
  1026.             SeekableByteChannel receiveChannel =  connection.getReceiveChannel();
  1027.  
  1028.             // Text message
  1029.             if (connection.getChannelType(connection.getReceiveChannel()) == MessageType.TEXT) {
  1030.  
  1031.                 String command = channelToString(receiveChannel);
  1032.  
  1033.                 processCommand(key, command);
  1034.  
  1035.                 // будем использовать повторно, без создания нового channel
  1036.                 // receiveChannel backed by bufferedReceiveChannel
  1037.                 // поэтому не закрываем, а truncate до 0
  1038.                 receiveChannel.position(0);
  1039.                 receiveChannel.truncate(0);
  1040.             }
  1041.             // File message
  1042.             else {
  1043.  
  1044.                 // там в прошлом через команду уже был настроен файл для приема
  1045.                 // И в readSocket() файл уже записался.
  1046.                 // Поэтому просто закрываем
  1047.                 receiveChannel.close();
  1048.             }
  1049.  
  1050.         } catch (IOException e) {
  1051.             e.printStackTrace();
  1052.         }
  1053.  
  1054.     }
  1055.  
  1056.  
  1057.     /**
  1058.      * Parse and process user commands
  1059.      * Then reply to user in TEXT MODE
  1060.      * (hope text reply will not concat with subsequent binary stream)
  1061.      */
  1062.     private void processCommand(SelectionKey key, String command) {
  1063.  
  1064.         SeekableByteChannel data = null;
  1065.         String textResponse = null;
  1066.  
  1067.         int id = (int)key.attachment();
  1068.         Connection connection = connectionList.get(id);
  1069.  
  1070. //        // No input
  1071. //        if (isNullOrEmpty(command)) {
  1072. //            return;
  1073. //        }
  1074.  
  1075.         String[] parts = command.split(" ");
  1076.  
  1077.         switch (parts[0]) {
  1078.  
  1079.             case "ls":
  1080.                 Function<String,String> dirNfo = new DirectoryReader();
  1081.                 textResponse = dirNfo.apply(dataRoot);
  1082.  
  1083.                 if (textResponse.equals("")) {
  1084.                     textResponse = ".";
  1085.                 }
  1086.                 else {
  1087.                     textResponse = ".\n" + textResponse;                    
  1088.                 }
  1089.  
  1090.                 break;// ---------------------------------------------------------
  1091.  
  1092.  
  1093.             case "get":
  1094.  
  1095.                 // file name not specified
  1096.                 if (parts.length < 2 ||
  1097.                     isNullOrEmpty(parts[1])) {
  1098.  
  1099.                     textResponse = "invalid command args";
  1100.                     break;
  1101.                 }
  1102.  
  1103.  
  1104.                 Path filePath = Paths.get(dataRoot + parts[1]);
  1105.  
  1106.                 // file not exists
  1107.                 if (!Files.exists(filePath)) {
  1108.                     textResponse = "file not exists";
  1109.                     break;
  1110.                 }
  1111.  
  1112.                 // get file
  1113.                 try {
  1114.                     // будем отвечать клиенту файловым каналом
  1115.                     data = connection.createFileChannel(filePath, "r");
  1116.                 }
  1117.                 catch (Exception e) {
  1118.                     textResponse = "I/O error";
  1119.                     e.printStackTrace();
  1120.                 }
  1121.  
  1122.                 break;// ---------------------------------------------------------
  1123.  
  1124.  
  1125.             case "put":
  1126.  
  1127.                 // file name not specified
  1128.                 if (parts.length < 2 ||
  1129.                     isNullOrEmpty(parts[1])) {
  1130.  
  1131.                     textResponse = "invalid command args";
  1132.                     break;
  1133.                 }
  1134.  
  1135.                 // set file
  1136.                 try {
  1137.                     filePath = Paths.get(dataRoot + parts[1]);
  1138.  
  1139.                     connection.setReceiveFilePath(filePath);
  1140.  
  1141.                     textResponse = "ok"; // response OK
  1142.                 }
  1143.                 catch (Exception e) {
  1144.                     textResponse = "I/O error";
  1145.                     e.printStackTrace();
  1146.                 }
  1147.  
  1148.                 break;// ---------------------------------------------------------
  1149.  
  1150.             case "":
  1151.  
  1152.                 textResponse = "nop";
  1153.  
  1154.                 break;// ---------------------------------------------------------
  1155.  
  1156.  
  1157.             default:
  1158.                 textResponse = "unknown command";
  1159.                 break;// ---------------------------------------------------------
  1160.         }
  1161.  
  1162.  
  1163.         // Конвертируем текст - результат выполнения команды в channel
  1164.         if (textResponse != null) {
  1165.  
  1166.             data = connection.getBufferedTransmitChannel();
  1167.             // will write textResponse to data
  1168.             StringTochannel(textResponse, data);
  1169.         }
  1170.  
  1171.         assert data != null;
  1172.  
  1173.         // schedule sending response to command (text)
  1174.         scheduleWrite(key, data);
  1175.     }
  1176.  
  1177.  
  1178.  
  1179.     // =============================================================================
  1180.  
  1181.  
  1182.     public static void main(String[] args) throws IOException {
  1183.  
  1184.         Thread t = new Thread(new FubarServer());
  1185.         //t.setDaemon(false);
  1186.         t.start();
  1187.     }
  1188.  
  1189.  
  1190.  
  1191.  
  1192.  
  1193.  
  1194.  
  1195.  
  1196.  
  1197.     // =============================================================================
  1198.  
  1199.  
  1200.     // Schedule rottening old connections
  1201.     private void scheduleDeleteRottenConnections() {
  1202.  
  1203.         ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(
  1204.                 new ThreadFactory() {
  1205.                     public Thread newThread(Runnable r) {
  1206.                         Thread t = Executors.defaultThreadFactory().newThread(r);
  1207.                         t.setDaemon(true);
  1208.                         return t;
  1209.                     }
  1210.                 });
  1211.         service.scheduleAtFixedRate(
  1212.                 () -> connectionList.removeRotten(), ROTTEN_LATENCY, ROTTEN_LATENCY, TimeUnit.SECONDS);
  1213.     }
  1214.  
  1215.  
  1216.  
  1217.     public void onDone(Void v) {
  1218.         //System.out.println("Done");
  1219.     }
  1220. }
  1221.  
  1222.  
  1223.  
  1224.  
  1225.  
  1226. package ru.geekbrains.netty.selector03.server.serverActions;
  1227.  
  1228. import java.nio.file.DirectoryStream;
  1229. import java.nio.file.Files;
  1230. import java.nio.file.Path;
  1231. import java.nio.file.Paths;
  1232. import java.util.function.Function;
  1233.  
  1234. public class DirectoryReader implements Function<String,String> {
  1235.  
  1236.     @Override
  1237.     public String apply(String dir) {
  1238.  
  1239.         String result = null;
  1240.  
  1241.         try {
  1242.             StringBuilder sb = new StringBuilder();
  1243.             Path path = Paths.get(dir);
  1244.             try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
  1245.                 for (Path entry : stream) {
  1246.                     sb.append(entry.getFileName()).append("\n");
  1247.                 }
  1248.             }
  1249.             result = sb.toString().trim();
  1250.         } catch (Exception e) {
  1251.             e.printStackTrace();
  1252.         }
  1253.  
  1254.         return result;
  1255.     }
  1256. }
  1257.  
  1258.  
  1259.  
  1260.  
  1261.  
  1262.  
  1263. package ru.geekbrains.netty.selector03.server;
  1264.  
  1265. import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
  1266. import java.nio.channels.SeekableByteChannel;
  1267.  
  1268. /**
  1269.  * Hello world!
  1270.  *
  1271.  */
  1272. public class App
  1273. {
  1274.  
  1275.     public static void main(String[] args) {
  1276.  
  1277.         new App();
  1278.     }
  1279.    
  1280.    
  1281.  
  1282.     App() {
  1283.  
  1284.     }
  1285.  
  1286. }
  1287.  
  1288.  
  1289.  
  1290.  
  1291. package ru.geekbrains.netty.selector03.common.entities;
  1292.  
  1293. public enum MessageType {
  1294.     TEXT((byte)0),
  1295.     BINARY((byte)1);
  1296.  
  1297.     private byte value;
  1298.  
  1299.  
  1300.     MessageType(byte value) {
  1301.         this.value = value;
  1302.     }
  1303.  
  1304.     public byte getValue() {
  1305.         return value;
  1306.     }
  1307.  
  1308.     public static MessageType parse(byte value) {
  1309.  
  1310.         MessageType result = null;
  1311.  
  1312.  
  1313.         if (value == 0) {
  1314.             result = MessageType.TEXT;
  1315.         }
  1316.         else if (value == 1) {
  1317.             result = MessageType.BINARY;
  1318.         }
  1319.         return result;
  1320.     }
  1321.  
  1322.  
  1323. }
  1324.  
  1325.  
  1326.  
  1327.  
  1328.  
  1329. package ru.geekbrains.netty.selector03.common.entities;
  1330.  
  1331. import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
  1332.  
  1333. import java.io.FileNotFoundException;
  1334. import java.io.IOException;
  1335. import java.io.RandomAccessFile;
  1336. import java.nio.ByteBuffer;
  1337. import java.nio.channels.*;
  1338. import java.nio.file.Path;
  1339. import java.time.Instant;
  1340.  
  1341. public class Connection {
  1342.  
  1343.     public static final int BUFFER_SIZE = 1024*1024; // read and write buffer size
  1344.  
  1345.  
  1346.     private SelectionKey key;
  1347.     private SocketChannel channel;  // сокет
  1348.     private ByteBuffer readBuffer;  // промежуточный буффер на чтение
  1349.     private ByteBuffer writeBuffer; // промежуточный буффер на запись
  1350.     private Instant time;
  1351.     private SeekableByteChannel transmitChannel;  // канал на передачу данных клиенту
  1352.     private SeekableByteChannel receiveChannel;   // канал на прием данных от клиента
  1353.  
  1354.     // чтобы не плодить каналов через new()
  1355.     public SeekableByteChannel bufferedTransmitChannel; // Используется для передачи текстовых данных
  1356.     private SeekableByteChannel bufferedReceiveChannel; // Используется для приема текстовых данных
  1357.  
  1358.     private boolean transmitHeaderPresent; // заголовок принимаемого сообщения был прочитан
  1359.     private boolean receiveHeaderPresent; // в отправляемое сообщение был записан заголовок
  1360.  
  1361.     //private ByteBuffer data;        // emulating data(file) needed to be transferred to client
  1362.  
  1363.     private Path receiveFilePath;     // путь к файлу для приема
  1364.  
  1365.     private long remainingBytesToRead;
  1366.     //private ByteArrayOutputStream bufferStream; // работает с readBuffer при приеме текстового сообщения
  1367.  
  1368.     // хотя можно просто унаследоваться от SeekableByteChannel и сделать два Channel
  1369.     // TextChannel и BinaryChannel - в одном текст, в другом - файл
  1370.     //private MessageType receiveMessageType;    // received message type
  1371.  
  1372.  
  1373.  
  1374.     public Connection() {
  1375.  
  1376.         bufferedTransmitChannel = new SeekableInMemoryByteChannel();
  1377.         bufferedReceiveChannel = new SeekableInMemoryByteChannel();
  1378.  
  1379.         this.readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  1380.         this.writeBuffer = ByteBuffer.allocate(BUFFER_SIZE);
  1381.     }
  1382.  
  1383.     public Connection(SelectionKey key, Instant time) {
  1384.         this();
  1385.  
  1386.         this.key = key;
  1387.         this.channel = (SocketChannel)key.channel();
  1388.         this.time = time;
  1389.     }
  1390.  
  1391.     public Connection(SocketChannel channel) {
  1392.         this();
  1393.  
  1394.         this.channel = channel;
  1395.     }
  1396.  
  1397.     public ByteBuffer getReadBuffer() {
  1398.         return readBuffer;
  1399.     }
  1400.  
  1401.     public ByteBuffer getWriteBuffer() {
  1402.         return writeBuffer;
  1403.     }
  1404.  
  1405.     public SelectionKey getKey() {
  1406.         return key;
  1407.     }
  1408.  
  1409.     public SocketChannel getChannel() {
  1410.         return channel;
  1411.     }
  1412.  
  1413.     public Instant getTime() {return time;}
  1414.  
  1415.     public void setTime(Instant time) {this.time = time;}
  1416.  
  1417. /*    public RandomAccessFile getFile() {return file;}
  1418.  
  1419.     public void setFile(RandomAccessFile file) {this.file = file;}*/
  1420.  
  1421.     public SeekableByteChannel getTransmitChannel() {
  1422.         return transmitChannel;
  1423.     }
  1424.  
  1425.     public void setTransmitChannel(SeekableByteChannel transmitChannel) {
  1426.         this.transmitChannel = transmitChannel;
  1427.     }
  1428.  
  1429.     public SeekableByteChannel getReceiveChannel() {
  1430.         return receiveChannel;
  1431.     }
  1432.  
  1433.     public void setReceiveChannel(SeekableByteChannel receiveChannel) {
  1434.         this.receiveChannel = receiveChannel;
  1435.     }
  1436.  
  1437.     public boolean isTransmitHeaderPresent() {
  1438.         return transmitHeaderPresent;
  1439.     }
  1440.  
  1441.     public void setTransmitHeaderPresent(boolean transmitHeaderPresent) {
  1442.         this.transmitHeaderPresent = transmitHeaderPresent;
  1443.     }
  1444.  
  1445.  
  1446.     public boolean isReceiveHeaderPresent() {
  1447.         return receiveHeaderPresent;
  1448.     }
  1449.  
  1450.     public void setReceiveHeaderPresent(boolean receiveHeaderPresent) {
  1451.         this.receiveHeaderPresent = receiveHeaderPresent;
  1452.     }
  1453.  
  1454.     public long remainingBytesToRead() {
  1455.         return remainingBytesToRead;
  1456.     }
  1457.  
  1458.     public void remainingBytesToRead(long remainingBytesToRead) {
  1459.         this.remainingBytesToRead = remainingBytesToRead;
  1460.     }
  1461.  
  1462.     public void decreaseRemainingBytesToRead(long amount) {
  1463.         remainingBytesToRead-= amount;
  1464.     }
  1465.  
  1466.     public Path getReceiveFilePath() {
  1467.         return receiveFilePath;
  1468.     }
  1469.  
  1470.     public void setReceiveFilePath(Path receiveFilePath) {
  1471.         this.receiveFilePath = receiveFilePath;
  1472.     }
  1473.  
  1474.     public SeekableByteChannel getBufferedTransmitChannel() {
  1475.  
  1476.         try {
  1477.             bufferedTransmitChannel.position(0);
  1478.             bufferedTransmitChannel.truncate(0);
  1479.  
  1480.  
  1481.         } catch (IOException e) {
  1482.             e.printStackTrace();
  1483.         }
  1484.         return bufferedTransmitChannel;
  1485.     }
  1486.  
  1487.  
  1488.     public SeekableByteChannel getBufferedReceiveChannel() {
  1489.  
  1490.         try {
  1491.             bufferedReceiveChannel.position(0);
  1492.             bufferedReceiveChannel.truncate(0);
  1493.  
  1494.  
  1495.         } catch (IOException e) {
  1496.             e.printStackTrace();
  1497.         }
  1498.         return bufferedReceiveChannel;
  1499.     }
  1500.  
  1501.  
  1502.     // -----------------------------------------------------------------------------------
  1503.  
  1504.     public void close() {
  1505.  
  1506.         // close socket
  1507.         if (channel != null &&
  1508.             channel.isOpen()) {
  1509.  
  1510.             try {
  1511.                 channel.close();
  1512.             } catch (IOException ignored) {}
  1513.         }
  1514.  
  1515.         // close transmitChannel
  1516.         if (transmitChannel != null) {
  1517.             try {
  1518.                 transmitChannel.close();
  1519.             } catch (IOException ignored) {}
  1520.         }
  1521.  
  1522.         // close receiveChannel
  1523.         if (receiveChannel != null) {
  1524.             try {
  1525.                 receiveChannel.close();
  1526.             } catch (IOException ignored) {}
  1527.         }
  1528.     }
  1529.  
  1530.  
  1531.  
  1532.     /**
  1533.      * Write header to connection.writeBuffer
  1534.      */
  1535.     public void writeHeader() {
  1536.  
  1537.         try {
  1538.  
  1539.             assert writeBuffer.position() == 0;
  1540.  
  1541.             // write message size
  1542.             writeBuffer.putLong(transmitChannel.size());
  1543.  
  1544.             // write message type
  1545.             writeBuffer.put(getChannelType(transmitChannel).getValue());
  1546.  
  1547.             setTransmitHeaderPresent(true);
  1548.  
  1549.         } catch (Exception e) {
  1550.             e.printStackTrace();
  1551.         }
  1552.     }
  1553.  
  1554.     /**
  1555.      * Read header from connection.readBuffer
  1556.      */
  1557.     public MessageType parseHeader() {
  1558.  
  1559.         MessageType result = null;
  1560.  
  1561.         try {
  1562.  
  1563.             // return to beginning of buffer
  1564.             readBuffer.rewind();
  1565.  
  1566.             // get payload length
  1567.             remainingBytesToRead = readBuffer.getLong();
  1568.  
  1569.             // позволим дальше писать в буфер
  1570.             //readBuffer.limit(readBuffer.capacity());
  1571.  
  1572.             // get message type
  1573.             result = MessageType.parse(readBuffer.get());
  1574.  
  1575.  
  1576.             // увеличим количество байт для чтения на размер заголовка (8+1)
  1577.             // т.к. в цикле readSocket(..)
  1578.             // в количество прочитанных байт из сокета 'read' вошли как длина заголовока,
  1579.             // так и число прочтенных байт самого сообщения, поэтому возвращаем обратно
  1580.             // (помечаем ка кнепрочитанные)
  1581.             remainingBytesToRead += (8 + 1);  // int64 - message size + 1 byte message type
  1582.  
  1583.             setReceiveHeaderPresent(true);
  1584.  
  1585.         } catch (Exception e) {
  1586.             e.printStackTrace();
  1587.         }
  1588.  
  1589.         return result;
  1590.     }
  1591.  
  1592.     // =====================================================================
  1593.  
  1594.  
  1595.     public MessageType getChannelType(SeekableByteChannel channel) {
  1596.  
  1597.         MessageType result = null;
  1598.  
  1599.         if (channel instanceof SeekableInMemoryByteChannel) {
  1600.             result = MessageType.TEXT;
  1601.         }
  1602.         else if (channel instanceof FileChannel) {
  1603.             result = MessageType.BINARY;
  1604.         }
  1605.  
  1606.         return result;
  1607.     }
  1608.  
  1609.  
  1610.  
  1611.     public FileChannel createFileChannel(Path path, String mode) {
  1612.  
  1613.         FileChannel result = null;
  1614.         try {
  1615.  
  1616.             RandomAccessFile file = new RandomAccessFile(path.toString(), mode);
  1617.             result = file.getChannel();
  1618.  
  1619.         } catch (FileNotFoundException e) {
  1620.             e.printStackTrace();
  1621.         }
  1622.         return result;
  1623.     }
  1624.  
  1625.  
  1626.  
  1627.  
  1628.  
  1629. }
  1630. package ru.geekbrains.netty.selector03.common.entities;
  1631.  
  1632. import java.io.IOException;
  1633. import java.nio.ByteBuffer;
  1634. import java.nio.channels.SeekableByteChannel;
  1635. import java.nio.charset.StandardCharsets;
  1636.  
  1637. public class Utils {
  1638.  
  1639.     public static String channelToString(SeekableByteChannel channel) {
  1640.  
  1641.         String result = null;
  1642.  
  1643.         try {
  1644.             ByteBuffer buffer = ByteBuffer.allocate((int)channel.size());
  1645.             channel.position(0);
  1646.             channel.read(buffer);
  1647.             buffer.flip();
  1648.  
  1649.             result = new String( buffer.array(), StandardCharsets.UTF_8);
  1650.         } catch (IOException e) {
  1651.             e.printStackTrace();
  1652.         }
  1653.  
  1654.         return result;
  1655.     }
  1656.  
  1657.     public static void StringTochannel(String text, SeekableByteChannel outChannel) {
  1658.  
  1659.         SeekableByteChannel result = null;
  1660.  
  1661.         try {
  1662.             ByteBuffer tmpBuffer = ByteBuffer.wrap(text.getBytes());
  1663.  
  1664.             outChannel.position(0);
  1665.             outChannel.truncate(tmpBuffer.capacity());
  1666.             outChannel.write(tmpBuffer);
  1667.         }
  1668.         catch (Exception e) {
  1669.             e.printStackTrace();
  1670.         }
  1671.     }
  1672.  
  1673.  
  1674.     public static void copyBuffer(ByteBuffer src, ByteBuffer dst) {
  1675.  
  1676.         int maxTransfer = Math.min(dst.remaining(), src.remaining());
  1677.  
  1678.         // use a duplicated(backed on original) buffer so we don't disrupt the limit of the original buffer
  1679.         ByteBuffer tmp = src.duplicate();
  1680.         tmp.limit(tmp.position() + maxTransfer);
  1681.         dst.put(tmp);
  1682.  
  1683.         // now discard the data we've copied from the original source (optional)
  1684.         src.position(src.position() + maxTransfer);
  1685.     }
  1686.  
  1687.     public static boolean isNullOrEmpty(Object object) {
  1688.  
  1689.         return object == null || object.getClass() == String.class && ((String)object).trim().isEmpty();
  1690.     }
  1691.  
  1692.  
  1693. }
  1694.  
  1695.  
  1696.  
  1697.  
  1698.  
  1699. package ru.geekbrains.netty.selector03.client;
  1700.  
  1701. import org.apache.commons.compress.utils.SeekableInMemoryByteChannel;
  1702. import ru.geekbrains.netty.selector03.common.entities.Connection;
  1703. import ru.geekbrains.netty.selector03.common.entities.MessageType;
  1704.  
  1705. import java.io.BufferedReader;
  1706. import java.io.IOException;
  1707. import java.io.InputStreamReader;
  1708. import java.net.InetSocketAddress;
  1709. import java.nio.ByteBuffer;
  1710. import java.nio.channels.ClosedChannelException;
  1711. import java.nio.channels.SeekableByteChannel;
  1712. import java.nio.channels.SocketChannel;
  1713. import java.nio.charset.StandardCharsets;
  1714. import java.nio.file.Files;
  1715. import java.nio.file.Path;
  1716. import java.nio.file.Paths;
  1717.  
  1718. import static ru.geekbrains.netty.selector03.common.entities.Utils.channelToString;
  1719. import static ru.geekbrains.netty.selector03.common.entities.Utils.isNullOrEmpty;
  1720.  
  1721. public class FubarClient implements Runnable {
  1722.  
  1723.     private static final int PORT_NUMBER = 8000;
  1724.     private static final String SERVER_HOST = "127.0.0.1";
  1725.  
  1726.     private SocketChannel socketChannel;
  1727.     private String dataRoot;
  1728.     private Connection connection;
  1729.     private SeekableByteChannel pendingFileTransfer;
  1730.  
  1731.     public FubarClient() throws IOException {
  1732.  
  1733.         // Будут проблемы с путями
  1734.         dataRoot = System.getProperty("user.dir") + "/data/";  //(? File.separator)
  1735.         Files.createDirectories(Paths.get(dataRoot));
  1736.  
  1737.         // in blocking mode
  1738.         socketChannel = SocketChannel.open();
  1739.         socketChannel.connect((new InetSocketAddress(SERVER_HOST, PORT_NUMBER)));
  1740.  
  1741.         //noinspection ConstantConditions
  1742.         connection = new Connection(socketChannel);
  1743.     }
  1744.  
  1745.  
  1746.     @Override
  1747.     public void run() {
  1748.  
  1749.         try {
  1750.  
  1751.             // reading server greeting
  1752.             readSocket();
  1753.  
  1754.             // print greeting to user
  1755.             processResponse();
  1756.  
  1757.             BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
  1758.  
  1759.             //noinspection InfiniteLoopStatement
  1760.             while(true) {
  1761.  
  1762.                 // get user input
  1763.                 String input = br.readLine();
  1764.  
  1765.                 String res = parseUserInput(input);
  1766.  
  1767.                 // invalid user input
  1768.                 if (!isNullOrEmpty(res)) {
  1769.                     System.out.println(res);
  1770.                     continue;
  1771.                 }
  1772.  
  1773.                 //send to server
  1774.                 SeekableByteChannel data = new SeekableInMemoryByteChannel(input.getBytes());
  1775.                 connection.setTransmitChannel(data);
  1776.                 writeSocket();
  1777.  
  1778.                 // reading response
  1779.                 readSocket();
  1780.  
  1781.                 processResponse();
  1782.  
  1783.                 // transfer file to server (if scheduled one)
  1784.                 if (pendingFileTransfer!= null) {
  1785.                     connection.setTransmitChannel(pendingFileTransfer);
  1786.                     writeSocket();
  1787.                 }
  1788.  
  1789.  
  1790.             }
  1791.         }
  1792.         catch (Exception e) {
  1793.             e.printStackTrace();
  1794.         }
  1795.  
  1796.     }
  1797.  
  1798.  
  1799.  
  1800.     /**
  1801.      * Читаем из сокета данные, сколько их там накопилось
  1802.      * Учитывая длину сообщения из заголовка
  1803.      */
  1804.     private void readSocket()  {
  1805.  
  1806.         try {
  1807.  
  1808.             //System.out.println("readSocket");
  1809.  
  1810.             SocketChannel client = connection.getChannel();
  1811.             ByteBuffer buffer = connection.getReadBuffer();
  1812.  
  1813.             // Изначально не знаем что приедет - текст или файл
  1814.             SeekableByteChannel data = connection.getReceiveChannel();
  1815.  
  1816.             int read;
  1817.  
  1818.             // read >  0  - readied some data
  1819.             // read =  0  - no data available (end of stream)
  1820.             // read = -1  - closed connection
  1821.  
  1822.  
  1823.  
  1824.             // подготавливаем буфер для чтения
  1825.             buffer.clear();
  1826.  
  1827.             long totalToReceive = -1;
  1828.  
  1829.             // см FubarServer
  1830.             if(!connection.isReceiveHeaderPresent()) {
  1831.                 buffer.limit(8 + 1);
  1832.             }
  1833.             else {
  1834.  
  1835.                 buffer.limit((int)Math.min(
  1836.                         (long)buffer.capacity(),
  1837.                         connection.remainingBytesToRead()));
  1838.             }
  1839.  
  1840.             while ((read = client.read(buffer)) > 0) {
  1841.  
  1842.                 buffer.flip();
  1843.  
  1844.  
  1845.                 // Parse header if didn't do it before ---------------------------------------
  1846.                 // Узнаем тип сообщения и его размер
  1847.                 if (!connection.isReceiveHeaderPresent()) {
  1848.                     MessageType messageType = connection.parseHeader();
  1849.  
  1850.                     totalToReceive = connection.remainingBytesToRead() - (8 + 1);
  1851.  
  1852.                     // Определяемся, куда сохранять данные
  1853.                     if(messageType == MessageType.TEXT) {
  1854.  
  1855.                         // берем из буферный канал для текста
  1856.                         data = connection.getBufferedReceiveChannel();
  1857.                     }
  1858.                     else {
  1859.                         // пишем в файл
  1860.                         data = connection.createFileChannel(connection.getReceiveFilePath(), "rw");
  1861.                     }
  1862.                     // устанавливаем выбранный канал для connection в качестве канала-приемника
  1863.                     connection.setReceiveChannel(data);
  1864.  
  1865.                 } // -------------------------------------------------------------------------
  1866.  
  1867.                 // уменьшаем количество оставшихся байт сообщения для чтения
  1868.                 connection.decreaseRemainingBytesToRead(read);
  1869.  
  1870.  
  1871.                 assert data != null;
  1872.                 // пишем из буфера в канал
  1873.                 data.write(buffer);
  1874.  
  1875.                 // опять настраиваем буфер, чтоб жизнь медом не казалась
  1876.                 buffer.rewind();
  1877.                 buffer.limit((int)Math.min(
  1878.                         (long)buffer.capacity(),
  1879.                         connection.remainingBytesToRead()));
  1880.  
  1881.                 System.out.println("Rx: " + data.position() + " / " + totalToReceive);
  1882.  
  1883.                 //System.out.println(data.position() + " / " + data.size());
  1884.  
  1885.                 // преодалеваем упирание в блокирующий сокет (на чтение)
  1886.                 if (connection.remainingBytesToRead() == 0) {
  1887.                     break;
  1888.                 }
  1889.             }
  1890.  
  1891.             // возвращаем обратно возможность разбирать заголовок нового сообщения
  1892.             connection.setReceiveHeaderPresent(false);
  1893.  
  1894.             // Remote endpoint close connection
  1895.             if (read < 0) {
  1896.                 System.out.println("потеряна связь с сервером");
  1897.                 connection.close();
  1898.                 return;
  1899.             }
  1900.  
  1901.  
  1902.         }
  1903.         // Remote endpoint close connection
  1904.         catch (Exception e) {
  1905.             if (e instanceof ClosedChannelException) {
  1906.                 System.out.println("потеряна связь с сервером");
  1907.                 // Remote endpoint close connection
  1908.                 // (maybe not handled in "if (read < 0)")
  1909.                 connection.close(); // will close socket channel
  1910.             }
  1911.             e.printStackTrace();
  1912.         }
  1913.  
  1914.         //System.out.println("readSocket end");
  1915.     }
  1916.  
  1917.  
  1918.  
  1919.  
  1920.  
  1921.     private void writeSocket()  {
  1922.  
  1923.         try {
  1924.  
  1925.             //System.out.println("writeSocket");
  1926.  
  1927.             SocketChannel client = connection.getChannel();
  1928.             ByteBuffer buffer = connection.getWriteBuffer();
  1929.             SeekableByteChannel data = connection.getTransmitChannel();
  1930.  
  1931.             //int wrote;
  1932.  
  1933.             // wrote  >  0  - wrote some data
  1934.             // wrote  =  0  - no data written // socket stall
  1935.  
  1936.             do {
  1937.                 buffer.clear();
  1938.  
  1939.                 // Add header if absent
  1940.                 if (!connection.isTransmitHeaderPresent()) {
  1941.                     connection.writeHeader();
  1942.                 }
  1943.  
  1944.                 data.read(buffer);
  1945.                 buffer.flip();
  1946.  
  1947.  
  1948.                 // пишем до упора, флудим сокет, висим на client.write(...)
  1949.                 // пока все не пролезет или не упадем
  1950.                 client.write(buffer);
  1951.  
  1952.                 System.out.println("Tx: " + data.position() + " / " + data.size());
  1953.             }
  1954.             while (data.position() < data.size());
  1955.  
  1956.             // -------------------------------------------------------------------------
  1957.             // Все успешно записалось, data.position == data.size
  1958.  
  1959.             // возвращаем обратно возможность писать заголовок для нового сообщения
  1960.             // восстанавливаем новый цикл записи сообщений
  1961.             connection.setTransmitHeaderPresent(false);
  1962.  
  1963.             // Закрываем файловый канал (откуда писали в сокет)
  1964.             if (connection.getChannelType(data) == MessageType.BINARY) {
  1965.                 data.close();
  1966.  
  1967.                 // помечаем, что мы закончили передачу файла на сервер
  1968.                 pendingFileTransfer = null;
  1969.                 System.out.println("transfer complete");
  1970.             }
  1971.             // Если это был текстовый канал, то ничего не делаем,
  1972.             // он там переиспользуется
  1973.  
  1974.             // очищаем буффер для следущей передачи
  1975.             buffer.clear();
  1976.  
  1977.             // -------------------------------------------------------------------------
  1978.  
  1979.         }
  1980.         // Remote endpoint close connection
  1981.         catch (Exception e) {
  1982.             if (e instanceof ClosedChannelException) {
  1983.                 System.out.println("потеряна связь с сервером");
  1984.                 // Remote endpoint close connection
  1985.                 // (maybe not handled in "if (read < 0)")
  1986.                 connection.close(); // will close socket channel
  1987.             }
  1988.             e.printStackTrace();
  1989.         }
  1990.     }
  1991.  
  1992.  
  1993.  
  1994.     private void processResponse() {
  1995.  
  1996.  
  1997.         try {
  1998.  
  1999.             SeekableByteChannel receiveChannel =  connection.getReceiveChannel();
  2000.  
  2001.             // Text message
  2002.             if (connection.getChannelType(receiveChannel) == MessageType.TEXT) {
  2003.  
  2004.                 // display to user
  2005.                 String response = channelToString(receiveChannel);
  2006.  
  2007.  
  2008.                 // При отправке сереверу команды на передачу файла
  2009.                 // сервер ответит, что он готов и ожидает передачу файла
  2010.                 // - не будем выводить это в консоль
  2011.                 if (pendingFileTransfer == null) {
  2012.                     System.out.println(response);
  2013.                 }
  2014.  
  2015.                 // будем использовать повторно, без создания нового channel
  2016.                 // receiveChannel буфферезируется (backed by) bufferedReceiveChannel
  2017.                 // поэтому не закрываем, а truncate до 0
  2018.                 receiveChannel.position(0);
  2019.                 receiveChannel.truncate(0);
  2020.             }
  2021.             else {
  2022.  
  2023.                 // working with files
  2024.  
  2025.  
  2026.                 // там в прошлом через команду уже был настроен файл для приема
  2027.                 // И в readSocket() файл уже записался.
  2028.                 // Поэтому просто закрываем
  2029.                 receiveChannel.close();
  2030.                 System.out.println("transfer complete");
  2031.             }
  2032.  
  2033.  
  2034.         } catch (IOException e) {
  2035.             e.printStackTrace();
  2036.         }
  2037.     }
  2038.  
  2039.     // -----------------------------------------------------------------------------
  2040.  
  2041.  
  2042.     private String parseUserInput(String command) {
  2043.  
  2044.         String[] parts = command.split(" ");
  2045.         String preFilter = null;
  2046.         Path filePath;
  2047.  
  2048.         switch (parts[0]) {
  2049.  
  2050.             case "get":
  2051.  
  2052.                 // file name not specified
  2053.                 if (parts.length < 2 ||
  2054.                     isNullOrEmpty(parts[1])) {
  2055.  
  2056.                     preFilter = "invalid command args";
  2057.                     break;
  2058.                 }
  2059.  
  2060.                 filePath = Paths.get(dataRoot + parts[1]);
  2061.                 connection.setReceiveFilePath(filePath);
  2062.  
  2063.                 break;// ---------------------------------------------------------
  2064.  
  2065.  
  2066.             case "put":
  2067.  
  2068.                 // file name not specified
  2069.                 if (parts.length < 2 ||
  2070.                     isNullOrEmpty(parts[1])) {
  2071.  
  2072.                     preFilter = "invalid command args";
  2073.                     break;
  2074.                 }
  2075.  
  2076.                 filePath = Paths.get(dataRoot + parts[1]);
  2077.                 // file not exists
  2078.                 if (!Files.exists(filePath)) {
  2079.                     preFilter = "file not exists";
  2080.                     break;
  2081.                 }
  2082.  
  2083.                 // set file
  2084.                 try {
  2085.                     pendingFileTransfer = connection.createFileChannel(filePath, "r");
  2086.                 }
  2087.                 catch (Exception e) {
  2088.                     preFilter = "I/O error";
  2089.                     e.printStackTrace();
  2090.                 }
  2091.  
  2092.                 break;// ---------------------------------------------------------
  2093.  
  2094.         }
  2095.  
  2096.         return preFilter;
  2097.  
  2098.     }
  2099.  
  2100.  
  2101.     // ==============================================================
  2102.  
  2103.  
  2104.  
  2105.  
  2106.     public static void main(String[] args) throws IOException {
  2107.  
  2108.         Thread t = new Thread(new FubarClient());
  2109.         //t.setDaemon(false);
  2110.         t.start();
  2111.     }
  2112.  
  2113.  
  2114.  
  2115. }
  2116. package ru.geekbrains.netty.selector03.client;
  2117.  
  2118. import java.io.BufferedReader;
  2119. import java.io.InputStreamReader;
  2120.  
  2121. /**
  2122.  * Hello world!
  2123.  *
  2124.  */
  2125. public class App
  2126. {
  2127.  
  2128.     public static void main(String[] args) {
  2129.  
  2130.         new App();
  2131.     }
  2132.    
  2133.    
  2134.  
  2135.     App() {
  2136.  
  2137.  
  2138.     }
  2139.  
  2140. }
Advertisement
Add Comment
Please, Sign In to add comment