Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package vj.polimi.thesis.gmql.model
- import org.apache.flink.api.scala._
/**
 * Loads a GMQL sample set from two tab-separated files and exposes the Flink
 * data sets derived from them.
 *
 * Files read (both tab-delimited, three string columns, no line skipped):
 *  - `outputDataPath + ".meta"`: metadata rows; the first column is parsed as a
 *    numeric sample id, the other two are wrapped in an [[AttrVal]].
 *  - `outputDataPath`: region rows, converted by [[RegionWithIdFromCSV3]].
 *
 * Created by VinhJune on 08/02/15.
 *
 * @param env            Flink execution environment used to create the data sources
 * @param outputDataPath path of the region file; the metadata file is this path plus ".meta"
 */
class GmqlSampleSet(env: ExecutionEnvironment, outputDataPath: String) extends Serializable {

  // Raw rows of the metadata file.
  val csv3MetadataSet: DataSet[(String, String, String)] =
    GmqlSampleSet.readCsv3(env, outputDataPath + ".meta")

  // Single metadata row, broadcast to RegionWithIdFromCSV3 as "csv3HeaderSet";
  // it is meant to carry the headers of the region value fields.
  // NOTE(review): Flink's `first(n)` returns n *arbitrary* elements of an unsorted
  // data set, so with a parallel file read this is not guaranteed to be the first
  // line of the file. This row is also parsed by `metadataWithIdSet` below, so its
  // first column must parse as a Long - confirm against the actual file format.
  val gmqlHeaderSet: DataSet[(String, String, String)] = csv3MetadataSet.first(1)

  // One MetadataWithId per metadata row. `toLong` throws NumberFormatException
  // when the first column is not an integer.
  val metadataWithIdSet: DataSet[MetadataWithId] = csv3MetadataSet.map { row =>
    new MetadataWithId(row._1.toLong, new AttrVal(row._2, row._3))
  }

  // All metadata attributes of a sample collected into one MetadataSet per id.
  val metadataSet: DataSet[MetadataSet] = metadataWithIdSet
    .map(m => new MetadataSet(m.id, Array(m.attrVal)))
    .groupBy("id")
    // Concatenate every array of the group in a single pass; a pairwise `reduce`
    // with `++` re-copies the growing accumulator on each step (O(n^2) per group).
    .reduceGroup { sets =>
      val group = sets.toIterator.buffered
      val id = group.head.id // a Flink group is never empty
      new MetadataSet(id, group.flatMap(_.attrValList).toArray)
    }

  // Region rows converted by RegionWithIdFromCSV3, which reads the header row
  // from the broadcast set "csv3HeaderSet".
  val regionWithIdSet: DataSet[RegionWithId] = GmqlSampleSet
    .readCsv3(env, outputDataPath)
    .map(new RegionWithIdFromCSV3)
    .withBroadcastSet(gmqlHeaderSet, "csv3HeaderSet")

  // All regions of a sample collected into one RegionSet per id.
  val regionSet: DataSet[RegionSet] = regionWithIdSet
    .map(r => new RegionSet(r.id, Array(new Region(r.coordinates, r.values))))
    .groupBy("id")
    // Single-pass concatenation, for the same reason as in `metadataSet`.
    .reduceGroup { sets =>
      val group = sets.toIterator.buffered
      val id = group.head.id // a Flink group is never empty
      new RegionSet(id, group.flatMap(_.regionList).toArray)
    }

  // Each sample's regions paired with its metadata on the shared "id" key.
  // The type is left inferred so the join-result type Flink returns is not narrowed.
  val sampleSet = regionSet.join(metadataSet).where("id").equalTo("id")

  /* ******************************************************************
   * GMQL OPERATORS
   * ****************************************************************** */

  /** GMQL SELECT operator; not implemented yet, currently a no-op. */
  def select(): Unit = {
    // TODO
  }
}

object GmqlSampleSet {

  /**
   * Reads a tab-separated file of three string columns without skipping a first line.
   * Shared by the region and metadata sources so both are parsed identically.
   *
   * @param env  execution environment that creates the source
   * @param path file to read
   * @return one tuple per line of the file
   */
  private def readCsv3(
      env: ExecutionEnvironment,
      path: String
  ): DataSet[(String, String, String)] =
    env.readCsvFile[(String, String, String)](
      filePath = path,
      fieldDelimiter = '\t',
      ignoreFirstLine = false
    )
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement