- package thetajoin
- import org.apache.spark.rdd.RDD
- import org.slf4j.LoggerFactory
- class ThetaJoin(numR: Long, numS: Long, reducers: Int, bucketsize: Int) extends java.io.Serializable {
- val logger = LoggerFactory.getLogger("ThetaJoin")
- // random samples for each relation
- // helper structures, you are allowed
- // not to use them
- var horizontalBoundaries = Array[Int]()
- var verticalBoundaries = Array[Int]()
- // number of values that fall in each partition
- // helper structures, you are allowed
- // not to use them
- var horizontalCounts = Array[Int]()
- var verticalCounts = Array[Int]()
- /*
- * this method gets as input two datasets and the condition
- * and returns an RDD with the result by projecting only
- * attr1 and attr2
- * You are not allowed to change the definition of this function.
- * */
- def theta_join(dataset1: Dataset, dataset2: Dataset, attr1:String, attr2:String, op:String): RDD[(Int, Int)] = {
- val schema1 = dataset1.getSchema
- val schema2 = dataset2.getSchema
- val rdd1 = dataset1.getRDD
- val rdd2 = dataset2.getRDD
- val index1 = schema1.indexOf(attr1)
- val index2 = schema2.indexOf(attr2)
- println("First dataset size: "+rdd1.count())
- println("Second dataset size: "+rdd2.count())
- // extract random samples of the join values and create histograms from them
- // the number of reducers r is needed to calculate cr and cs; it is given in the class constructor
- val r = this.reducers
- val r_count = rdd1.count()
- val s_count = rdd2.count()
- val cs = s_count/Math.sqrt((s_count*r_count)/r)
- val cr = r_count/Math.sqrt((s_count*r_count)/r)
- println("Cs: "+cs)
- println("Cr: "+cr)
- //sort datasets by join attribute
- val rdd1_sorted = rdd1.sortBy(_.apply(index1).toString().toInt)
- val rdd2_sorted = rdd2.sortBy(_.apply(index2).toString().toInt)
- // sample approx. 30% of each dataset to create the histograms
- val seed = 4
- val part = 3
- val sample1 = rdd1_sorted.map(row => row.get(index1).toString().toInt).takeSample(false, Math.floor(r_count.toDouble/part).toInt, seed)
- val sample2 = rdd2_sorted.map(row => row.get(index2).toString().toInt).takeSample(false, Math.floor(s_count.toDouble/part).toInt, seed)
- println("Min: "+ sample1.reduceLeft(_ min _))
- println("Max: "+ sample1.reduceLeft(_ max _))
- // take every n-th sample value as a bucket boundary,
- // then sort and discard the duplicates
- val n = 3
- val bucket_boundaries1 = sample1.zipWithIndex.filter(_._2 % n == 0).map(_._1).sorted.distinct
- //println("Bucket boundaries size: "+ bucket_boundaries1.length)
- var p = List[(Int, Int)]()
- for ( (value,i) <- bucket_boundaries1.view.zipWithIndex){
- if(i>0){
- p = p:+((bucket_boundaries1(i-1), bucket_boundaries1(i)))
- }
- }
- // add an open-ended last bucket; -1 marks +infinity
- p = p:+((bucket_boundaries1(bucket_boundaries1.length-1), -1))
- // create count histogram
- val r_values = p.map({ case (val1, val2) => {
- // find the range of row indexes in rdd1 whose join value falls into this bucket
- var minBucketIndex = 0.toLong
- var maxBucketIndex = 0.toLong
- if(val2 == -1){
- var filteredValues = rdd1_sorted.zipWithIndex.filter(row => (row._1.get(index1).toString().toInt>=val1))
- minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
- maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
- }
- else{
- var filteredValues= rdd1_sorted.zipWithIndex.filter(row => (row._1.get(index1).toString().toInt>=val1 && row._1.get(index1).toString().toInt<val2) )
- // TODO: here you suppose that the index of the rdd is the first row!!!!
- minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
- maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
- }
- ((val1, val2), (minBucketIndex, maxBucketIndex))
- }})
- // r_values.foreach(println)
- println("------------------------------")
- val bucket_boundaries2 = sample2.zipWithIndex.filter(_._2 % n == 0).map(_._1).sorted.distinct
- //println("Bucket boundaries size: "+ bucket_boundaries2.length)
- p = List[(Int, Int)]()
- for ( (value,i) <- bucket_boundaries2.view.zipWithIndex){
- if(i>0){
- p = p:+((bucket_boundaries2(i-1), bucket_boundaries2(i)))
- }
- }
- // add an open-ended last bucket; -1 marks +infinity
- p = p:+((bucket_boundaries2(bucket_boundaries2.length-1), -1))
- // create count histogram
- val s_values = p.map({ case (val1, val2) => {
- // find the range of row indexes in rdd2 whose join value falls into this bucket
- var minBucketIndex = 0.toLong
- var maxBucketIndex = 0.toLong
- if(val2 == -1){
- var filteredValues = rdd2_sorted.zipWithIndex.filter(row => (row._1.get(index2).toString().toInt>=val1))
- minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
- maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
- }
- else{
- var filteredValues= rdd2_sorted.zipWithIndex.filter(row => (row._1.get(index2).toString().toInt>=val1 && row._1.get(index2).toString().toInt<val2) )
- // TODO: here you suppose that the index of the rdd is the first row!!!!
- minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
- maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
- }
- ((val1, val2), (minBucketIndex, maxBucketIndex))
- }})
- println("------------------------------")
- var current_row = 0
- val divisors = divisorGenerator(this.bucketsize)
- var region_num = this.reducers
- var final_regions_list = List[((Int, Int), (Int, Int))]()
- while(current_row < r_count.toInt){
- var res = getBestStripeRegions(current_row, divisors, this.bucketsize, r_count.toInt, s_count.toInt, r_values, s_values, op )
- var best_stripe_regions = res._1
- // save best regions
- best_stripe_regions.foreach(value => {
- final_regions_list = final_regions_list:+(value)
- })
- var last_row_index = res._2
- // update current row
- current_row = last_row_index + 1
- }
- println("-------------RESULT FOR BEST REGIONS:------------ ")
- //final_regions_list.foreach(println)
- println("Regions number "+final_regions_list.length)
- // map each region to the real dataset values
- var final_dataset_regions = final_regions_list.map(tuple => {
- // filter real attribute values from the real dataset
- var r_attr = rdd1_sorted.zipWithIndex.filter(row=> (row._2 >= tuple._1._1) && (row._2 <= tuple._1._2)).map(row => row._1.get(index1).toString().toInt).collect()
- var s_attr = rdd2_sorted.zipWithIndex.filter(row => (row._2 >= tuple._2._1 && row._2 <= tuple._2._2) ).map(row => row._1.get(index2).toString().toInt).collect()
- // TODO: SHOULD DO DISTINCT HERE, OR WHAT? OR IN THIS PART I SHOULD NOT TAKE THE VALUES INTO ACCOUNT
- // this should be after all the input-optimizing alg
- (r_attr.toList, s_attr.toList)
- })
- // map each region with exactly one reducer using mod and number of reducers
- var final_dataset = final_dataset_regions.zipWithIndex.map(row => (row._2%this.reducers, row._1))
- var new_something = org.apache.spark.SparkContext.getOrCreate().parallelize(final_dataset.toSeq).map(row => (row._1, row._2))
- // map the data for each reducer
- // create list of lists, or how to do this?! -> nope, first join all the r and s values
- // for each reducer and then perform local theta join
- // or maybe use COMBINEBYKEY?
- //var what = new_something.reduceByKey({case (p, q)=> (p._1::: q._1, p._2:::q._2)})//.map()
- // this below gives multiple rows with the same keys, more than two lists in the result, etc.
- var what = new_something.groupByKey()//.map({case (k, v) => (k, (v.toList))})
- what.foreach(println)
- //new_something.map({case (k, v) => (k, (v._1))})
- // new_something.reduceByKey({case(k, v) => (k, local_thetajoin(v._1, v._2, op)) })//.foreach(println)
- // TODO: try combine by key...
- // join all theta-join results
- // return final result
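- // A possible way to finish these remaining steps (a sketch only, not verified end to end):
- // new_something holds (reducerId, (rValues, sValues)) pairs as built above, so grouping by
- // key and running the nested-loop helper per region would give the final RDD[(Int, Int)]:
- //   val joined: RDD[(Int, Int)] = new_something
- //     .groupByKey()
- //     .flatMap { case (_, regions) =>
- //       regions.flatMap { case (rVals, sVals) => local_thetajoin(rVals, sVals, op) }
- //     }
- //   joined   // would replace the null below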
- null
- }
- // select best dimension for regions for the stripe starting from row r, and using proper divisors of maxInput
- def getBestStripeRegions(row: Int, divisors: List[Int], maxInput: Int, rows_number: Int, columns_number: Int ,r_histogram:List[((Int, Int), (Long, Long))],
- s_histogram: List[((Int, Int), (Long, Long))], op: String ): (List[((Int, Int), (Int, Int))], Int) = {
- var best_regions_list = List[((Int, Int), (Int, Int))]()
- var best_cost = 0.toLong
- var r = row
- for(r_size <- divisors){
- var r_end = r+r_size-1
- if(r_end >= rows_number){
- r_end = rows_number-1
- }
- // check if it is the last row and it is not possible to get int for col_size!! do the calculations!!!
- var col_size = math.floor(maxInput/r_size).toInt
- println("Row size: "+r_size)
- println("Column size: "+col_size)
- var fixedStripeRegions = getFixedStripeRegions(r, r_end, col_size, columns_number, r_histogram, s_histogram, op)
- var current_cost = fixedStripeRegions._2
- if(current_cost>best_cost){
- println("Old best cost: "+best_cost+", new best cost "+current_cost)
- best_regions_list = fixedStripeRegions._1
- best_cost = current_cost
- }
- }
- var last_row_index = best_regions_list(0)._1._2
- // return best found regions list given row offset and divisors
- (best_regions_list, last_row_index)
- }
- // return the overall cost and regions for the fixed stripe dimensions (eg. for 250x4, or 500x2)
- def getFixedStripeRegions(r: Int, r_end: Int, col_size: Int, columns_number: Int, r_histogram: List[((Int, Int),
- (Long, Long))], s_histogram: List[((Int, Int), (Long, Long))], op: String): (List[((Int, Int), (Int, Int))], Long) ={
- var overall_cost = 0.toLong
- var currentColumn = 0
- var regions_list = List[((Int, Int), (Int, Int))]()
- var row_boundaries = (r, r_end) // eg size 5 has indexes 0 to 4
- while(currentColumn < columns_number){
- var end_col = currentColumn+col_size-1
- if(end_col>=columns_number){
- // last region can be smaller if there are not enough columns left
- end_col = columns_number-1
- }
- var col_boundaries = (currentColumn, end_col)
- var currentCost = getRegionCost(row_boundaries, col_boundaries, r_histogram, s_histogram, op)
- overall_cost = overall_cost + currentCost
- if(currentCost>0){
- // regions with cost=0 do not need to be mapped to the output!
- regions_list = regions_list:+((row_boundaries), (col_boundaries))
- }
- currentColumn = currentColumn+col_size
- }
- (regions_list, overall_cost)
- }
- def getRegionCost(r_boundaries: (Int, Int), s_boundaries: (Int, Int), r_histogram:List[((Int, Int), (Long, Long))], s_histogram:List[((Int, Int), (Long, Long))],op:String): Long = {
- val r_buckets = getRegionBucketBoundaries(r_boundaries, r_histogram)
- val s_buckets = getRegionBucketBoundaries(s_boundaries, s_histogram)
- var cost = 0
- r_buckets.foreach(r_bucket => {
- s_buckets.foreach(s_bucket => {
- //println("Check for the buckets: "+ r_bucket._1 + ", "+s_bucket._1)
- val hasOutputTuples = bucketsSatisfyCondition(r_bucket._1, s_bucket._1, op)
- if (hasOutputTuples){
- //println("Goes to result!")
- // if exists at least one output tuple, then calculate the cost for the region and add it to the cumulative cost
- var currentCost = (r_bucket._2._2 - r_bucket._2._1+1).toInt*(s_bucket._2._2 - s_bucket._2._1+1).toInt
- //println("Adding cost " + currentCost)
- cost = cost + currentCost
- }
- })
- })
- cost
- }
- // e.g. (6, 19) are row indexes for which we are looking the corresponding histogram buckets
- // output example [((6,10), (6,23)), ((10, 14), (24, 250))]
- def getRegionBucketBoundaries(rowIndexBoundaries: (Int, Int), histogram: List[((Int, Int), (Long, Long))]): List[((Int, Int), (Long, Long))] = {
- val minIndex = rowIndexBoundaries._1
- val maxIndex = rowIndexBoundaries._2
- var retVal = List[((Int, Int), (Long, Long))]()
- // find the bucket for which indexBoundaries are in it
- var ind: Int = minIndex
- while (ind <= maxIndex){
- val resultBucket = histogram.filter({ case ((k1, k2), (v1, v2)) =>
- (ind>=v1 && ind<=v2)
- })
- val key = resultBucket(0)._1 // suppose that the index fits in just one histogram bucket
- var rowOffset = resultBucket(0)._2._2 - ind // find the number of following rows that are in the same histogram bucket
- if((ind+rowOffset) > maxIndex){
- rowOffset = maxIndex-ind // prevent to go out of the region boundaries
- }
- retVal = retVal:+((key),(ind.toLong, (ind+rowOffset).toLong))
- ind = ind + rowOffset.toInt + 1;
- }
- retVal
- }
- def bucketsSatisfyCondition(bucket1: (Int, Int), bucket2: (Int, Int), op: String) : Boolean = {
- val bucketR = bucket1
- val bucketS = bucket2
- op match {
- // be careful: a bucket (v1, v2) does not include values equal to its upper boundary v2
- // (the open-ended last bucket with v2 == -1 is not treated specially here)
- case "=" => bucket1._1 < bucket2._2 && bucket2._1 < bucket1._2 // the two half-open value ranges overlap, so equal values can exist in both buckets
- case "<" => bucketR._1 < bucketS._2 // enough that the smallest R value is smaller than the biggest S value
- case "<=" => bucketR._1 <= bucketS._2
- case ">" => (bucketR._2-1) > bucketS._1 // enough that the biggest value of R is bigger than the smallest S value
- case ">=" => (bucketR._2-1) >= bucketS._1
- }
- }
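- // small worked example: bucketsSatisfyCondition((5, 10), (8, 20), "=") is true, because the
- // value ranges overlap in [8, 10), so equal values such as 8 or 9 may occur in both buckets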
- def test_helper_functions(r_histogram: List[((Int, Int), (Long, Long))], s_histogram: List[((Int, Int), (Long, Long))]) {
- // println("For the smallest datasets!!!")
- getRegionBucketBoundaries((0, 149), r_histogram)//.foreach(println)
- // println("S buckets for (383, 415) index")
- getRegionBucketBoundaries((383, 415), s_histogram)//.foreach(println)
- // get region costs - test
- var cost = getRegionCost((0, 136), (0,150), r_histogram, s_histogram, "=")
- //println("Cost = "+cost) //should be 10688
- assert(cost==10688)
- cost = getRegionCost((0, 136), (0,150), r_histogram, s_histogram, "<")
- //println("Cost < "+cost)
- assert(cost==16860)
- cost = getRegionCost((0, 136), (0,150), r_histogram, s_histogram, ">")
- //println("Cost > "+cost)
- assert(cost==3827)
- }
- def divisorGenerator(n: Int): List[Int] = {
- var results = List[Int]()
- for (i <- 1 to (n/2+1)) {
- if (n%i == 0) {
- results = results:+(i)
- }
- }
- results
- }
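- // small worked example: divisorGenerator(12) returns List(1, 2, 3, 4, 6), the proper divisors of 12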
- def local_thetajoin(dat1:List[(Int)], dat2:List[(Int)], op:String) : List[(Int, Int)] = {
- var res = List[(Int, Int)]()
- for (d1 <- dat1){
- for(d2<-dat2){
- if(checkCondition(d1, d2, op)){
- res = res :+ (d1, d2)
- }
- }
- }
- res //.iterator
- }
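- // small worked example: local_thetajoin(List(1, 5, 9), List(3, 5, 7), "<")
- // returns List((1,3), (1,5), (1,7), (5,7))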
- /*
- * this method takes as input two lists of values that belong to the same partition
- * and performs the theta join on them. Both datasets are lists of tuples (Int, Int)
- * where ._1 is the partition number and ._2 is the value.
- * Of course you might change this function (both definition and body) if it does not
- * fit your needs :)
- * */
- def local_thetajoin_2(dat1:Iterator[(Int, Int)], dat2:Iterator[(Int, Int)], op:String) : Iterator[(Int, Int)] = {
- var res = List[(Int, Int)]()
- var dat2List = dat2.toList
- while(dat1.hasNext) {
- val row1 = dat1.next()
- for(row2 <- dat2List) {
- if(checkCondition(row1._2, row2._2, op)) {
- res = res :+ (row1._2, row2._2)
- }
- }
- }
- res.iterator
- }
- def checkCondition(value1: Int, value2: Int, op:String): Boolean = {
- op match {
- case "=" => value1 == value2
- case "<" => value1 < value2
- case "<=" => value1 <= value2
- case ">" => value1 > value2
- case ">=" => value1 >= value2
- }
- }
- }
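- // A minimal usage sketch (not part of the original class). It assumes the course-provided
- // Dataset wrapper with getSchema/getRDD used above and an already running Spark context;
- // datasetR, datasetS, the attribute names and the constructor arguments are placeholders.
- // object ThetaJoinApp {
- //   def main(args: Array[String]): Unit = {
- //     val tj = new ThetaJoin(numR = 1000, numS = 1000, reducers = 10, bucketsize = 100000)
- //     val result = tj.theta_join(datasetR, datasetS, "attrR", "attrS", "<")
- //     result.collect().foreach(println)
- //   }
- // }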