Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.junit.vintage</groupId>
- <artifactId>junit-vintage-engine</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.github.pengrad</groupId>
- <artifactId>java-telegram-bot-api</artifactId>
- <version>4.6.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/net.tascalate/net.tascalate.concurrent -->
- <dependency>
- <groupId>net.tascalate</groupId>
- <artifactId>net.tascalate.concurrent</artifactId>
- <version>0.8.4</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/com.pivovarit/throwing-function -->
- <dependency>
- <groupId>com.pivovarit</groupId>
- <artifactId>throwing-function</artifactId>
- <version>1.5.0</version>
- </dependency>
- </dependencies>
- */
- package ru.home.telebot;
- import com.pengrad.telegrambot.TelegramBot;
- import com.pengrad.telegrambot.UpdatesListener;
- import com.pengrad.telegrambot.model.Update;
- import com.pengrad.telegrambot.request.SendMessage;
- import com.pengrad.telegrambot.response.SendResponse;
- import net.tascalate.concurrent.CompletableTask;
- import net.tascalate.concurrent.ThreadPoolTaskExecutor;
- import okhttp3.*;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
- import org.springframework.stereotype.Service;
- import ru.home.telebot.dto.StreamDto;
- import ru.home.telebot.entities.BufferStatus;
- import javax.annotation.PostConstruct;
- import java.lang.invoke.MethodHandles;
- import java.net.InetSocketAddress;
- import java.net.Proxy;
- import java.time.Duration;
- import java.time.Instant;
- import java.util.Map;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.function.BiConsumer;
- import java.util.function.Consumer;
- import static org.springframework.util.StringUtils.isEmpty;
- import static org.springframework.util.StringUtils.trimAllWhitespace;
- import static org.springframework.util.StringUtils.trimWhitespace;
- @Service
- public class Telebot {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private final String API_KEY = ;
- private final String PROXY_HOST = ;
- private final int PROXY_PORT = ;
- private final String PROXY_LOGIN = ;
- private final String PROXY_PASSWORD = ;
- private final Long GROUP_ID = ; // portal.tv
- //private final Long GROUP_ID = // me
- private static final int TELEGRAM_MAX_MESSAGE_LENGTH = 4000;
- private ConcurrentNavigableMap<Instant, String> messageQueue = new ConcurrentSkipListMap<>();
- private String teleSendBuffer; // not synced
- private AtomicReference<String> teleBuffer = new AtomicReference<>();
- private AtomicReference<BufferStatus> bufferStatus = new AtomicReference<>();
- private ThreadPoolExecutor threadPool; // асинхронная отправка сообщений
- private static final int QUEUE_POOL_SIZE = 10;
- private static Duration TELEGRAM_INITIAL_SEND_TIMEOUT = Duration.ofSeconds(10);
- private static Duration TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds(TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds());
- private TelegramBot bot;
- private Map<String, BiConsumer<Long, String>> handlers = new ConcurrentHashMap<>();
- // @Autowired
- // public void setObjectMapper(ObjectMapper objectMapper) {
- // this.objectMapper = objectMapper;
- // }
- @PostConstruct
- private void postConstruct() {
- log.info("Staring telegram bot ...");
- handlers.put("/help", this::help);
- handlers.put("/echo", this::echo);
- handlers.put("/ping", this::ping);
- handlers.put("/streams", this::streams);
- OkHttpClient client =
- client(null, PROXY_HOST, PROXY_PORT, PROXY_LOGIN, PROXY_PASSWORD);
- bot = new TelegramBot.Builder(API_KEY)
- .okHttpClient(client)
- .build();
- bot.setUpdatesListener(updates -> {
- // process updates
- for (Update update : updates) {
- try {
- Long chatId = update.message().chat().id();
- String text = update.message().text();
- if (isEmpty(trimAllWhitespace(text))) {
- continue;
- }
- String[] commandArray = text.split("[ @]");
- if (commandArray.length > 0) {
- String command = commandArray[0];
- if (handlers.containsKey(command)) {
- String body = trimWhitespace(text.substring(command.length(), text.length()));
- handlers.get(command).accept(chatId, body);
- }
- }
- }
- catch (Exception e) {
- log.error("bot receive/handle message error: ", e);
- }
- }
- // return id of last processed update or confirm them all
- return UpdatesListener.CONFIRMED_UPDATES_ALL;
- });
- final CustomizableThreadFactory threadFactory = new CustomizableThreadFactory();
- threadFactory.setDaemon(true);
- threadFactory.setThreadNamePrefix("QueuePool-");
- threadPool = new ThreadPoolTaskExecutor(
- QUEUE_POOL_SIZE, QUEUE_POOL_SIZE * 2,
- 60,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(QUEUE_POOL_SIZE * 10),
- threadFactory);
- }
- public void updateStream(StreamDto streamDto) {
- // enqueue push-notification from bot
- String tmp = streamDto.isOnline() ? "ONLINE" : "OFFLINE";
- String s = "'" + streamDto.getName() + "' now " + tmp;
- messageQueue.put(Instant.now(), s);
- }
- // TELEGRAM BOT HANDLERS ===========================================
- private void help(Long chatId, String text) {
- String resp =
- "/help - this help" +
- "\n" + "/echo [text] - echo [text]" +
- "\n" + "/ping - echo-reply" +
- "\n" + "/streams - streams status";
- SendResponse response = bot.execute(new SendMessage(chatId, resp));
- }
- private void echo(Long chatId, String text) {
- SendResponse response = bot.execute(new SendMessage(chatId, text));
- }
- private void ping(Long chatId, String text) {
- SendResponse response = bot.execute(new SendMessage(chatId, "pong"));
- }
- private void streams(Long chatId, String text) {
- StringBuilder sb = new StringBuilder();
- String[] lines = Utils.getStreamsInfo().split("\n");
- // разбивка по страницам 4k
- for (String s : lines) {
- sb.append(s).append("\n");
- if (sb.length() >= TELEGRAM_MAX_MESSAGE_LENGTH) {
- SendResponse response = bot.execute(new SendMessage(chatId, sb.toString()));
- sb.setLength(0);
- }
- }
- // отправка последнего куска
- if (sb.length() > 0) {
- //log.debug("addSendJod, TELEGRAM_SEND_TIMEOUT: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
- SendResponse response = bot.execute(new SendMessage(chatId, sb.toString()));
- }
- }
- // ==================================================================================
- private static OkHttpClient client(Interceptor interceptor, final String proxyHost, final int proxyPort, final String userName, final String password) {
- OkHttpClient.Builder builder = new OkHttpClient.Builder();
- if (interceptor != null) builder.addInterceptor(interceptor);
- final Authenticator proxyAuthenticator = (route, response) -> {
- String credential = Credentials.basic(userName, password);
- return response.request().newBuilder()
- .header("Proxy-Authorization", credential)
- .build();
- };
- builder.proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)))
- .proxyAuthenticator(proxyAuthenticator)
- .retryOnConnectionFailure(true);
- return builder.build();
- }
- /**
- * Will periodically read messages from messageQueue and send them to telegram
- */
- @Scheduled(fixedDelay = 2000)
- private void sendMessageFromQueue() {
- // отправляем сообщение
- Runnable sendJob =
- () -> {
- log.debug("send {}", teleBuffer.get().replaceAll("\n"," "));
- SendResponse response = bot.execute(new SendMessage(GROUP_ID, teleBuffer.get()));
- };
- // все ок отправилось - удаляем отправленное
- Runnable postSendJob =
- () -> {
- log.debug("postSend");
- // reset teleBuffer
- teleBuffer.set(null);
- bufferStatus.set(BufferStatus.EMPTY);
- // decrease timeout time
- if (TELEGRAM_SEND_TIMEOUT.getSeconds() / 1.5 >= TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds()) {
- TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds((long)(TELEGRAM_SEND_TIMEOUT.getSeconds() / 1.5));
- //log.debug("TELEGRAM_SEND_TIMEOUT DEC: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
- }
- };
- // чет пошло не так
- Consumer<Throwable> onError =
- (throwable) -> {
- log.error("telegram send error: ", throwable);
- bufferStatus.set(BufferStatus.ERROR);
- // increase timeout time
- if (TELEGRAM_SEND_TIMEOUT.getSeconds() * 2 <= TELEGRAM_INITIAL_SEND_TIMEOUT.getSeconds() * 60) {
- TELEGRAM_SEND_TIMEOUT = Duration.ofSeconds(TELEGRAM_SEND_TIMEOUT.getSeconds() * 2);
- //log.debug("TELEGRAM_SEND_TIMEOUT INC: {}", TELEGRAM_SEND_TIMEOUT.getSeconds());
- }
- };
- // -----------------------------------------------------------
- // телега занята отправкой текущего сообщения
- if(bufferStatus.get() == BufferStatus.PENDING) {
- return;
- }
- // предыдущее сообщение не отправлено - ошибка передачи - отправляем заново
- if(bufferStatus.get() == BufferStatus.ERROR) {
- addSendJod(sendJob, postSendJob, onError);
- return;
- }
- // нечего отправлять
- if (messageQueue.size() == 0) {
- return;
- }
- // ------------------------------------------------------------
- StringBuilder sb = new StringBuilder();
- Instant lastReadied = null;
- // формируем пакет
- for (Map.Entry<Instant, String> entry : messageQueue.entrySet()) {
- if(sb.length() > TELEGRAM_MAX_MESSAGE_LENGTH) {
- break;
- }
- sb.append(entry.getValue()).append("\n");
- lastReadied = entry.getKey();
- }
- // move all readied strings to teleBuffer
- if (sb.length() > 0) {
- teleBuffer.set(sb.toString());
- Instant finalLastReadied = lastReadied;
- messageQueue.entrySet().removeIf(e -> e.getKey().compareTo(finalLastReadied) <= 0);
- // отправляем пакет
- addSendJod(sendJob, postSendJob, onError);
- }
- }
- private void addSendJod(Runnable send, Runnable postSend, Consumer<Throwable> onError) {
- CompletableTask
- .runAsync(send, threadPool) // отправляем пачку telegram сообщения
- .orTimeout(TELEGRAM_SEND_TIMEOUT)
- .handle((aVoid, throwable) -> {
- if (throwable == null) {
- postSend.run();
- }
- else {
- onError.accept(throwable);
- }
- return null;
- });
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment