Advertisement
viraco4a

Original

Apr 2nd, 2019
462
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.77 KB | None | 0 0
  1. package com.aurora.intelligence.scheduling;
  2.  
  3. import com.aurora.intelligence.SyntaxService;
  4. import com.fasterxml.jackson.core.JsonParseException;
  5. import com.fasterxml.jackson.databind.JsonMappingException;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.scheduling.annotation.Scheduled;
  10.  
  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15.  
  16. public class RedisConsumer {
  17. private final RedisService redisService;
  18.  
  19. @Autowired
  20. private SyntaxService syntaxService;
  21.  
  22. private ThreadPoolExecutor executor;
  23.  
  24. private static final Logger LOGGER = LoggerFactory.getLogger(RedisConsumer.class);
  25.  
  26. public RedisConsumer(RedisService redisService, ThreadPoolExecutor executor) {
  27. this.redisService = redisService;
  28. this.executor = executor;
  29. }
  30.  
  31. @Scheduled(fixedRate = 1000)
  32. private void receive() throws JsonParseException, JsonMappingException, IOException {
  33. if (executor.getActiveCount() >= executor.getMaximumPoolSize()) {
  34. return;
  35. }
  36.  
  37. RedisMessage redisMessage = redisService.getRedisMessage(
  38. RedisMessageType.STATEMENT_DATA,
  39. RedisMessageType.CRAWLING_DATA
  40. );
  41.  
  42. if (redisMessage != null) {
  43. LOGGER.info("Found message type: " + redisMessage.getType());
  44. LOGGER.info("Currently " + executor.getActiveCount() + " out of " +
  45. executor.getMaximumPoolSize() + " threads are busy.");
  46. LOGGER.info("Tasks in work queue: " + redisService.getWorkQueueSize());
  47.  
  48. if (redisMessage.getType() == RedisMessageType.STATEMENT_DATA) {
  49.  
  50. Statement statement = redisService.getDataObject(redisMessage, Statement.class);
  51. executor.submit(new Runnable() {
  52. @Override
  53. public void run() {
  54. try {
  55. LOGGER.info("Analyzing " + statement.getOriginalStatementId() + " : " +
  56. statement.getText());
  57. AnalyzedText res = syntaxService.analyze(
  58. statement.getText(),
  59. statement.getAnnotations(),
  60. statement.getOriginalStatementId());
  61.  
  62. statement.setSentences(res.getSentences());
  63. statement.setFrequencyAnalysis(res.getFrequencyAnalysis());
  64. statement.setStatus(ProcessingStatus.PROCESSED);
  65.  
  66. redisService.addMessage(redisMessage, statement);
  67.  
  68. } catch (Exception e) {
  69. statement.setStatus(ProcessingStatus.FAULTY);
  70. redisService.handleFailure(redisMessage, e);
  71. }
  72. }
  73. });
  74. } else if (redisMessage.getType() == RedisMessageType.CRAWLING_DATA) {
  75.  
  76. CrawlingMessage crawlingMessage = redisService.getDataObject(redisMessage, CrawlingMessage.class);
  77.  
  78. ScrapingResultDTO wrapper = crawlingMessage.getResult();
  79. if (wrapper == null) {
  80. redisService.handleFailure(redisMessage, new Exception("Scraping result is null"));
  81. }
  82.  
  83. executor.submit(new Runnable() {
  84. @Override
  85. public void run() {
  86. try {
  87. List<ScrapingResultDTO> scrapingResults = new ArrayList<>();
  88. scrapingResults.add(wrapper);
  89. if (wrapper.getChildren() != null && wrapper.getChildren().size() > 0) {
  90. hierarchicalKUR(wrapper.getChildren(), scrapingResults);
  91. }
  92.  
  93. for (ScrapingResultDTO result : scrapingResults) {
  94. if (result.getResult() == null || result.getResult().get("description") == null) {
  95. LOGGER.warn("The scraping result has no description - SKIP");
  96. continue;
  97. }
  98.  
  99. String text = result.getResult().get("description");
  100. AnalyzedText res = syntaxService.analyze(text);
  101. Statement statement = createStatementFromAnalyzeResult(res);
  102. statement.setText(text);
  103.  
  104. redisService.addMessage(redisMessage, statement);
  105. }
  106. } catch (Exception e) {
  107. redisService.handleFailure(redisMessage, e);
  108. }
  109. }
  110. });
  111. }
  112. }
  113. }
  114.  
  115. private static void hierarchicalKUR(List<ScrapingResultDTO> wrapper, List<ScrapingResultDTO> list) {
  116. if (wrapper != null && !wrapper.isEmpty()) {
  117.  
  118. for (ScrapingResultDTO dto : wrapper) {
  119. list.add(dto);
  120. if (dto.getChildren() != null && dto.getChildren().size() > 0) {
  121. hierarchicalKUR(dto.getChildren(), list);
  122. }
  123. }
  124.  
  125. }
  126. }
  127.  
  128. private static Statement createStatementFromAnalyzeResult(AnalyzedText res) {
  129. Statement statement = new Statement();
  130. statement.setSentences(res.getSentences());
  131. statement.setFrequencyAnalysis(res.getFrequencyAnalysis());
  132. statement.setStatus(ProcessingStatus.PROCESSED);
  133. return statement;
  134. }
  135. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement