Advertisement
Guest User

Untitled

a guest
Dec 12th, 2019
210
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 8.89 KB | None | 0 0
  1.  
  2.  
  3. Перейти к содержимому
  4. Gmail используется с программой чтения с экрана
  5.  
  6. 1 из 717
  7. Fwd: Про рефакторинг процессинга
  8. Входящие
  9. x
  10.  
  11. Evgeny Tarasenko <tarasenko@nprog.ru>
  12. Приложения
  13. 10:22 (14 минут назад)
  14. кому: я
  15.  
  16. Посмотри тоже, пожалуйста.
  17.  
  18. -------- Исходное сообщение --------
  19. Тема: Про рефакторинг процессинга
  20. Дата: 2019-12-12 20:15
  21. От: Павел Карасюк <p.karasyuk@gmail.com>
  22. Кому: Evgeny Tarasenko <tarasenko@nprog.ru>
  23.  
  24. Попробовал набросать то, что имел в
  25. виду. Там все очень неаккуратно и
  26. неполно (многие параметры непонятно
  27. как передаются, скипы не показаны), но
  28. делать аккуратно - долго, и пока
  29. непонятно, стоит ли того. Посмотри, если
  30. интересно, можно попробовать доделать
  31.  
  32. Область прикрепленных файлов
  33.  
  34. class ProcessingStage {
  35.   public ProcessingStage(String stageId) {
  36.     this.stageId = stageId;
  37.   }
  38.  
  39.   private Map<String, Object> params;
  40.   private Map<String, Object> results;
  41.  
  42.   protected String stageId;
  43.  
  44.   public void prePreform() {};
  45.   public CompletedFuture<ITaskResult> void perform() {};
  46.   public void postPerform(ITaskResult performResult); //must be changed to store results
  47.  
  48.   public boolean isSkipped() { return false; };
  49.  
  50.   public void addParams(Map<> params) {this.params.addAll(params)};
  51.   public void addResults(Map<> results) {this.results.addAll(results)};
  52.  
  53.   public final insert(ProcessingStage stage) {
  54.     stage.addParams(this.params);
  55.     graph.insert(stage, this.stageId);
  56.   }
  57.  
  58.   private ProcessnigGraph graph;
  59.  
  60.   Collection<ProcessingStage> previousStages;
  61.   Collection<ProcessingStage> nextStages;
  62. }
  63.  
  64. class ProcessingGraph {
  65.   Collection<ProcessingStep> nodes = new HashSet<> ();
  66.  
  67.   public void addRootNode(ProcessingStage node) {
  68.     nodes.add(node);
  69.   }
  70.  
  71.   public void addNode(ProcessingStage node, String nodeId) {
  72.     nodes.add(node);
  73.     parentNode = findNode(nodeId);
  74.     parentNode.addChild(node);
  75.     node.addParent(parentNode);
  76.   }
  77.  
  78.   public void insert(ProcessingStage node, String nodeId) {
  79.     nodes.add(node);
  80.     ProcessingStage parentNode = findNode(nodeId);
  81.     //node.addParams(parentNode.getParams())  // Not sure
  82.     parentNode.addChild(node);
  83.     node.addParent(parentNode);
  84.     node.addChildren(parentNode.removeAllChildren());
  85.   }
  86.  
  87.   public void addParent(String childId, String parentId) {
  88.     ProcessingStage child = findNode(childId);
  89.     ProcessingStage parent = findNote(parentId);
  90.     child.addParent(parent);
  91.     parent.addChild(child);
  92.   }
  93.  
  94.   public void schedule() {
  95.     ProcessingStage root = findRootNode();
  96.   }
  97.  
  98.   private CompletableFuture scheduleNode(ProcessingStage node) {
  99.     for(ProcessingStage parent : node.getPreviousStages()) {
  100.       if(!parent.isProcessingOver) {
  101.         return CompletableFuture.completedFuture(null);
  102.       }
  103.     }
  104.  
  105.     return CompletableFuture.completedFuture(null)
  106.       .thenApply(result -> node.prePerform())
  107.       .thenCompose(result -> node.perform())
  108.       .thenApply(result -> node.postPerform(result))
  109.       .thenApply(result -> node.setProcessingOver());
  110.   }
  111.  
  112.   private CompletableFuture scheduleRecursively(ProcessingNode node, CompletableFuture workflow) {
  113.     workflow
  114.       .thenCompose(nothing -> scheduleNode(node))
  115.       .thenCompose(nothing -> CompletableFuture.allOf(
  116.           node.getNextStages()
  117.               .stream()
  118.               .map(node -> scheduleRecursively(node, CompletableFuture.completedFuture(null)))
  119.               .toArray() )
  120.           );
  121.   }
  122. }
  123.  
  124. class FileProcessingService {
  125.   ...
  126.  
  127.   public void scheduleInitialGraph() {
  128.     ProcessingGraph graph = ProcessingGraph();
  129.     graph.addRootNode(new identifyProcessingStage("Identify"));
  130.     graph.addNode(new VerifyProcessingStage("Verify"), "Identify");  //Add after identify
  131.     graph.addNode(new CheckQualityStage("Check quality"), "Verify");
  132.     graph.addNode(new CleaningStage("Cleaning"), "Check quality");
  133.     graph.addNote(new AlignmentStage("Alignment"), "Cleaning");
  134.  
  135.     graph.addNode(new GermalineVariationsCallingStage("Germline calling"), "Alignment");
  136.     graph.addNode(new SomaticVariationsCallingStage("Somatic calling"), "Alignment");
  137.  
  138.     graph.addNode(new SomeStepAfterCalling("Some step after both calling"), "Germline calling");
  139.     graph.addParent("Some step after both calling", "Somatic calling");
  140.  
  141.     graph.schedule();
  142.   }
  143. }
  144.  
  145. class CleaningStage extends ProcessingStage {
  146.   @Override
  147.   protected void prePerform() {
  148.     insert(new ShortReadsFilteringStage("Short reads filtering"), this.stageId);
  149.     insert(new FilterByTileStage("Filter by tile"), "Short reads filtering");
  150.     String latestStage;
  151.     if(sourceFile.getReadsCount() < THRESHOLD_READS_COUNT) {
  152.       insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Filter by tile");
  153.       latestStage = "Trimming and filtering";
  154.     } else {
  155.       insert(new GetFileSampleStage("Sampling"), "Filter by tile");
  156.       insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Sampling");
  157.       insert(new RecoverAfterSampling("Recover after sampling"), "Trimming and filtering");
  158.       latestStage = "Recover after sampling";
  159.     }
  160.     insert(new AdapterFilteringStage("Adapter filtering"), latestStage);
  161.   }
  162.  
  163.   @Override
  164.   public void postPerform() {
  165.     this.results.put("Cleanup context", getContextWithQualityResults());
  166.     this.results.put("Cleanup quality result", /*get from this.params*/);
  167.   }
  168.  
  169.   private CleanupContext getContextWithQualityResults() {
  170.     CleanupContext context = new CleanupConext();
  171.     //update context with quality results from this.params.get("Task result")
  172.   }
  173. }
  174.  
  175. class TrimmingAndFilteringStage extends ProcessingStage {
  176.   @Override
  177.   public void prePerform() {
  178.     String previousId = this.stageId;
  179.     for(/*iterate over params configuration*/) {
  180.       String newId = "Trimming " + UUID.randomUUID().toString();
  181.       insert(new TrimmingStep(newId, /*trimming params*/...), previousId);
  182.       previousId = newId;
  183.     }
  184.    
  185.     ...
  186.   }
  187.  
  188.   @Override
  189.   public void postPerform(ITaskResult performResult) {
  190.     // Copy context and quality results from params to results
  191.   }
  192. }
  193.  
  194. class TrimmingStage extends ProcessingStage {
  195.  
  196.   private TrimmingParams trimmingParams;
  197.  
  198.   public TrimmingStage(String stageId, TrimmingParams trimmingParams) {
  199.     super(stageId);
  200.     this.trimmingParams = trimmingParams;
  201.   }
  202.  
  203.   private CleanupContext updateContext(CleanupContext prevContext) {
  204.     //backup previous results for potential backtracking, and other context data as needed
  205.   }
  206.  
  207.   @Override
  208.   public void prePerform() {
  209.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  210.       insert(new CheckCleanupQualityStage("Check cleanup quality " + UUID.randomUUID), this.stageId);
  211.     }
  212.   }
  213.  
  214.   @Override
  215.   public CompletableFuture<ITaskResult> perform() {
  216.     FileQualityResults quality = (FileQualityResults) this.params.get("Cleanup quality result");
  217.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  218.       return taskExecutionService.submitQualityTrimmerTask(sourceFile, getResultFilePath(state.result, sourceFile.getFilePath(), FileFormat.FASTQ), minQuality)
  219.     }
  220.  
  221.     return null;
  222.   }
  223.  
  224.   @Override
  225.   public void postPerform(ITaskResult performResult) {
  226.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  227.       this.results.put("Cleanup context", updateContext(this.params.get("Cleanup context"), performResult));
  228.     } else {
  229.       this.results.put("Cleanup context", this.params.get("Cleanup context"));
  230.     }
  231.   }
  232. }
  233.  
  234. class CheckCleanupQualityStage extends ProcessingStage {
  235.  
  236.   private CompletableFuture<ITaskResult> checkQuality(SourceFileEntity sourceFile, ITaskResults taskResults) {
  237.     return taskExecutionService.submitCheckQuality(sourceFile, getResultFilePath(taskResults, sourceFile.getFilePath(), FileFormat.FASTQ));
  238.   }
  239.  
  240.   @Override
  241.   public CompletableFuture<ITaskResult> perform() {
  242.     return checkQuality(sourceFile, params.get("Stage result"));
  243.   }
  244.  
  245.   @Override
  246.   public void postPerform(ITaskResult performResult) {
  247.     StepResult stepResult = StepResult.processed(quality, taskResults);
  248.     CleanupContext context = (CleanupContext) params.get("Cleanup Context");
  249.     if(ITaskResult.class == FileFastqQUalityTrimmingTaskResults) {
  250.       verifyTrimmingResult(context, stepResult);
  251.     }
  252.     // ...
  253.     results.put("Cleanup Context", context);
  254.     results.put("Cleanup quality result", performResult);
  255.   }
  256. }
  257. proto.java
  258. Отображается файл "proto.java"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement