Advertisement
Guest User

Untitled

a guest
Sep 1st, 2019
120
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.22 KB | None | 0 0
  1. spark
  2.       .sparkContext
  3.       .register(covSumAcc, "CoverageSumAcc")
  4.  
  5.     reducedEvents
  6.       .persist(StorageLevel.MEMORY_AND_DISK)
  7.       .foreach(x => {
  8.         val maxC = x._2._2 + x._2._1.length - 1
  9.         if (!targetType.equals(StringType)) {
  10.           val maxCounter = if (targetType.equals(IntegerType))
  11.             maxC % windowLength.get
  12.           else // Blocks and bases for faster computing
  13.             1
  14.           val rightCovSumEdge = RightCovSumEdge(x._1, maxC, x._2._1.takeRight(maxCounter))
  15.           val rightCovSumUpdate = new CovSumUpdate(ArrayBuffer(rightCovSumEdge))
  16.           covSumAcc.add(rightCovSumUpdate)
  17.         } else { //Targets from table
  18.           val targetsTab = target.get
  19.           val session = SparkSession.builder().getOrCreate()
  20.           val crossingTargets = session.sql(s"SELECT * FROM ${targetsTab} WHERE start < ${maxC} AND end > ${maxC} AND contigName = '${x._1}'")
  21.           crossingTargets
  22.             .collect()
  23.             .foreach(targetRow => {
  24.               val maxCounter = maxC - targetRow.get(1).asInstanceOf[Int]
  25.               val rightCovSumEdge = RightCovSumEdge(x._1, maxC, x._2._1.takeRight(maxCounter))
  26.               val rightCovSumUpdate = new CovSumUpdate(ArrayBuffer(rightCovSumEdge))
  27.               covSumAcc.add(rightCovSumUpdate)
  28.             })
  29.         }
  30.       })
  31.  
  32.     lazy val covSumBroad = spark.sparkContext.broadcast(prepareSumBroadcast(covSumAcc.value(), reducedEvents))
  33.  
  34.     lazy val reducedSumEvents = CoverageMethodsMos.upateContigSumRange(covSumBroad, reducedEvents)
  35.  
  36.  
  37.     lazy val cov =
  38.       if (targetType.equals(IntegerType) && !optimizeWindow) // fixed-length window
  39.         CoverageMethodsMos.eventsToCoverage(sampleId, reducedSumEvents, covBroad.value.minmax, blocksResult, allPos, windowLength, None)
  40.           .keyBy(_.key)
  41.           .reduceByKey(
  42.             (a, b) =>
  43.               CovRecordWindow(
  44.                 a.contigName,
  45.                 a.start,
  46.                 a.end,
  47.                 (a.asInstanceOf[CovRecordWindow].overLap.get * a.asInstanceOf[CovRecordWindow].cov + b.asInstanceOf[CovRecordWindow].overLap.get * b.asInstanceOf[CovRecordWindow].cov) / (a.asInstanceOf[CovRecordWindow].overLap.get + b.asInstanceOf[CovRecordWindow].overLap.get),
  48.                 Some(a.asInstanceOf[CovRecordWindow].overLap.get + b.asInstanceOf[CovRecordWindow].overLap.get)
  49.               )
  50.           )
  51.           .map(_._2)
  52.       else if (targetType.equals(StringType) && !optimizeWindow) {
  53.         CoverageMethodsMos.eventsToCoverage(sampleId, reducedSumEvents, covBroad.value.minmax, blocksResult, allPos, None, target)
  54.           .keyBy(_.key)
  55.           .reduceByKey(
  56.             (a, b) =>
  57.               CovRecordWindow(
  58.                 a.contigName,
  59.                 a.start,
  60.                 a.end,
  61.                 (a.asInstanceOf[CovRecordWindow].overLap.get * a.asInstanceOf[CovRecordWindow].cov + b.asInstanceOf[CovRecordWindow].overLap.get * b.asInstanceOf[CovRecordWindow].cov) / (a.asInstanceOf[CovRecordWindow].overLap.get + b.asInstanceOf[CovRecordWindow].overLap.get),
  62.                 Some(a.asInstanceOf[CovRecordWindow].overLap.get + b.asInstanceOf[CovRecordWindow].overLap.get)
  63.               )
  64.           )
  65.           .map(_._2)
  66.       }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement