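// spark-shell script: a typed Aggregator that accumulates per-position read
// coverage from (beg, end) spans, thresholds it, and reports the coordinates
// where the thresholded coverage changes (candidate break points), one result
// per ref_name, written out as Parquet.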
import scala.collection.mutable.Map

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders

// `spark` and `spark.implicits._` are provided automatically inside spark-shell
import spark.implicits._
import org.apache.spark.sql.types._

case class Span(
  ref_name: String,
  bc: String,
  beg: Int,
  end: Int,
  read_count: Int)

val spanSchema = StructType(
  Array(
    StructField("ref_name", StringType, true),
    StructField("bc", StringType, true),
    StructField("beg", IntegerType, true),
    StructField("end", IntegerType, true),
    StructField("read_count", IntegerType, true)
  )
)

object CalcBreakPoints extends Aggregator[Span, Map[Int, Int], Array[Int]] {
  // Reduce an array of spans to coverage, then to break points

  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Map[Int, Int] = Map[Int, Int]()

  // Combine two values to produce a new value. For performance, the function
  // may modify `buffer` and return it instead of constructing a new object
  def reduce(buffer: Map[Int, Int], span: Span): Map[Int, Int] = {
    (span.beg until span.end).foreach(
      i => buffer += (i -> (buffer.getOrElse(i, 0) + 1)))
    buffer
  }

  // Merge two intermediate values
  def merge(b1: Map[Int, Int], b2: Map[Int, Int]): Map[Int, Int] = {
    b2.foreach {
      case (key, value) => b1 += (key -> (value + b1.getOrElse(key, 0)))
    }
    b1
  }

  // Transform the output of the reduction: threshold the coverage and report the
  // coordinates where the thresholded signal changes, i.e. the break points
  def finish(coverage: Map[Int, Int]): Array[Int] = {
    val cov_cutoff = 20
    // Step function: 1 if a position is covered by at least cov_cutoff reads, else 0
    val f = (i: Int) => if (i >= cov_cutoff) 1 else 0

    val coords = coverage.keys.toArray.sorted

    // A coordinate is a break point when its step value differs from that of the
    // position immediately to its left (uncovered positions count as 0)
    val bp = coords.slice(1, coords.length).map(
        c => {
          val current = f(coverage(c))
          val previous_step = f(coverage.getOrElse(c - 1, 0))
          (c, current - previous_step)
        })
      .filter { case (c, d) => d != 0 }
      .map { case (c, d) => c }

    // val qualified = qualified.slice(1, qualified.length).map {
    //   case (c, b) =>
    //     c => if (coverage(c) >= read_count_cutoff) (c, 1) else (c, 0))
    // val diff = coords.slice(1, coords.length).map(c => (c, (reduction(c) - reduction.getOrElse(c - 1, 0))))
    // val bp = diff.filter {case (c, d) => d != 0} map {case (c, d) => c}
    bp
  }

  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Map[Int, Int]] = Encoders.kryo[Map[Int, Int]]

  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Array[Int]] = Encoders.kryo[Array[Int]]
}


val ds = spark.read.option("sep", "\t").schema(spanSchema).csv("/projects/btl/zxue/assembly_correction/celegans/toy_cov.csv").as[Span]

val cc = CalcBreakPoints.toColumn.name("bp")
val res = ds.groupByKey(a => a.ref_name).agg(cc)
res.write.format("parquet").save("./lele.parquet")
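
// Quick sanity check (a sketch, not part of the original run): the same aggregator
// applied to a small made-up in-memory Dataset in spark-shell. 30 spans cover
// [100, 200) and 10 spans cover [200, 300), so the thresholded coverage
// (cutoff 20) drops at position 200 and the only reported break point for
// "chr1" should be 200.
val toySpans = ((1 to 30).map(i => Span("chr1", s"bc$i", 100, 200, 1)) ++
                (1 to 10).map(i => Span("chr1", s"bc$i", 200, 300, 1))).toDS()
val toyRes = toySpans.groupByKey(_.ref_name).agg(CalcBreakPoints.toColumn.name("bp"))
toyRes.collect().foreach { case (ref, bps) => println(s"$ref: ${bps.mkString(", ")}") }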