package thetajoin

import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory

class ThetaJoin(numR: Long, numS: Long, reducers: Int, bucketsize: Int) extends java.io.Serializable {
  val logger = LoggerFactory.getLogger("ThetaJoin")

  // random samples for each relation
  // (helper structures; you are not required to use them)
  var horizontalBoundaries = Array[Int]()
  var verticalBoundaries = Array[Int]()

  // number of values that fall in each partition
  // (helper structures; you are not required to use them)
  var horizontalCounts = Array[Int]()
  var verticalCounts = Array[Int]()

  /*
   * This method takes two datasets and the join condition as input and
   * returns an RDD with the result, projecting only attr1 and attr2.
   * You are not allowed to change the definition of this function.
   * */
  def theta_join(dataset1: Dataset, dataset2: Dataset, attr1: String, attr2: String, op: String): RDD[(Int, Int)] = {
    val schema1 = dataset1.getSchema
    val schema2 = dataset2.getSchema

    val rdd1 = dataset1.getRDD
    val rdd2 = dataset2.getRDD

    val index1 = schema1.indexOf(attr1)
    val index2 = schema2.indexOf(attr2)

    println("First dataset size: " + rdd1.count())
    println("Second dataset size: " + rdd2.count())

    // extract a random sample, compute cr and cs, and build histograms from it
    // TODO: the number of reducers needed to compute cr and cs is given by the
    // class parameter `reducers`; it is hard-coded here for now
    val r = 10
    val r_count = rdd1.count()
    val s_count = rdd2.count()
    val cs = s_count / Math.sqrt((s_count * r_count) / r)
    val cr = r_count / Math.sqrt((s_count * r_count) / r)
    println("Cs: " + cs)
    println("Cr: " + cr)
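
    // Illustrative note (not part of the original paste): with the formulas above,
    // e.g. r_count = s_count = 1000 and r = 10 give
    //   cr = cs = 1000 / math.sqrt(1000.0 * 1000 / 10) = 1000 / math.sqrt(100000) ≈ 3.16,
    // i.e. each relation is cut into roughly sqrt(r) slices along its sorted order.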

    // sort both datasets by the join attribute
    val rdd1_sorted = rdd1.sortBy(_.apply(index1).toString().toInt)
    val rdd2_sorted = rdd2.sortBy(_.apply(index2).toString().toInt)

    // choose approx. a third of each dataset to build the histograms
    // (note: the sample size of each relation uses its own count)
    val seed = 4
    val part = 3
    val sample1 = rdd1_sorted.map(row => row.get(index1).toString().toInt).takeSample(false, Math.floor(r_count.toDouble / part).toInt, seed)
    val sample2 = rdd2_sorted.map(row => row.get(index2).toString().toInt).takeSample(false, Math.floor(s_count.toDouble / part).toInt, seed)

    println("Min: " + sample1.reduceLeft(_ min _))
    println("Max: " + sample1.reduceLeft(_ max _))

    // take every nth sampled value as a bucket boundary (the choice of n is a tuning knob),
    // then sort and discard duplicates
    val n = 3
    val bucket_boundaries1 = sample1.zipWithIndex.filter(_._2 % n == 0).map(_._1).sorted.distinct
    //println("Bucket boundaries size: " + bucket_boundaries1.length)

    // build half-open value ranges [lower, upper) from consecutive boundaries
    var p = List[(Int, Int)]()
    for ((value, i) <- bucket_boundaries1.view.zipWithIndex) {
      if (i > 0) {
        p = p :+ ((bucket_boundaries1(i - 1), bucket_boundaries1(i)))
      }
    }
    // add an open-ended bucket for the upper tail (-1 marks "no upper bound")
    p = p :+ ((bucket_boundaries1(bucket_boundaries1.length - 1), -1))

    // create the row-index histogram for rdd1: for each value bucket, record the
    // min and max row index (in sorted order) of the rows that fall into it
    val r_values = p.map({ case (val1, val2) => {
      var minBucketIndex = 0.toLong
      var maxBucketIndex = 0.toLong
      if (val2 == -1) {
        // open-ended bucket: everything >= val1
        var filteredValues = rdd1_sorted.zipWithIndex.filter(row => row._1.get(index1).toString().toInt >= val1)
        minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
        maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
      }
      else {
        var filteredValues = rdd1_sorted.zipWithIndex.filter(row => (row._1.get(index1).toString().toInt >= val1 && row._1.get(index1).toString().toInt < val2))
        // TODO: this assumes the zipWithIndex position matches the sorted row order
        minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
        maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
      }
      ((val1, val2), (minBucketIndex, maxBucketIndex))
    }})

    //r_values.foreach(println)

    println("------------------------------")

    val bucket_boundaries2 = sample2.zipWithIndex.filter(_._2 % n == 0).map(_._1).sorted.distinct
    //println("Bucket boundaries size: " + bucket_boundaries2.length)

    p = List[(Int, Int)]()
    for ((value, i) <- bucket_boundaries2.view.zipWithIndex) {
      if (i > 0) {
        p = p :+ ((bucket_boundaries2(i - 1), bucket_boundaries2(i)))
      }
    }
    // add an open-ended bucket for the upper tail (-1 marks "no upper bound")
    p = p :+ ((bucket_boundaries2(bucket_boundaries2.length - 1), -1))

    // create the row-index histogram for rdd2, analogous to r_values above
    val s_values = p.map({ case (val1, val2) => {
      var minBucketIndex = 0.toLong
      var maxBucketIndex = 0.toLong
      if (val2 == -1) {
        // open-ended bucket: everything >= val1
        var filteredValues = rdd2_sorted.zipWithIndex.filter(row => row._1.get(index2).toString().toInt >= val1)
        minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
        maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
      }
      else {
        var filteredValues = rdd2_sorted.zipWithIndex.filter(row => (row._1.get(index2).toString().toInt >= val1 && row._1.get(index2).toString().toInt < val2))
        // TODO: this assumes the zipWithIndex position matches the sorted row order
        minBucketIndex = filteredValues.map(row => row._2).reduce(_ min _)
        maxBucketIndex = filteredValues.map(row => row._2).reduce(_ max _)
      }
      ((val1, val2), (minBucketIndex, maxBucketIndex))
    }})

    println("------------------------------")

    var current_row = 0
    val divisors = divisorGenerator(this.bucketsize)
    var region_num = this.reducers
    var final_regions_list = List[((Int, Int), (Int, Int))]()

    // cover the (row, column) matrix stripe by stripe with the best-scoring regions
    while (current_row < r_count.toInt) {

      var res = getBestStripeRegions(current_row, divisors, this.bucketsize, r_count.toInt, s_count.toInt, r_values, s_values, op)
      var best_stripe_regions = res._1
      // save the best regions of this stripe
      best_stripe_regions.foreach(value => {
        final_regions_list = final_regions_list :+ (value)
      })
      var last_row_index = res._2
      // continue with the first row below the stripe just covered
      current_row = last_row_index + 1
    }

    println("-------------RESULT FOR BEST REGIONS:------------ ")
    //final_regions_list.foreach(println)
    println("Regions number " + final_regions_list.length)

    // map each region to the real dataset values
    var final_dataset_regions = final_regions_list.map(tuple => {
      // filter the real attribute values from the sorted datasets by row-index range
      var r_attr = rdd1_sorted.zipWithIndex.filter(row => (row._2 >= tuple._1._1) && (row._2 <= tuple._1._2)).map(row => row._1.get(index1).toString().toInt).collect()
      var s_attr = rdd2_sorted.zipWithIndex.filter(row => (row._2 >= tuple._2._1 && row._2 <= tuple._2._2)).map(row => row._1.get(index2).toString().toInt).collect()

      // TODO: should a distinct be applied here, or should the values not be taken into account at this stage?
      // this should happen after the whole input-optimizing algorithm
      (r_attr.toList, s_attr.toList)
    })

    // assign each region to exactly one reducer, using mod over the number of reducers
    var final_dataset = final_dataset_regions.zipWithIndex.map(row => (row._2 % this.reducers, row._1))

    var new_something = org.apache.spark.SparkContext.getOrCreate().parallelize(final_dataset.toSeq).map(row => (row._1, row._2))

    // group the data for each reducer
    // create a list of lists, or how to do this? -> no, first join all the r and s values
    // for each reducer and then perform the local theta join
    // or maybe use combineByKey?
    //var what = new_something.reduceByKey({case (p, q)=> (p._1::: q._1, p._2:::q._2)})//.map()
    // the line below gives several rows with the same keys, more than two lists in the result, etc.
    var what = new_something.groupByKey()//.map({case (k, v) => (k, (v.toList))})
    what.foreach(println)

    //new_something.map({case (k, v) => (k, (v._1))})
    // new_something.reduceByKey({case(k, v) => (k, local_thetajoin(v._1, v._2, op)) })//.foreach(println)

    // TODO: try combineByKey...

    // join all theta-join results
    // return final result
    null
  }
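
  // Hedged sketch (not part of the original paste): one possible way to finish theta_join,
  // assuming `new_something` above already holds (reducerKey, (rValues, sValues)) pairs.
  // It reuses local_thetajoin per region and flattens the results; the method name and
  // its wiring are illustrative assumptions, not the author's final implementation.
  def join_regions_sketch(regions: RDD[(Int, (List[Int], List[Int]))], op: String): RDD[(Int, Int)] = {
    // each region already carries both value lists, so a flatMap over the regions
    // performs the per-region nested-loop join and concatenates the outputs
    regions.flatMap { case (_, (rValues, sValues)) => local_thetajoin(rValues, sValues, op) }
  }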

  // select the best region dimensions for the stripe starting at row `row`, trying the proper divisors of maxInput as row sizes
  def getBestStripeRegions(row: Int, divisors: List[Int], maxInput: Int, rows_number: Int, columns_number: Int, r_histogram: List[((Int, Int), (Long, Long))],
      s_histogram: List[((Int, Int), (Long, Long))], op: String): (List[((Int, Int), (Int, Int))], Int) = {
    var best_regions_list = List[((Int, Int), (Int, Int))]()
    var best_cost = 0.toLong
    var r = row

    for (r_size <- divisors) {
      var r_end = r + r_size - 1
      if (r_end >= rows_number) {
        r_end = rows_number - 1
      }
      // TODO: handle the last stripe, where maxInput may not divide evenly by r_size
      var col_size = math.floor(maxInput / r_size).toInt
      println("Row size: " + r_size)
      println("Column size: " + col_size)

      var fixedStripeRegions = getFixedStripeRegions(r, r_end, col_size, columns_number, r_histogram, s_histogram, op)
      var current_cost = fixedStripeRegions._2

      if (current_cost > best_cost) {
        println("Old best cost: " + best_cost + ", new best cost " + current_cost)
        best_regions_list = fixedStripeRegions._1
        best_cost = current_cost
      }
    }

    var last_row_index = best_regions_list(0)._1._2
    // return the best regions found for this row offset and these divisors, plus the last covered row index
    (best_regions_list, last_row_index)
  }

  // return the overall cost and the regions for fixed stripe dimensions (e.g. 250x4 or 500x2)
  def getFixedStripeRegions(r: Int, r_end: Int, col_size: Int, columns_number: Int, r_histogram: List[((Int, Int),
      (Long, Long))], s_histogram: List[((Int, Int), (Long, Long))], op: String): (List[((Int, Int), (Int, Int))], Long) = {

    var overall_cost = 0.toLong
    var currentColumn = 0
    var regions_list = List[((Int, Int), (Int, Int))]()
    var row_boundaries = (r, r_end) // e.g. size 5 has indexes 0 to 4

    while (currentColumn < columns_number) {
      var end_col = currentColumn + col_size - 1
      if (end_col >= columns_number) {
        // the last region can be smaller if there are not enough columns left
        end_col = columns_number - 1
      }
      var col_boundaries = (currentColumn, end_col)
      var currentCost = getRegionCost(row_boundaries, col_boundaries, r_histogram, s_histogram, op)
      overall_cost = overall_cost + currentCost
      if (currentCost > 0) {
        // regions with cost 0 do not need to be mapped to the output
        regions_list = regions_list :+ ((row_boundaries), (col_boundaries))
      }

      currentColumn = currentColumn + col_size
    }

    (regions_list, overall_cost)
  }

  // estimate the cost of one region: the sum of |r_bucket| * |s_bucket| over all pairs of
  // histogram buckets inside the region that can produce at least one output tuple
  def getRegionCost(r_boundaries: (Int, Int), s_boundaries: (Int, Int), r_histogram: List[((Int, Int), (Long, Long))], s_histogram: List[((Int, Int), (Long, Long))], op: String): Long = {

    val r_buckets = getRegionBucketBoundaries(r_boundaries, r_histogram)
    val s_buckets = getRegionBucketBoundaries(s_boundaries, s_histogram)

    var cost = 0L
    r_buckets.foreach(r_bucket => {
      s_buckets.foreach(s_bucket => {
        //println("Check for the buckets: " + r_bucket._1 + ", " + s_bucket._1)
        val hasOutputTuples = bucketsSatisfyCondition(r_bucket._1, s_bucket._1, op)
        if (hasOutputTuples) {
          //println("Goes to result!")
          // if at least one output tuple can exist, add this bucket pair's cost to the cumulative cost
          var currentCost = (r_bucket._2._2 - r_bucket._2._1 + 1).toInt * (s_bucket._2._2 - s_bucket._2._1 + 1).toInt
          //println("Adding cost " + currentCost)
          cost = cost + currentCost
        }
      })
    })
    cost
  }

  // e.g. (6, 19) are row indexes for which we are looking up the corresponding histogram buckets
  // output example: [((6,10), (6,23)), ((10, 14), (24, 250))]
  def getRegionBucketBoundaries(rowIndexBoundaries: (Int, Int), histogram: List[((Int, Int), (Long, Long))]): List[((Int, Int), (Long, Long))] = {

    val minIndex = rowIndexBoundaries._1
    val maxIndex = rowIndexBoundaries._2
    var retVal = List[((Int, Int), (Long, Long))]()

    // walk over the index range and find the histogram bucket that contains each index
    var ind: Int = minIndex
    while (ind <= maxIndex) {
      val resultBucket = histogram.filter({ case ((k1, k2), (v1, v2)) =>
        (ind >= v1 && ind <= v2)
      })

      val key = resultBucket(0)._1 // assume the index falls into exactly one histogram bucket
      var rowOffset = resultBucket(0)._2._2 - ind // number of following rows that lie in the same histogram bucket

      if ((ind + rowOffset) > maxIndex) {
        rowOffset = maxIndex - ind // do not go past the region boundary
      }
      retVal = retVal :+ ((key), (ind.toLong, (ind + rowOffset).toLong))
      ind = ind + rowOffset.toInt + 1
    }
    retVal
  }

  def bucketsSatisfyCondition(bucket1: (Int, Int), bucket2: (Int, Int), op: String): Boolean = {
    val bucketR = bucket1
    val bucketS = bucket2

    op match {
      // be careful, the bucket (v1, v2) does not include values for the second boundary v2!
      case "=" => ((bucket1._1 >= bucket2._1 && bucket1._1 < bucket2._2) || (bucket1._2 > bucket2._1 && bucket1._2 < bucket2._2)) // bucket1 has at least one value in the range of bucket2
      case "<" => bucketR._1 < bucketS._2 // enough that the smallest R value is smaller than the biggest S value
      case "<=" => bucketR._1 <= bucketS._2
      case ">" => (bucketR._2 - 1) > bucketS._1 // enough that the biggest value of R is bigger than the smallest S value
      case ">=" => (bucketR._2 - 1) >= bucketS._1
    }
  }
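
  // Illustrative examples (not from the original paste) of the "=" case above:
  //   bucketsSatisfyCondition((5, 10), (8, 12), "=")  == true   (10 lies in [8, 12))
  //   bucketsSatisfyCondition((5, 10), (12, 20), "=") == false  (no overlap with [12, 20))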

  // sanity checks for the helper functions (expected values hold for the smallest test datasets)
  def test_helper_functions(r_histogram: List[((Int, Int), (Long, Long))], s_histogram: List[((Int, Int), (Long, Long))]): Unit = {
    getRegionBucketBoundaries((0, 149), r_histogram)//.foreach(println)
    // println("S buckets for the (383, 415) index range")
    getRegionBucketBoundaries((383, 415), s_histogram)//.foreach(println)

    // get region costs - test
    var cost = getRegionCost((0, 136), (0, 150), r_histogram, s_histogram, "=")
    //println("Cost = " + cost) //should be 10688
    assert(cost == 10688)
    cost = getRegionCost((0, 136), (0, 150), r_histogram, s_histogram, "<")
    //println("Cost < " + cost)
    assert(cost == 16860)
    cost = getRegionCost((0, 136), (0, 150), r_histogram, s_histogram, ">")
    //println("Cost > " + cost)
    assert(cost == 3827)
  }

  def divisorGenerator(n: Int): List[Int] = {
    var results = List[Int]()
    for (i <- 1 to (n / 2 + 1)) {
      if (n % i == 0) {
        results = results :+ (i)
      }
    }
    results
  }
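
  // Illustrative example (not from the original paste):
  //   divisorGenerator(8) == List(1, 2, 4)   // 8 itself lies outside the 1..(8/2+1) range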

  // nested-loop theta join over two local lists of values
  def local_thetajoin(dat1: List[Int], dat2: List[Int], op: String): List[(Int, Int)] = {
    var res = List[(Int, Int)]()
    for (d1 <- dat1) {
      for (d2 <- dat2) {
        if (checkCondition(d1, d2, op)) {
          res = res :+ ((d1, d2))
        }
      }
    }
    res //.iterator
  }
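
  // Illustrative example (not from the original paste):
  //   local_thetajoin(List(1, 2, 3), List(2, 3), "<") == List((1, 2), (1, 3), (2, 3))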

  /*
   * This method takes as input two iterators of values that belong to the same partition
   * and performs the theta join on them. Both inputs are sequences of tuples (Int, Int)
   * where ._1 is the partition number and ._2 is the value.
   * Of course you might change this function (both definition and body) if it does not
   * fit your needs :)
   * */
  def local_thetajoin_2(dat1: Iterator[(Int, Int)], dat2: Iterator[(Int, Int)], op: String): Iterator[(Int, Int)] = {
    var res = List[(Int, Int)]()
    var dat2List = dat2.toList

    while (dat1.hasNext) {
      val row1 = dat1.next()
      for (row2 <- dat2List) {
        if (checkCondition(row1._2, row2._2, op)) {
          res = res :+ ((row1._2, row2._2))
        }
      }
    }
    res.iterator
  }

  // evaluate the comparison operator on a single pair of values
  def checkCondition(value1: Int, value2: Int, op: String): Boolean = {
    op match {
      case "=" => value1 == value2
      case "<" => value1 < value2
      case "<=" => value1 <= value2
      case ">" => value1 > value2
      case ">=" => value1 >= value2
    }
  }
}
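
// Hypothetical usage sketch (not from the original paste); how the Dataset objects are
// constructed is not shown in this file, so dataset1/dataset2 below are placeholders:
//
//   val tj = new ThetaJoin(numR = 1000, numS = 1000, reducers = 10, bucketsize = 1000)
//   val result: RDD[(Int, Int)] = tj.theta_join(dataset1, dataset2, "attr1", "attr2", "<")
//   result.collect().foreach(println)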