Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- Перейти к содержимому
- Gmail используется с программой чтения с экрана
- 1 из 717
- Fwd: Про рефакторинг процессинга
- Входящие
- x
- Evgeny Tarasenko <tarasenko@nprog.ru>
- Приложения
- 10:22 (14 минут назад)
- кому: я
- Посмотри тоже, пожалуйста.
- -------- Исходное сообщение --------
- Тема: Про рефакторинг процессинга
- Дата: 2019-12-12 20:15
- От: Павел Карасюк <p.karasyuk@gmail.com>
- Кому: Evgeny Tarasenko <tarasenko@nprog.ru>
- Попробовал набросать то, что имел в
- виду. Там все очень неаккуратно и
- неполно (многие параметры непонятно
- как передаются, скипы не показаны), но
- делать аккуратно - долго, и пока
- непонятно, стоит ли того. Посмотри, если
- интересно, можно попробовать доделать
- Область прикрепленных файлов
- class ProcessingStage {
- public ProcessingStage(String stageId) {
- this.stageId = stageId;
- }
- private Map<String, Object> params;
- private Map<String, Object> results;
- protected String stageId;
- public void prePreform() {};
- public CompletedFuture<ITaskResult> void perform() {};
- public void postPerform(ITaskResult performResult); //must be changed to store results
- public boolean isSkipped() { return false; };
- public void addParams(Map<> params) {this.params.addAll(params)};
- public void addResults(Map<> results) {this.results.addAll(results)};
- public final insert(ProcessingStage stage) {
- stage.addParams(this.params);
- graph.insert(stage, this.stageId);
- }
- private ProcessnigGraph graph;
- Collection<ProcessingStage> previousStages;
- Collection<ProcessingStage> nextStages;
- }
- class ProcessingGraph {
- Collection<ProcessingStep> nodes = new HashSet<> ();
- public void addRootNode(ProcessingStage node) {
- nodes.add(node);
- }
- public void addNode(ProcessingStage node, String nodeId) {
- nodes.add(node);
- parentNode = findNode(nodeId);
- parentNode.addChild(node);
- node.addParent(parentNode);
- }
- public void insert(ProcessingStage node, String nodeId) {
- nodes.add(node);
- ProcessingStage parentNode = findNode(nodeId);
- //node.addParams(parentNode.getParams()) // Not sure
- parentNode.addChild(node);
- node.addParent(parentNode);
- node.addChildren(parentNode.removeAllChildren());
- }
- public void addParent(String childId, String parentId) {
- ProcessingStage child = findNode(childId);
- ProcessingStage parent = findNote(parentId);
- child.addParent(parent);
- parent.addChild(child);
- }
- public void schedule() {
- ProcessingStage root = findRootNode();
- }
- private CompletableFuture scheduleNode(ProcessingStage node) {
- for(ProcessingStage parent : node.getPreviousStages()) {
- if(!parent.isProcessingOver) {
- return CompletableFuture.completedFuture(null);
- }
- }
- return CompletableFuture.completedFuture(null)
- .thenApply(result -> node.prePerform())
- .thenCompose(result -> node.perform())
- .thenApply(result -> node.postPerform(result))
- .thenApply(result -> node.setProcessingOver());
- }
- private CompletableFuture scheduleRecursively(ProcessingNode node, CompletableFuture workflow) {
- workflow
- .thenCompose(nothing -> scheduleNode(node))
- .thenCompose(nothing -> CompletableFuture.allOf(
- node.getNextStages()
- .stream()
- .map(node -> scheduleRecursively(node, CompletableFuture.completedFuture(null)))
- .toArray() )
- );
- }
- }
- class FileProcessingService {
- ...
- public void scheduleInitialGraph() {
- ProcessingGraph graph = ProcessingGraph();
- graph.addRootNode(new identifyProcessingStage("Identify"));
- graph.addNode(new VerifyProcessingStage("Verify"), "Identify"); //Add after identify
- graph.addNode(new CheckQualityStage("Check quality"), "Verify");
- graph.addNode(new CleaningStage("Cleaning"), "Check quality");
- graph.addNote(new AlignmentStage("Alignment"), "Cleaning");
- graph.addNode(new GermalineVariationsCallingStage("Germline calling"), "Alignment");
- graph.addNode(new SomaticVariationsCallingStage("Somatic calling"), "Alignment");
- graph.addNode(new SomeStepAfterCalling("Some step after both calling"), "Germline calling");
- graph.addParent("Some step after both calling", "Somatic calling");
- graph.schedule();
- }
- }
- class CleaningStage extends ProcessingStage {
- @Override
- protected void prePerform() {
- insert(new ShortReadsFilteringStage("Short reads filtering"), this.stageId);
- insert(new FilterByTileStage("Filter by tile"), "Short reads filtering");
- String latestStage;
- if(sourceFile.getReadsCount() < THRESHOLD_READS_COUNT) {
- insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Filter by tile");
- latestStage = "Trimming and filtering";
- } else {
- insert(new GetFileSampleStage("Sampling"), "Filter by tile");
- insert(new TrimmingAndFilteringStage("Trimming and filtering"), "Sampling");
- insert(new RecoverAfterSampling("Recover after sampling"), "Trimming and filtering");
- latestStage = "Recover after sampling";
- }
- insert(new AdapterFilteringStage("Adapter filtering"), latestStage);
- }
- @Override
- public void postPerform() {
- this.results.put("Cleanup context", getContextWithQualityResults());
- this.results.put("Cleanup quality result", /*get from this.params*/);
- }
- private CleanupContext getContextWithQualityResults() {
- CleanupContext context = new CleanupConext();
- //update context with quality results from this.params.get("Task result")
- }
- }
- class TrimmingAndFilteringStage extends ProcessingStage {
- @Override
- public void prePerform() {
- String previousId = this.stageId;
- for(/*iterate over params configuration*/) {
- String newId = "Trimming " + UUID.randomUUID().toString();
- insert(new TrimmingStep(newId, /*trimming params*/...), previousId);
- previousId = newId;
- }
- ...
- }
- @Override
- public void postPerform(ITaskResult performResult) {
- // Copy context and quality results from params to results
- }
- }
- class TrimmingStage extends ProcessingStage {
- private TrimmingParams trimmingParams;
- public TrimmingStage(String stageId, TrimmingParams trimmingParams) {
- super(stageId);
- this.trimmingParams = trimmingParams;
- }
- private CleanupContext updateContext(CleanupContext prevContext) {
- //backup previous results for potential backtracking, and other context data as needed
- }
- @Override
- public void prePerform() {
- if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
- insert(new CheckCleanupQualityStage("Check cleanup quality " + UUID.randomUUID), this.stageId);
- }
- }
- @Override
- public CompletableFuture<ITaskResult> perform() {
- FileQualityResults quality = (FileQualityResults) this.params.get("Cleanup quality result");
- if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
- return taskExecutionService.submitQualityTrimmerTask(sourceFile, getResultFilePath(state.result, sourceFile.getFilePath(), FileFormat.FASTQ), minQuality)
- }
- return null;
- }
- @Override
- public void postPerform(ITaskResult performResult) {
- if (isEndQualityMetricFailed(quality) && !stopCondition(quality, sourceFile, isSample)) {
- this.results.put("Cleanup context", updateContext(this.params.get("Cleanup context"), performResult));
- } else {
- this.results.put("Cleanup context", this.params.get("Cleanup context"));
- }
- }
- }
- class CheckCleanupQualityStage extends ProcessingStage {
- private CompletableFuture<ITaskResult> checkQuality(SourceFileEntity sourceFile, ITaskResults taskResults) {
- return taskExecutionService.submitCheckQuality(sourceFile, getResultFilePath(taskResults, sourceFile.getFilePath(), FileFormat.FASTQ));
- }
- @Override
- public CompletableFuture<ITaskResult> perform() {
- return checkQuality(sourceFile, params.get("Stage result"));
- }
- @Override
- public void postPerform(ITaskResult performResult) {
- StepResult stepResult = StepResult.processed(quality, taskResults);
- CleanupContext context = (CleanupContext) params.get("Cleanup Context");
- if(ITaskResult.class == FileFastqQUalityTrimmingTaskResults) {
- verifyTrimmingResult(context, stepResult);
- }
- // ...
- results.put("Cleanup Context", context);
- results.put("Cleanup quality result", performResult);
- }
- }
- proto.java
- Отображается файл "proto.java"
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement