Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Register the coverage-sum accumulator with the SparkContext under a readable name.
spark.sparkContext.register(covSumAcc, "CoverageSumAcc")

// Targets-from-table mode only: load the targets ONCE, on the driver, and broadcast them.
// A SparkSession cannot be used inside an RDD closure (the closure runs on executors),
// so the per-event SQL lookup is replaced by a local filter over the broadcast rows.
// Rows with a NULL start/end/contigName are dropped here because the original per-event
// SQL predicate could never match them.
// NOTE(review): this assumes the targets table is small enough to collect and broadcast - confirm.
val targetLookup =
  if (targetType.equals(StringType))
    target.map { targetsTab =>
      val session = SparkSession.builder().getOrCreate()
      val targetsDf = session.sql(
        s"SELECT * FROM ${targetsTab} WHERE start IS NOT NULL AND end IS NOT NULL AND contigName IS NOT NULL"
      )
      // SQL resolves column names case-insensitively by default, so resolve the indices the same way.
      val fieldNames = targetsDf.schema.fieldNames
      def columnIndex(name: String): Int = fieldNames.indexWhere(_.equalsIgnoreCase(name))
      (
        session.sparkContext.broadcast(targetsDf.collect()),
        columnIndex("start"),
        columnIndex("end"),
        columnIndex("contigName")
      )
    }
  else None

// Walk every reduced event once and push its right-edge coverage slice into the accumulator.
// The RDD is persisted first because it is reused below (prepareSumBroadcast, upateContigSumRange).
reducedEvents
  .persist(StorageLevel.MEMORY_AND_DISK)
  .foreach { x =>
    // Last position of the event: second tuple field plus the length of the first, minus one.
    // NOTE(review): presumably x._2._2 is the start position and x._2._1 the per-base values - confirm.
    val maxC = x._2._2 + x._2._1.length - 1
    if (!targetType.equals(StringType)) {
      val maxCounter =
        if (targetType.equals(IntegerType))
          maxC % windowLength.get // fails with NoSuchElementException when windowLength is None
        else // Blocks and bases for faster computing
          1
      val rightCovSumEdge = RightCovSumEdge(x._1, maxC, x._2._1.takeRight(maxCounter))
      covSumAcc.add(new CovSumUpdate(ArrayBuffer(rightCovSumEdge)))
    } else { // Targets from table
      // Same failure type as the original target.get when no targets table was supplied.
      val (targetRows, startIdx, endIdx, contigIdx) = targetLookup.getOrElse(
        throw new NoSuchElementException("target table name is required when targetType is StringType")
      )
      val contig = x._1.toString
      targetRows.value
        .filter { row =>
          // Local equivalent of: start < maxC AND end > maxC AND contigName = '<contig>'
          // NOTE(review): assumes start/end are stored as numeric columns - confirm against the table schema.
          row.get(startIdx).asInstanceOf[Number].longValue < maxC &&
          row.get(endIdx).asInstanceOf[Number].longValue > maxC &&
          row.get(contigIdx).toString == contig
        }
        .foreach { targetRow =>
          // Column 1 of the targets table is read as an Int, exactly as before.
          // NOTE(review): presumably column 1 is the target start - confirm.
          val maxCounter = maxC - targetRow.get(1).asInstanceOf[Int]
          val rightCovSumEdge = RightCovSumEdge(x._1, maxC, x._2._1.takeRight(maxCounter))
          covSumAcc.add(new CovSumUpdate(ArrayBuffer(rightCovSumEdge)))
        }
    }
  }

// Everything below is lazy: nothing is broadcast or computed until `cov` is first used.
lazy val covSumBroad = spark.sparkContext.broadcast(prepareSumBroadcast(covSumAcc.value(), reducedEvents))
lazy val reducedSumEvents = CoverageMethodsMos.upateContigSumRange(covSumBroad, reducedEvents)

// Windowed coverage: records that share a key are merged into one CovRecordWindow whose
// coverage is the overlap-weighted mean of the two inputs and whose overlap is their sum.
// When optimizeWindow is set, or targetType is neither IntegerType nor StringType, no branch
// applies and the expression yields Unit, exactly as the original if / else-if chain did.
lazy val cov =
  if (!optimizeWindow && (targetType.equals(IntegerType) || targetType.equals(StringType))) {
    val windowedCoverage =
      if (targetType.equals(IntegerType)) // fixed-length window
        CoverageMethodsMos.eventsToCoverage(sampleId, reducedSumEvents, covBroad.value.minmax, blocksResult, allPos, windowLength, None)
      else // windows defined by the targets table
        CoverageMethodsMos.eventsToCoverage(sampleId, reducedSumEvents, covBroad.value.minmax, blocksResult, allPos, None, target)
    windowedCoverage
      .keyBy(_.key)
      .reduceByKey { (a, b) =>
        // Cast once per operand; overLap.get still fails if a record carries no overlap.
        val left = a.asInstanceOf[CovRecordWindow]
        val right = b.asInstanceOf[CovRecordWindow]
        val leftOverlap = left.overLap.get
        val rightOverlap = right.overLap.get
        CovRecordWindow(
          a.contigName,
          a.start,
          a.end,
          (leftOverlap * left.cov + rightOverlap * right.cov) / (leftOverlap + rightOverlap),
          Some(leftOverlap + rightOverlap)
        )
      }
      .map(_._2)
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement