Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package vj.polimi.thesis.gmql.operator
- import org.apache.flink.api.common.functions.{RichMapFunction, RichJoinFunction}
- import org.apache.flink.api.common.operators._
- import org.apache.flink.api.scala._
- import vj.polimi.thesis.gmql.model._
- import org.slf4j.LoggerFactory
- import scala.runtime.BoxedUnit
/**
 * GMQL operators built on Flink DataSets.
 *
 * Created by VinhJune on 13/06/15.
 */
object GmqlOps extends Serializable {
  val log = LoggerFactory.getLogger(this.getClass)

  /**
   * Joins the region data of two GMQL data sets.
   *
   * Sample-id pairs that are compatible under `metadataPred` are computed and
   * printed; the region join itself is delegated to
   * `GenometricOps.genometricJoin`.
   *
   * @param refSet         reference data set; its metadata is reused unchanged as the
   *                       metadata of the result
   * @param expSet         experiment data set
   * @param metadataPred   comma-separated metadata attribute names used to pair samples
   * @param genometricPred forwarded unchanged to `GenometricOps.genometricJoin`
   * @param regionConstr   forwarded unchanged to `GenometricOps.genometricJoin`
   * @param stranded       forwarded unchanged to `GenometricOps.genometricJoin`
   * @return a new `GmqlDataSet` made of the joined regions and the reference metadata
   */
  def join(refSet: GmqlDataSet, expSet: GmqlDataSet, metadataPred: String, genometricPred: String, regionConstr: String, stranded: Boolean): GmqlDataSet = {
    /** **********************************************************
      * FIND COMPATIBLE SAMPLEID PAIRS BASED ON METADATA PREDICATE
      * **********************************************************/
    val metadataPreds = metadataPred.split(",")
    val refMetadata = refSet.metadataSet
    val expMetadata = expSet.metadataSet
    //TODO: control empty predicate case
    // A plain cross yields (ref, exp) tuples; filtering them keeps the result typed as
    // (Long, Long) without going through Any/BoxedUnit and an unchecked cast.
    val metadataJoinedIdPairs: DataSet[(Long, Long)] = refMetadata.cross(expMetadata)
      .filter(pair => metadataCompatible(pair._1, pair._2, metadataPreds))
      .map(pair => (pair._1.sampleId, pair._2.sampleId))
    metadataJoinedIdPairs.print("matched id pair found from metadata")

    /** *************************************************************
      * JOIN GENOMETRIC DATA BASED ON ID PAIRS FOUND
      * ************************************************************/
    val refRegionSet = refSet.regionSet.sortPartition("left", Order.ASCENDING)
    val expRegionSet = expSet.regionSet.sortPartition("left", Order.ASCENDING)
    // count is evaluated eagerly on each data set, so only pay for it when the
    // message will actually be logged.
    if (log.isInfoEnabled) {
      log.info("VJlog: refRegion count: " + refRegionSet.count + " exp Region count: " + expRegionSet.count)
    }
    // NOTE(review): metadataJoinedIdPairs is not applied to the region join yet. The
    // previous attempt filtered a chromosome join by calling collect on that DataSet
    // from inside the filter function; a DataSet cannot be used from within another
    // operator's function and the result was never consumed, so it was removed.
    // TODO: restrict the genometric join to the compatible sample-id pairs.
    val joinedRegionSet = GenometricOps.genometricJoin(refRegionSet, expRegionSet, genometricPred, regionConstr, stranded)

    /** **************************************************************
      * BUILD RESULT SET
      * **************************************************************/
    // Build joined metadata for result set
    val joinedMetadataSet = refMetadata
    new GmqlDataSet(joinedRegionSet, joinedMetadataSet)
  }

  /**
   * Decides whether two samples are compatible under the metadata predicates.
   *
   * Predicates that are not present on both sides are skipped. The pair is compatible
   * when at least one predicate is present on both sides and every such predicate has
   * equal values, so an earlier mismatch can no longer be hidden by a later match.
   */
  private def metadataCompatible(l: Metadata, r: Metadata, preds: Array[String]): Boolean = {
    val applicable = preds.filter(p => l.attrVals.contains(p) && r.attrVals.contains(p))
    applicable.nonEmpty && applicable.forall(p => l.attrVals.get(p) == r.attrVals.get(p))
  }

  /**
   * Combines two ids into one: `(a + b) * (a + b + 1) / 2 + a`.
   *
   * The even one of the two consecutive factors is halved before multiplying, so the
   * intermediate product does not overflow when the final value still fits in a Long.
   * Results that do not fit in a Long still wrap around silently.
   *
   * @param a first id
   * @param b second id
   * @return the combined id
   */
  def cantorPairID(a: Long, b: Long): Long = {
    val sum = a + b
    // Exactly one of sum and sum + 1 is even, so the division below is always exact.
    val triangular =
      if (sum % 2 == 0) (sum / 2) * (sum + 1)
      else sum * ((sum + 1) / 2)
    triangular + a
  }

  /**
   * Combines two ids into one: `a * a + a + b` when `a >= b`, otherwise `a + b * b`.
   *
   * @param a first id
   * @param b second id
   * @return the combined id; values that do not fit in a Long wrap around silently
   */
  def szudzik(a: Long, b: Long): Long =
    if (a >= b) a * a + a + b
    else a + b * b
}
Advertisement
Add Comment
Please, Sign In to add comment