Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.aurora.intelligence.scheduling;
- import com.aurora.intelligence.SyntaxService;
- import com.fasterxml.jackson.core.JsonParseException;
- import com.fasterxml.jackson.databind.JsonMappingException;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Scheduled;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.ThreadPoolExecutor;
- public class RedisConsumer {
- private final RedisService redisService;
- @Autowired
- private SyntaxService syntaxService;
- private ThreadPoolExecutor executor;
- private static final Logger LOGGER = LoggerFactory.getLogger(RedisConsumer.class);
- public RedisConsumer(RedisService redisService, ThreadPoolExecutor executor) {
- this.redisService = redisService;
- this.executor = executor;
- }
- @Scheduled(fixedRate = 1000)
- private void receive() throws JsonParseException, JsonMappingException, IOException {
- if (executor.getActiveCount() >= executor.getMaximumPoolSize()) {
- return;
- }
- RedisMessage redisMessage = redisService.getRedisMessage(
- RedisMessageType.STATEMENT_DATA,
- RedisMessageType.CRAWLING_DATA
- );
- if (redisMessage != null) {
- LOGGER.info("Found message type: " + redisMessage.getType());
- LOGGER.info("Currently " + executor.getActiveCount() + " out of " +
- executor.getMaximumPoolSize() + " threads are busy.");
- LOGGER.info("Tasks in work queue: " + redisService.getWorkQueueSize());
- if (redisMessage.getType() == RedisMessageType.STATEMENT_DATA) {
- Statement statement = redisService.getDataObject(redisMessage, Statement.class);
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- LOGGER.info("Analyzing " + statement.getOriginalStatementId() + " : " +
- statement.getText());
- AnalyzedText res = syntaxService.analyze(
- statement.getText(),
- statement.getAnnotations(),
- statement.getOriginalStatementId());
- updateStatementWithAnalyzedText(res, statement);
- statement.setSentences(res.getSentences());
- statement.setFrequencyAnalysis(res.getFrequencyAnalysis());
- statement.setStatus(ProcessingStatus.PROCESSED);
- redisService.addMessage(redisMessage, statement);
- } catch (Exception e) {
- statement.setStatus(ProcessingStatus.FAULTY);
- redisService.handleFailure(redisMessage, e);
- }
- }
- });
- } else if (redisMessage.getType() == RedisMessageType.CRAWLING_DATA) {
- CrawlingMessage crawlingMessage = redisService.getDataObject(redisMessage, CrawlingMessage.class);
- ScrapingResultDTO wrapper = crawlingMessage.getResult();
- if (wrapper == null) {
- redisService.handleFailure(redisMessage, new Exception("Scraping result is null"));
- }
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- List<ScrapingResultDTO> scrapingResults = new ArrayList<>();
- scrapingResults.add(wrapper);
- if (wrapper.getChildren() != null && wrapper.getChildren().size() > 0) {
- hierarchicalKUR(wrapper.getChildren(), scrapingResults);
- }
- for (ScrapingResultDTO result : scrapingResults) {
- if (result.getResult() == null || result.getResult().get("description") == null) {
- LOGGER.warn("The scraping result has no description - SKIP");
- continue;
- }
- String text = result.getResult().get("description");
- AnalyzedText res = syntaxService.analyze(text);
- Statement statement = new Statement();
- updateStatementWithAnalyzedText(res, statement);
- statement.setText(text);
- redisService.addMessage(redisMessage, statement);
- }
- } catch (Exception e) {
- redisService.handleFailure(redisMessage, e);
- }
- }
- });
- }
- }
- }
- private static void hierarchicalKUR(List<ScrapingResultDTO> wrapper, List<ScrapingResultDTO> list) {
- if (wrapper != null && !wrapper.isEmpty()) {
- for (ScrapingResultDTO dto : wrapper) {
- list.add(dto);
- if (dto.getChildren() != null && dto.getChildren().size() > 0) {
- hierarchicalKUR(dto.getChildren(), list);
- }
- }
- }
- }
- private static void updateStatementWithAnalyzedText(AnalyzedText res, Statement statement) {
- statement.setSentences(res.getSentences());
- statement.setFrequencyAnalysis(res.getFrequencyAnalysis());
- statement.setStatus(ProcessingStatus.PROCESSED);
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement