package com.aurora.intelligence.scheduling; import com.aurora.intelligence.SyntaxService; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadPoolExecutor; public class RedisConsumer { private final RedisService redisService; @Autowired private SyntaxService syntaxService; private ThreadPoolExecutor executor; private static final Logger LOGGER = LoggerFactory.getLogger(RedisConsumer.class); public RedisConsumer(RedisService redisService, ThreadPoolExecutor executor) { this.redisService = redisService; this.executor = executor; } @Scheduled(fixedRate = 1000) private void receive() throws JsonParseException, JsonMappingException, IOException { if (executor.getActiveCount() >= executor.getMaximumPoolSize()) { return; } RedisMessage redisMessage = redisService.getRedisMessage( RedisMessageType.STATEMENT_DATA, RedisMessageType.CRAWLING_DATA ); if (redisMessage != null) { LOGGER.info("Found message type: " + redisMessage.getType()); LOGGER.info("Currently " + executor.getActiveCount() + " out of " + executor.getMaximumPoolSize() + " threads are busy."); LOGGER.info("Tasks in work queue: " + redisService.getWorkQueueSize()); if (redisMessage.getType() == RedisMessageType.STATEMENT_DATA) { Statement statement = redisService.getDataObject(redisMessage, Statement.class); executor.submit(new Runnable() { @Override public void run() { try { LOGGER.info("Analyzing " + statement.getOriginalStatementId() + " : " + statement.getText()); AnalyzedText res = syntaxService.analyze( statement.getText(), statement.getAnnotations(), statement.getOriginalStatementId()); updateStatementWithAnalyzedText(res, statement); redisService.addMessage(redisMessage, statement); } catch (Exception e) { statement.setStatus(ProcessingStatus.FAULTY); redisService.handleFailure(redisMessage, e); } } }); } else if (redisMessage.getType() == RedisMessageType.CRAWLING_DATA) { CrawlingMessage crawlingMessage = redisService.getDataObject(redisMessage, CrawlingMessage.class); ScrapingResultDTO wrapper = crawlingMessage.getResult(); if (wrapper == null) { redisService.handleFailure(redisMessage, new Exception("Scraping result is null")); } executor.submit(new Runnable() { @Override public void run() { try { List scrapingResults = new ArrayList<>(); scrapingResults.add(wrapper); if (wrapper.getChildren() != null && wrapper.getChildren().size() > 0) { hierarchicalKUR(wrapper.getChildren(), scrapingResults); } for (ScrapingResultDTO result : scrapingResults) { if (result.getResult() == null || result.getResult().get("description") == null) { LOGGER.warn("The scraping result has no description - SKIP"); continue; } String text = result.getResult().get("description"); AnalyzedText res = syntaxService.analyze(text); Statement statement = new Statement(); updateStatementWithAnalyzedText(res, statement); statement.setText(text); redisService.addMessage(redisMessage, statement); } } catch (Exception e) { redisService.handleFailure(redisMessage, e); } } }); } } } private static void hierarchicalKUR(List wrapper, List list) { if (wrapper != null && !wrapper.isEmpty()) { for (ScrapingResultDTO dto : wrapper) { list.add(dto); if (dto.getChildren() != null && dto.getChildren().size() > 0) { hierarchicalKUR(dto.getChildren(), list); } } } } private static void updateStatementWithAnalyzedText(AnalyzedText res, Statement statement) { statement.setSentences(res.getSentences()); statement.setFrequencyAnalysis(res.getFrequencyAnalysis()); statement.setStatus(ProcessingStatus.PROCESSED); } }