Advertisement
Guest User

GmqlSampleSet.scala

a guest
Feb 10th, 2015
223
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.88 KB | None | 0 0
  1. package vj.polimi.thesis.gmql.model
  2.  
  3. import org.apache.flink.api.scala._
  4.  
  5. /**
  6.  * Created by VinhJune on 08/02/15.
  7.  */
  /**
   * Loads a GMQL sample set stored in the tab-separated "CSV3" layout and exposes it
   * as Flink data sets.
   *
   * Region rows are read from `outputDataPath` and metadata rows from
   * `outputDataPath + ".meta"`; both files are parsed as three string columns.
   * All members are built eagerly in the constructor as lazy Flink plans; nothing is
   * executed by this class itself.
   *
   * @param env            Flink execution environment used to create the data sets
   * @param outputDataPath path of the region file; the metadata file is this path
   *                       with ".meta" appended
   */
  class GmqlSampleSet(env: ExecutionEnvironment, outputDataPath: String) extends Serializable {

      /** Raw rows of the CSV3 metadata file, one (String, String, String) tuple per line. */
      val csv3MetadataSet: DataSet[(String, String, String)] = env
          .readCsvFile[(String, String, String)](
              filePath = outputDataPath + ".meta",
              fieldDelimiter = '\t',
              ignoreFirstLine = false
          )

      /**
       * Single-row data set broadcast to the region mapper as the value-field headers.
       *
       * NOTE(review): Flink documents `first(n)` as returning n *arbitrary* elements, so
       * this is not guaranteed to be the first physical line of the file -- confirm the
       * header row is really what gets selected here.
       */
      val gmqlHeaderSet: DataSet[(String, String, String)] = csv3MetadataSet.first(1)

      /**
       * Every metadata row converted to a [[MetadataWithId]]: column 1 is parsed as the
       * numeric id, columns 2 and 3 become the [[AttrVal]].
       *
       * NOTE(review): every row is mapped, including whatever row serves as the header;
       * `toLong` throws NumberFormatException if that row's first column is not numeric
       * -- verify against the actual file layout.
       */
      val metadataWithIdSet: DataSet[MetadataWithId] = csv3MetadataSet
          .map { line =>
              new MetadataWithId(line._1.toLong, new AttrVal(line._2, line._3))
          }

      /** Metadata grouped per id: the attribute/value pairs of equal ids are concatenated. */
      val metadataSet: DataSet[MetadataSet] = metadataWithIdSet
          .map { m => new MetadataSet(m.id, Array(m.attrVal)) }
          .groupBy("id")
          .reduce { (left, right) =>
              new MetadataSet(left.id, left.attrValList ++ right.attrValList)
          }

      /**
       * Region rows converted by [[RegionWithIdFromCSV3]], which receives
       * `gmqlHeaderSet` as the broadcast set named "csv3HeaderSet".
       */
      val regionWithIdSet: DataSet[RegionWithId] = env
          .readCsvFile[(String, String, String)](
              filePath = outputDataPath,
              fieldDelimiter = '\t',
              ignoreFirstLine = false
          )
          .map(new RegionWithIdFromCSV3)
          .withBroadcastSet(gmqlHeaderSet, "csv3HeaderSet")

      /** Regions grouped per id: the region lists of equal ids are concatenated. */
      val regionSet: DataSet[RegionSet] = regionWithIdSet
          .map { r => new RegionSet(r.id, Array(new Region(r.coordinates, r.values))) }
          .groupBy("id")
          .reduce { (left, right) =>
              new RegionSet(left.id, left.regionList ++ right.regionList)
          }

      // Regions joined with their metadata on "id". The type is left inferred on purpose
      // so the join-specific static type returned by Flink stays visible to callers.
      val sampleSet = regionSet.join(metadataSet).where("id").equalTo("id")


      /*******************************************************************
          GMQL OPERATORS
       *******************************************************************/

      /** GMQL SELECT operator -- not implemented yet; currently a no-op. */
      def select(): Unit = {
          //TODO
      }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement