Guest User

GmqlOps

a guest
Jun 16th, 2015
296
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.70 KB | None | 0 0
  1. package vj.polimi.thesis.gmql.operator
  2.  
  3. import org.apache.flink.api.common.functions.{RichMapFunction, RichJoinFunction}
  4. import org.apache.flink.api.common.operators._
  5. import org.apache.flink.api.scala._
  6. import vj.polimi.thesis.gmql.model._
  7. import org.slf4j.LoggerFactory
  8.  
  9. import scala.runtime.BoxedUnit
  10.  
  11. /**
  12.  * Created by VinhJune on 13/06/15.
  13.  */
  14. object GmqlOps extends Serializable{
  15.   val log = LoggerFactory.getLogger(this.getClass)
  16.  
  17.     def join(refSet: GmqlDataSet, expSet: GmqlDataSet, metadataPred: String, genometricPred: String, regionConstr: String, stranded: Boolean): GmqlDataSet = {
  18.     /** **********************************************************
  19.       * FIND COMPATIBLE SAMPLEID PAIRS BASED ON METADATA PREDICATE
  20.       * **********************************************************/
  21.     val metadataPreds = metadataPred.split(",")
  22.     val refMetadata = refSet.metadataSet
  23.     val expMetadata = expSet.metadataSet
  24.  
  25.     //TODO: control empty predicate case
  26.     val metadataJoinedIdPairs: DataSet[(Long, Long)] = refMetadata.cross(expMetadata) {
  27.       (l: Metadata, r: Metadata) =>
  28.       var ret = false
  29.       for (pred <- metadataPreds) {
  30.         if (l.attrVals.contains(pred) && r.attrVals.contains(pred)) {
  31.           ret = l.attrVals.get(pred).equals(r.attrVals.get(pred))
  32.         }
  33.       }
  34.       if (ret) {
  35.         (l.sampleId, r.sampleId)
  36.       }
  37.     }
  38.       .filter(!_.isInstanceOf[BoxedUnit])
  39.       .asInstanceOf[DataSet[(Long, Long)]]
  40.  
  41.     metadataJoinedIdPairs.print("matched id pair found from metadata")
  42.  
  43. //    log.info("test check contains in dataset [true] " + metadataJoinedIdPairs.collect().contains((1172198262, 1301280981)))
  44. //    log.info("test check contains in dataset [true] " + metadataJoinedIdPairs.collect().contains((1172198262, 1301280982)))
  45. //    log.info("test check contains in dataset [false] " + metadataJoinedIdPairs.collect().contains((1172198262, 1301280983)))
  46.  
  47.     /** *************************************************************
  48.       * JOIN GENOMETRIC DATA BASED ON ID PAIRS FOUND
  49.       * ************************************************************/
  50.     val refRegionSet = refSet.regionSet.sortPartition("left", Order.ASCENDING)
  51.     val expRegionSet = expSet.regionSet.sortPartition("left", Order.ASCENDING)
  52.     log.info("VJlog: refRegion count: " + refRegionSet.count + " exp Region count: " + expRegionSet.count)
  53.  
  54. //    val refRegionSetGroupedByChr = refRegionSet.groupBy("chr")
  55. //    refRegionSetGroupedByChr.first(1).print("First grouped refRegion")
  56.  
  57.  
  58.     val joinedRegionOnChr = refRegionSet.join(expRegionSet).where("chr").equalTo("chr")
  59.       .filter ( joined =>
  60.         metadataJoinedIdPairs.collect.contains((joined._1.sampleId, joined._2.sampleId))
  61.       )
  62. //    .map(joined => (joined._1.sampleId + "." + joined._1.chr, joined._2.sampleId + "." + joined._2.chr))
  63. //    joinedRegionOnChr.print("joined region on chr ")
  64.  
  65.  
  66.  
  67.  
  68.     val joinedRegionSet = GenometricOps.genometricJoin(refRegionSet, expRegionSet, genometricPred, regionConstr, stranded)
  69.  
  70. //    val joinedRegionSet = ExecutionEnvironment.getExecutionEnvironment.fromCollection(joinedRegionArray)
  71.  
  72.     /** **************************************************************
  73.       * BUILD RESULT SET
  74.       * **************************************************************/
  75.     // Build joined metadata for result set
  76.     val joinedMetadataSet = refMetadata
  77.  
  78.  
  79.     return new GmqlDataSet(joinedRegionSet, joinedMetadataSet)
  80.  
  81.   }
  82.  
  83.   // for calculating ID
  84.   def cantorPairID(a: Long, b: Long): Long = {
  85.     return (a + b) * (a + b + 1) / 2 + a
  86.   }
  87.   def szudzik(a: Long, b: Long): Long = {
  88.     if (a >= b) {
  89.       return a * a + a + b
  90.     }
  91.     else {
  92.       return a + b * b
  93.     }
  94.   }
  95. }
Advertisement
Add Comment
Please, Sign In to add comment