SHARE
TWEET

Untitled

a guest Dec 12th, 2019 142 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1.  
  2.  
  3. Перейти к содержимому
  4. Gmail используется с программой чтения с экрана
  5.  
  6. 1 из 717
  7. Fwd: Про рефакторинг процессинга
  8. Входящие
  9. x
  10.  
  11. Evgeny Tarasenko <tarasenko@nprog.ru>
  12. Приложения
  13. 10:22 (14 минут назад)
  14. кому: я
  15.  
  16. Посмотри тоже, пожалуйста.
  17.  
  18. -------- Исходное сообщение --------
  19. Тема: Про рефакторинг процессинга
  20. Дата: 2019-12-12 20:15
  21. От: Павел Карасюк <p.karasyuk@gmail.com>
  22. Кому: Evgeny Tarasenko <tarasenko@nprog.ru>
  23.  
  24. Попробовал набросать то, что имел в
  25. виду. Там все очень неаккуратно и
  26. неполно (многие параметры непонятно
  27. как передаются, скипы не показаны), но
  28. делать аккуратно - долго, и пока
  29. непонятно, стоит ли того. Посмотри, если
  30. интересно, можно попробовать доделать
  31.  
  32. Область прикрепленных файлов
  33.  
  34. class ProcessingStage {
  35.   public ProcessingStage(String stageId) {
  36.     this.stageId = stageId;
  37.   }
  38.  
  39.   private Map<String, Object> params;
  40.   private Map<String, Object> results;
  41.  
  42.   protected String stageId;
  43.  
  44.   public void prePreform() {};
  45.   public CompletedFuture<ITaskResult> void perform() {};
  46.   public void postPerform(ITaskResult performResult); //must be changed to store results
  47.  
  48.   public boolean isSkipped() { return false; };
  49.  
  50.   public void addParams(Map<> params) {this.params.addAll(params)};
  51.   public void addResults(Map<> results) {this.results.addAll(results)};
  52.  
  53.   public final insert(ProcessingStage stage) {
  54.     stage.addParams(this.params);
  55.     graph.insert(stage, this.stageId);
  56.   }
  57.  
  58.   private ProcessnigGraph graph;
  59.  
  60.   Collection<ProcessingStage> previousStages;
  61.   Collection<ProcessingStage> nextStages;
  62. }
  63.  
  64. class ProcessingGraph {
  65.   Collection<ProcessingStep> nodes = new HashSet<> ();
  66.  
  67.   public void addRootNode(ProcessingStage node) {
  68.     nodes.add(node);
  69.   }
  70.  
  71.   public void addNode(ProcessingStage node, String nodeId) {
  72.     nodes.add(node);
  73.     parentNode = findNode(nodeId);
  74.     parentNode.addChild(node);
  75.     node.addParent(parentNode);
  76.   }
  77.  
  78.   public void insert(ProcessingStage node, String nodeId) {
  79.     nodes.add(node);
  80.     ProcessingStage parentNode = findNode(nodeId);
  81.     //node.addParams(parentNode.getParams())  // Not sure
  82.     parentNode.addChild(node);
  83.     node.addParent(parentNode);
  84.     node.addChildren(parentNode.removeAllChildren());
  85.   }
  86.  
  87.   public void addParent(String childId, String parentId) {
  88.     ProcessingStage child = findNode(childId);
  89.     ProcessingStage parent = findNote(parentId);
  90.     child.addParent(parent);
  91.     parent.addChild(child);
  92.   }
  93.  
  94.   public void schedule() {
  95.     ProcessingStage root = findRootNode();
  96.   }
  97.  
  98.   private CompletableFuture scheduleNode(ProcessingStage node) {
  99.     for(ProcessingStage parent : node.getPreviousStages()) {
  100.       if(!parent.isProcessingOver) {
  101.         return CompletableFuture.completedFuture(null);
  102.       }
  103.     }
  104.  
  105.     return CompletableFuture.completedFuture(null)
  106.       .thenApply(result -> node.prePerform())
  107.       .thenCompose(result -> node.perform())
  108.       .thenApply(result -> node.postPerform(result))
  109.       .thenApply(result -> node.setProcessingOver());
  110.   }
  111.  
  112.   private CompletableFuture scheduleRecursively(ProcessingNode node, CompletableFuture workflow) {
  113.     workflow
  114.       .thenCompose(nothing -> scheduleNode(node))
  115.       .thenCompose(nothing -> CompletableFuture.allOf(
  116.           node.getNextStages()
  117.               .stream()
  118.               .map(node -> scheduleRecursively(node, CompletableFuture.completedFuture(null)))
  119.               .toArray() )
  120.           );
  121.   }
  122. }
  123.  
  124. class FileProcessingService {
  125.   ...
  126.  
  127.   public void scheduleInitialGraph() {
  128.     ProcessingGraph graph = ProcessingGraph();
  129.     graph.addRootNode(new identifyProcessingStage("Identify"));
  130.     graph.addNode(new VerifyProcessingStage("Verify"), "Identify");  //Add after identify
  131.     graph.addNode(new CheckQualityStage("Check quality"), "Verify");
  132.     graph.addNode(new CleaningStage("Cleaning"), "Check quality");
  133.     graph.addNote(new AlignmentStage("Alignment"), "Cleaning");
  134.  
  135.     graph.addNode(new GermalineVariationsCallingStage("Germline calling"), "Alignment");
  136.     graph.addNode(new SomaticVariationsCallingStage("Somatic calling"), "Alignment");
  137.  
  138.     graph.addNode(new SomeStepAfterCalling("Some step after both calling"), "Germline calling");
  139.     graph.addParent("Some step after both calling", "Somatic calling");
  140.  
  141.     graph.schedule();
  142.   }
  143. }
  144.  
  145. class CleaningStage extends ProcessingStage {
  146.   @Override
  147.   protected void prePerform() {
  148.     insert(new ShortReadsFilteringStage("Short reads filtering"), this.stageId);
  149.     insert(new FilterByTileStage("Filter by tile"), "Short reads filtering");
  150.     String latestStage;
  151.     if(sourceFile.getReadsCount() < THRESHOLD_READS_COUNT) {
  152.       insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Filter by tile");
  153.       latestStage = "Trimming and filtering";
  154.     } else {
  155.       insert(new GetFileSampleStage("Sampling"), "Filter by tile");
  156.       insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Sampling");
  157.       insert(new RecoverAfterSampling("Recover after sampling"), "Trimming and filtering");
  158.       latestStage = "Recover after sampling";
  159.     }
  160.     insert(new AdapterFilteringStage("Adapter filtering"), latestStage);
  161.   }
  162.  
  163.   @Override
  164.   public void postPerform() {
  165.     this.results.put("Cleanup context", getContextWithQualityResults());
  166.     this.results.put("Cleanup quality result", /*get from this.params*/);
  167.   }
  168.  
  169.   private CleanupContext getContextWithQualityResults() {
  170.     CleanupContext context = new CleanupConext();
  171.     //update context with quality results from this.params.get("Task result")
  172.   }
  173. }
  174.  
  175. class TrimmingAndFilteringStage extends ProcessingStage {
  176.   @Override
  177.   public void prePerform() {
  178.     String previousId = this.stageId;
  179.     for(/*iterate over params configuration*/) {
  180.       String newId = "Trimming " + UUID.randomUUID().toString();
  181.       insert(new TrimmingStep(newId, /*trimming params*/...), previousId);
  182.       previousId = newId;
  183.     }
  184.    
  185.     ...
  186.   }
  187.  
  188.   @Override
  189.   public void postPerform(ITaskResult performResult) {
  190.     // Copy context and quality results from params to results
  191.   }
  192. }
  193.  
  194. class TrimmingStage extends ProcessingStage {
  195.  
  196.   private TrimmingParams trimmingParams;
  197.  
  198.   public TrimmingStage(String stageId, TrimmingParams trimmingParams) {
  199.     super(stageId);
  200.     this.trimmingParams = trimmingParams;
  201.   }
  202.  
  203.   private CleanupContext updateContext(CleanupContext prevContext) {
  204.     //backup previous results for potential backtracking, and other context data as needed
  205.   }
  206.  
  207.   @Override
  208.   public void prePerform() {
  209.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  210.       insert(new CheckCleanupQualityStage("Check cleanup quality " + UUID.randomUUID), this.stageId);
  211.     }
  212.   }
  213.  
  214.   @Override
  215.   public CompletableFuture<ITaskResult> perform() {
  216.     FileQualityResults quality = (FileQualityResults) this.params.get("Cleanup quality result");
  217.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  218.       return taskExecutionService.submitQualityTrimmerTask(sourceFile, getResultFilePath(state.result, sourceFile.getFilePath(), FileFormat.FASTQ), minQuality)
  219.     }
  220.  
  221.     return null;
  222.   }
  223.  
  224.   @Override
  225.   public void postPerform(ITaskResult performResult) {
  226.     if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
  227.       this.results.put("Cleanup context", updateContext(this.params.get("Cleanup context"), performResult));
  228.     } else {
  229.       this.results.put("Cleanup context", this.params.get("Cleanup context"));
  230.     }
  231.   }
  232. }
  233.  
  234. class CheckCleanupQualityStage extends ProcessingStage {
  235.  
  236.   private CompletableFuture<ITaskResult> checkQuality(SourceFileEntity sourceFile, ITaskResults taskResults) {
  237.     return taskExecutionService.submitCheckQuality(sourceFile, getResultFilePath(taskResults, sourceFile.getFilePath(), FileFormat.FASTQ));
  238.   }
  239.  
  240.   @Override
  241.   public CompletableFuture<ITaskResult> perform() {
  242.     return checkQuality(sourceFile, params.get("Stage result"));
  243.   }
  244.  
  245.   @Override
  246.   public void postPerform(ITaskResult performResult) {
  247.     StepResult stepResult = StepResult.processed(quality, taskResults);
  248.     CleanupContext context = (CleanupContext) params.get("Cleanup Context");
  249.     if(ITaskResult.class == FileFastqQUalityTrimmingTaskResults) {
  250.       verifyTrimmingResult(context, stepResult);
  251.     }
  252.     // ...
  253.     results.put("Cleanup Context", context);
  254.     results.put("Cleanup quality result", performResult);
  255.   }
  256. }
  257. proto.java
  258. Отображается файл "proto.java"
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top