viraco4a

Suggestion

Apr 2nd, 2019
179
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.61 KB | None | 0 0
  1. package com.aurora.intelligence.scheduling;
  2.  
  3. import com.aurora.intelligence.SyntaxService;
  4. import com.fasterxml.jackson.core.JsonParseException;
  5. import com.fasterxml.jackson.databind.JsonMappingException;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.scheduling.annotation.Scheduled;
  10.  
  11. import java.io.IOException;
  12. import java.util.ArrayList;
  13. import java.util.List;
  14. import java.util.concurrent.ThreadPoolExecutor;
  15.  
  16. public class RedisConsumer {
  17. private final RedisService redisService;
  18.  
  19. @Autowired
  20. private SyntaxService syntaxService;
  21.  
  22. private ThreadPoolExecutor executor;
  23.  
  24. private static final Logger LOGGER = LoggerFactory.getLogger(RedisConsumer.class);
  25.  
  26. public RedisConsumer(RedisService redisService, ThreadPoolExecutor executor) {
  27. this.redisService = redisService;
  28. this.executor = executor;
  29. }
  30.  
  31. @Scheduled(fixedRate = 1000)
  32. private void receive() throws JsonParseException, JsonMappingException, IOException {
  33. if (executor.getActiveCount() >= executor.getMaximumPoolSize()) {
  34. return;
  35. }
  36.  
  37. RedisMessage redisMessage = redisService.getRedisMessage(
  38. RedisMessageType.STATEMENT_DATA,
  39. RedisMessageType.CRAWLING_DATA
  40. );
  41.  
  42. if (redisMessage != null) {
  43. LOGGER.info("Found message type: " + redisMessage.getType());
  44. LOGGER.info("Currently " + executor.getActiveCount() + " out of " +
  45. executor.getMaximumPoolSize() + " threads are busy.");
  46. LOGGER.info("Tasks in work queue: " + redisService.getWorkQueueSize());
  47.  
  48. if (redisMessage.getType() == RedisMessageType.STATEMENT_DATA) {
  49.  
  50. Statement statement = redisService.getDataObject(redisMessage, Statement.class);
  51. executor.submit(new Runnable() {
  52. @Override
  53. public void run() {
  54. try {
  55. LOGGER.info("Analyzing " + statement.getOriginalStatementId() + " : " +
  56. statement.getText());
  57. AnalyzedText res = syntaxService.analyze(
  58. statement.getText(),
  59. statement.getAnnotations(),
  60. statement.getOriginalStatementId());
  61. updateStatementWithAnalyzedText(res, statement);
  62.  
  63. redisService.addMessage(redisMessage, statement);
  64.  
  65. } catch (Exception e) {
  66. statement.setStatus(ProcessingStatus.FAULTY);
  67. redisService.handleFailure(redisMessage, e);
  68. }
  69. }
  70. });
  71. } else if (redisMessage.getType() == RedisMessageType.CRAWLING_DATA) {
  72.  
  73. CrawlingMessage crawlingMessage = redisService.getDataObject(redisMessage, CrawlingMessage.class);
  74.  
  75. ScrapingResultDTO wrapper = crawlingMessage.getResult();
  76. if (wrapper == null) {
  77. redisService.handleFailure(redisMessage, new Exception("Scraping result is null"));
  78. }
  79.  
  80. executor.submit(new Runnable() {
  81. @Override
  82. public void run() {
  83. try {
  84. List<ScrapingResultDTO> scrapingResults = new ArrayList<>();
  85. scrapingResults.add(wrapper);
  86. if (wrapper.getChildren() != null && wrapper.getChildren().size() > 0) {
  87. hierarchicalKUR(wrapper.getChildren(), scrapingResults);
  88. }
  89.  
  90. for (ScrapingResultDTO result : scrapingResults) {
  91. if (result.getResult() == null || result.getResult().get("description") == null) {
  92. LOGGER.warn("The scraping result has no description - SKIP");
  93. continue;
  94. }
  95.  
  96. String text = result.getResult().get("description");
  97. AnalyzedText res = syntaxService.analyze(text);
  98. Statement statement = new Statement();
  99. updateStatementWithAnalyzedText(res, statement);
  100. statement.setText(text);
  101.  
  102. redisService.addMessage(redisMessage, statement);
  103. }
  104. } catch (Exception e) {
  105. redisService.handleFailure(redisMessage, e);
  106. }
  107. }
  108. });
  109. }
  110. }
  111. }
  112.  
  113. private static void hierarchicalKUR(List<ScrapingResultDTO> wrapper, List<ScrapingResultDTO> list) {
  114. if (wrapper != null && !wrapper.isEmpty()) {
  115.  
  116. for (ScrapingResultDTO dto : wrapper) {
  117. list.add(dto);
  118. if (dto.getChildren() != null && dto.getChildren().size() > 0) {
  119. hierarchicalKUR(dto.getChildren(), list);
  120. }
  121. }
  122.  
  123. }
  124. }
  125.  
  126. private static void updateStatementWithAnalyzedText(AnalyzedText res, Statement statement) {
  127. statement.setSentences(res.getSentences());
  128. statement.setFrequencyAnalysis(res.getFrequencyAnalysis());
  129. statement.setStatus(ProcessingStatus.PROCESSED);
  130. }
  131. }
Advertisement
Add Comment
Please, Sign In to add comment