dreamworker

Untitled

Mar 18th, 2020
313
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 13.00 KB | None | 0 0
  1.  
  2. /*
  3. <dependencies>
  4.  
  5.         <dependency>
  6.             <groupId>org.springframework.boot</groupId>
  7.             <artifactId>spring-boot-starter</artifactId>
  8.         </dependency>
  9.  
  10.  
  11.         <dependency>
  12.             <groupId>org.springframework.boot</groupId>
  13.             <artifactId>spring-boot-starter-web</artifactId>
  14.         </dependency>
  15.  
  16.         <dependency>
  17.             <groupId>org.springframework.boot</groupId>
  18.             <artifactId>spring-boot-starter-test</artifactId>
  19.             <scope>test</scope>
  20.             <exclusions>
  21.                 <exclusion>
  22.                     <groupId>org.junit.vintage</groupId>
  23.                     <artifactId>junit-vintage-engine</artifactId>
  24.                 </exclusion>
  25.             </exclusions>
  26.         </dependency>
  27.  
  28.         <dependency>
  29.             <groupId>com.github.pengrad</groupId>
  30.             <artifactId>java-telegram-bot-api</artifactId>
  31.             <version>4.6.0</version>
  32.         </dependency>
  33.  
  34.         <!-- https://mvnrepository.com/artifact/net.tascalate/net.tascalate.concurrent -->
  35.         <dependency>
  36.             <groupId>net.tascalate</groupId>
  37.             <artifactId>net.tascalate.concurrent</artifactId>
  38.             <version>0.8.4</version>
  39.         </dependency>
  40.  
  41.         <!-- https://mvnrepository.com/artifact/com.pivovarit/throwing-function -->
  42.         <dependency>
  43.             <groupId>com.pivovarit</groupId>
  44.             <artifactId>throwing-function</artifactId>
  45.             <version>1.5.0</version>
  46.         </dependency>
  47.        
  48.     </dependencies>
  49. */
  50.  
  51.  
  52.  
  53.  
  54.  
  55.  
  56. package ru.home.telebot;
  57.  
  58. import com.pengrad.telegrambot.TelegramBot;
  59. import com.pengrad.telegrambot.UpdatesListener;
  60. import com.pengrad.telegrambot.model.Update;
  61. import com.pengrad.telegrambot.request.SendMessage;
  62. import com.pengrad.telegrambot.response.SendResponse;
  63. import net.tascalate.concurrent.CompletableTask;
  64. import net.tascalate.concurrent.ThreadPoolTaskExecutor;
  65. import okhttp3.*;
  66. import org.slf4j.Logger;
  67. import org.slf4j.LoggerFactory;
  68. import org.springframework.scheduling.annotation.Scheduled;
  69. import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
  70. import org.springframework.stereotype.Service;
  71. import ru.home.telebot.dto.StreamDto;
  72. import ru.home.telebot.entities.BufferStatus;
  73.  
  74. import javax.annotation.PostConstruct;
  75. import java.lang.invoke.MethodHandles;
  76. import java.net.InetSocketAddress;
  77. import java.net.Proxy;
  78. import java.time.Duration;
  79. import java.time.Instant;
  80.  
  81. import java.util.Map;
  82.  
  83. import java.util.concurrent.*;
  84.  
  85. import java.util.concurrent.atomic.AtomicReference;
  86. import java.util.function.BiConsumer;
  87. import java.util.function.Consumer;
  88.  
  89. import static org.springframework.util.StringUtils.isEmpty;
  90. import static org.springframework.util.StringUtils.trimAllWhitespace;
  91. import static org.springframework.util.StringUtils.trimWhitespace;
  92.  
  93. @Service
  94. public class Telebot {
  95.  
  96.     private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  97.     private final String API_KEY = ;
  98.     private final String PROXY_HOST = ;
  99.     private final int PROXY_PORT = ;
  100.     private final String PROXY_LOGIN = ;
  101.     private final String PROXY_PASSWORD = ;
  102.  
  103.     private final Long GROUP_ID = ; // portal.tv
  104.     //private final Long GROUP_ID =  // me
  105.  
  106.     private static final int TELEGRAM_MAX_MESSAGE_LENGTH = 4000;
  107.  
  108.     private ConcurrentNavigableMap<Instant, String> messageQueue = new ConcurrentSkipListMap<>();
  109.  
  110.     private String teleSendBuffer; // not synced
  111.  
  112.     private AtomicReference<String> teleBuffer = new AtomicReference<>();
  113.     private AtomicReference<BufferStatus> bufferStatus = new AtomicReference<>();
  114.  
  115.     private ThreadPoolExecutor threadPool; // асинхронная отправка сообщений
  116.     private static final int QUEUE_POOL_SIZE = 10;
  117.  
  118.     private static Duration TELEGRAM_INITIAL_SEND_TIMEOUT = Duration.ofSeconds(10);
  119.     private static Duration TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds(TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds());
  120.  
  121.     private TelegramBot bot;
  122.  
  123.     private Map<String, BiConsumer<Long, String>> handlers = new ConcurrentHashMap<>();
  124.  
  125.  
  126.  
  127. //    @Autowired
  128. //    public void setObjectMapper(ObjectMapper objectMapper) {
  129. //        this.objectMapper = objectMapper;
  130. //    }
  131.  
  132.     @PostConstruct
  133.     private void postConstruct() {
  134.         log.info("Staring telegram bot ...");
  135.  
  136.         handlers.put("/help", this::help);
  137.         handlers.put("/echo", this::echo);
  138.         handlers.put("/ping", this::ping);
  139.         handlers.put("/streams", this::streams);
  140.  
  141.  
  142.  
  143.         OkHttpClient client =
  144.             client(null, PROXY_HOST, PROXY_PORT, PROXY_LOGIN, PROXY_PASSWORD);
  145.  
  146.         bot = new TelegramBot.Builder(API_KEY)
  147.             .okHttpClient(client)
  148.             .build();
  149.  
  150.         bot.setUpdatesListener(updates -> {
  151.  
  152.             // process updates
  153.             for (Update update : updates) {
  154.  
  155.                 try {
  156.  
  157.  
  158.                     Long chatId = update.message().chat().id();
  159.                     String text = update.message().text();
  160.  
  161.                     if (isEmpty(trimAllWhitespace(text))) {
  162.                         continue;
  163.                     }
  164.  
  165.                     String[] commandArray = text.split("[ @]");
  166.                     if (commandArray.length > 0) {
  167.  
  168.                         String command = commandArray[0];
  169.  
  170.                         if (handlers.containsKey(command)) {
  171.                             String body = trimWhitespace(text.substring(command.length(), text.length()));
  172.  
  173.                             handlers.get(command).accept(chatId, body);
  174.                         }
  175.                     }
  176.  
  177.                 }
  178.                 catch (Exception e) {
  179.                     log.error("bot receive/handle message error: ", e);
  180.                 }
  181.             }
  182.  
  183.             // return id of last processed update or confirm them all
  184.             return UpdatesListener.CONFIRMED_UPDATES_ALL;
  185.         });
  186.  
  187.  
  188.  
  189.  
  190.         final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
  191.         threadFactory.setDaemon(true);
  192.         threadFactory.setThreadNamePrefix("QueuePool-");
  193.  
  194.  
  195.  
  196.         threadPool = new ThreadPoolTaskExecutor(
  197.             QUEUE_POOL_SIZE, QUEUE_POOL_SIZE * 2,
  198.             60,
  199.             TimeUnit.SECONDS,
  200.             new LinkedBlockingQueue<>(QUEUE_POOL_SIZE * 10),
  201.             threadFactory);
  202.     }
  203.  
  204.     public void updateStream(StreamDto streamDto) {
  205.  
  206.         // enqueue push-notification from bot
  207.         String tmp = streamDto.isOnline() ? "ONLINE" : "OFFLINE";
  208.         String s = "'" + streamDto.getName() + "' now " + tmp;
  209.         messageQueue.put(Instant.now(), s);
  210.     }
  211.  
  212.  
  213.  
  214.     // TELEGRAM BOT HANDLERS ===========================================
  215.  
  216.  
  217.  
  218.     private void help(Long chatId, String text) {
  219.  
  220.         String resp =
  221.             "/help - this help" +
  222.             "\n" + "/echo [text] - echo [text]" +
  223.             "\n" + "/ping - echo-reply" +
  224.             "\n" + "/streams - streams status";
  225.  
  226.  
  227.         SendResponse response = bot.execute(new SendMessage(chatId, resp));
  228.  
  229.     }
  230.  
  231.     private void echo(Long chatId, String text) {
  232.  
  233.         SendResponse response = bot.execute(new SendMessage(chatId, text));
  234.  
  235.     }
  236.  
  237.     private void ping(Long chatId, String text) {
  238.  
  239.         SendResponse response = bot.execute(new SendMessage(chatId, "pong"));
  240.     }
  241.  
  242.  
  243.     private void streams(Long chatId, String text) {
  244.  
  245.         StringBuilder sb = new StringBuilder();
  246.         String[] lines = Utils.getStreamsInfo().split("\n");
  247.  
  248.  
  249.         // разбивка по страницам 4k
  250.         for (String s : lines) {
  251.  
  252.             sb.append(s).append("\n");
  253.  
  254.             if (sb.length() >= TELEGRAM_MAX_MESSAGE_LENGTH) {
  255.                 SendResponse response = bot.execute(new SendMessage(chatId, sb.toString()));
  256.                 sb.setLength(0);
  257.             }
  258.         }
  259.  
  260.  
  261.  
  262.         // отправка последнего куска
  263.         if (sb.length() > 0) {
  264.  
  265.             //log.debug("addSendJod, TELEGRAM_SEND_TIMEOUT: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
  266.             SendResponse response = bot.execute(new SendMessage(chatId, sb.toString()));
  267.         }
  268.  
  269.     }
  270.  
  271.  
  272.     // ==================================================================================
  273.  
  274.  
  275.     private static OkHttpClient client(Interceptor interceptor, final String proxyHost, final int proxyPort, final String userName, final String password) {
  276.         OkHttpClient.Builder builder = new OkHttpClient.Builder();
  277.         if (interceptor != null) builder.addInterceptor(interceptor);
  278.  
  279.         final Authenticator proxyAuthenticator = (route, response) -> {
  280.             String credential = Credentials.basic(userName, password);
  281.             return response.request().newBuilder()
  282.                 .header("Proxy-Authorization", credential)
  283.                 .build();
  284.         };
  285.  
  286.         builder.proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
  287.             .proxyAuthenticator(proxyAuthenticator)
  288.             .retryOnConnectionFailure(true);
  289.  
  290.         return builder.build();
  291.     }
  292.  
  293.  
  294.     /**
  295.      * Will periodically read messages from messageQueue and send them to telegram
  296.      */
  297.     @Scheduled(fixedDelay = 2000)
  298.     private void sendMessageFromQueue() {
  299.  
  300.         // отправляем сообщение
  301.         Runnable sendJob =
  302.             () -> {
  303.                 log.debug("send {}", teleBuffer.get().replaceAll("\n"," "));
  304.                 SendResponse response = bot.execute(new SendMessage(GROUP_ID, teleBuffer.get()));
  305.             };
  306.  
  307.         // все ок отправилось - удаляем отправленное
  308.         Runnable postSendJob =
  309.             () -> {
  310.                 log.debug("postSend");
  311.                 // reset teleBuffer
  312.                 teleBuffer.set(null);
  313.                 bufferStatus.set(BufferStatus.EMPTY);
  314.  
  315.                 // decrease timeout time
  316.                 if (TELEGRAM_SEND_TIMEOUT.getSeconds() / 1.5 >= TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds()) {
  317.                     TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds((long)(TELEGRAM_SEND_TIMEOUT.getSeconds() / 1.5));
  318.  
  319.                     //log.debug("TELEGRAM_SEND_TIMEOUT DEC: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
  320.                 }
  321.  
  322.             };
  323.  
  324.         // чет пошло не так
  325.         Consumer<Throwable> onError =
  326.             (throwable) -> {
  327.  
  328.                 log.error("telegram send error: ", throwable);
  329.                 bufferStatus.set(BufferStatus.ERROR);
  330.  
  331.                 // increase timeout time
  332.                 if (TELEGRAM_SEND_TIMEOUT.getSeconds() * 2  <= TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds() * 60) {
  333.                     TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds(TELEGRAM_SEND_TIMEOUT.getSeconds() * 2);
  334.  
  335.                     //log.debug("TELEGRAM_SEND_TIMEOUT INC: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
  336.                 }
  337.  
  338.  
  339.             };
  340.  
  341.  
  342.  
  343.  
  344.         // -----------------------------------------------------------
  345.  
  346.         // телега занята отправкой текущего сообщения
  347.         if(bufferStatus.get() == BufferStatus.PENDING) {
  348.             return;
  349.         }
  350.  
  351.         // предыдущее сообщение не отправлено - ошибка передачи - отправляем заново
  352.         if(bufferStatus.get() == BufferStatus.ERROR) {
  353.             addSendJod(sendJob, postSendJob, onError);
  354.             return;
  355.         }
  356.  
  357.         // нечего отправлять
  358.         if (messageQueue.size() == 0) {
  359.             return;
  360.         }
  361.  
  362.  
  363.  
  364.         // ------------------------------------------------------------
  365.  
  366.         StringBuilder sb = new StringBuilder();
  367.         Instant lastReadied = null;
  368.  
  369.         // формируем пакет
  370.         for (Map.Entry<Instant, String> entry : messageQueue.entrySet()) {
  371.  
  372.             if(sb.length() > TELEGRAM_MAX_MESSAGE_LENGTH) {
  373.                 break;
  374.             }
  375.  
  376.             sb.append(entry.getValue()).append("\n");
  377.             lastReadied = entry.getKey();
  378.         }
  379.  
  380.         // move all readied strings to teleBuffer
  381.         if (sb.length() > 0) {
  382.  
  383.             teleBuffer.set(sb.toString());
  384.  
  385.             Instant finalLastReadied = lastReadied;
  386.             messageQueue.entrySet().removeIf(e -> e.getKey().compareTo(finalLastReadied) <= 0);
  387.  
  388.             // отправляем пакет
  389.             addSendJod(sendJob, postSendJob, onError);
  390.         }
  391.  
  392.     }
  393.  
  394.  
  395.  
  396.  
  397.  
  398.  
  399.     private void addSendJod(Runnable send, Runnable postSend, Consumer<Throwable> onError) {
  400.  
  401.  
  402.         CompletableTask
  403.             .runAsync(send, threadPool)    // отправляем пачку telegram сообщения
  404.             .orTimeout(TELEGRAM_SEND_TIMEOUT)
  405.             .handle((aVoid, throwable) -> {
  406.  
  407.                 if (throwable == null) {
  408.                     postSend.run();
  409.                 }
  410.                 else {
  411.                     onError.accept(throwable);
  412.                 }
  413.                 return null;
  414.             });
  415.  
  416.  
  417.     }
  418.  
  419.  
  420.  
  421. }
Advertisement
Add Comment
Please, Sign In to add comment