/**
 * Created by michael on 9/15/16.
 * Self-contained reproduction of the aggregateByKey MatchError.
 */

import org.apache.spark.{SparkContext, SparkConf}
import scala.collection.immutable.HashMap
import scala.util.Random
import java.util.UUID.randomUUID

object mainBugTest {
  def main(args: Array[String]): Unit = {

    // Spark settings
    val sparkConf = new SparkConf()
    sparkConf.setAppName("mainBugTest")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.io.compression.codec", "lzf")
    sparkConf.set("spark.speculation", "true")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("spark.kryo.referenceTracking", "false")
    sparkConf.set("spark.kryoserializer.buffer", "64k")
    sparkConf.set("spark.kryoserializer.buffer.max", "1g")
    // Register the map type that aggregateByKey accumulates into with Kryo.
    sparkConf.registerKryoClasses(Array(classOf[HashMap[String, Double]]))
    val sc = new SparkContext(sparkConf)

    // Data generation settings
    val tagCount = 200
    val tagPerEntity = 10
    val entityCount = 1000
    val entityRepeat = 200

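    // Resulting data volume, derived from the settings above: 1000 entities x 200
    // repeats = 200,000 (id, tags) records, and each record later expands into
    // C(10, 2) = 45 unordered tag pairs, so 9,000,000 pairs reach aggregateByKey.
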
    // Methods
    // Folds one (tagPair, weight) entry into the running map, summing weights
    // for keys that are already present (used as the seqOp below).
    def updateWeight(m: HashMap[String, Double], entry: (String, Double)): HashMap[String, Double] = {
      val v = m.getOrElse(entry._1, 0.0)
      m.updated(entry._1, v + entry._2)
    }

    // Merges two weight maps, summing the values of shared keys (used as the combOp below).
    def mergeMaps(m1: HashMap[String, Double], m2: HashMap[String, Double]): HashMap[String, Double] =
      m1.merged(m2) { case ((k, v1), (_, v2)) => (k, v1 + v2) }

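    // Quick local sanity check of the two functions above, on plain Scala collections
    // with no Spark involved; it is illustrative only and not needed for the repro.
    // Folding ("ab", 1.0) twice and merging with an existing map should yield a map
    // containing ab -> 5.0 and cd -> 1.0.
    val sanityFold = updateWeight(updateWeight(new HashMap[String, Double](), ("ab", 1.0)), ("ab", 1.0))
    println("sanity check: " + mergeMaps(sanityFold, HashMap("ab" -> 3.0, "cd" -> 1.0)))
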
    // Data creation: each of the 1000 entity ids appears 200 times, each time paired
    // with 10 tags drawn at random from the 200 available tags.
    val tags = (1 to tagCount).map(_ => randomUUID().toString)
    val entityIds = (1 to entityCount).map(_ => randomUUID().toString.toUpperCase)
    val data = Random.shuffle(
      entityIds.flatMap(id => (1 to entityRepeat).map(_ => id -> Random.shuffle(tags).take(tagPerEntity)))
    )
    val rdd = sc.parallelize(data)

    println("# of rdd partitions: " + rdd.partitions.size)
    rdd.mapPartitions(iter => Array(iter.size).iterator, true) foreach println

    // Data manipulation: expand each record into its 45 unordered tag pairs, keyed by
    // entity id; the value is (concatenated tag pair, weight 1.0).
    val newRdd = rdd.flatMap { case (id, tag) =>
      tag.toSet[String].subsets(2).map(v => id -> (v.mkString(""), 1.0))
    }.repartition(3)
    // The repartition is only there to even out partition sizes so that the job
    // does not run into out-of-memory errors.

    println("# of newRdd partitions: " + newRdd.partitions.size)
    newRdd.mapPartitions(iter => Array(iter.size).iterator, true) foreach println

    newRdd.take(10) foreach println

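    // The aggregateByKey call below is the point at which the MatchError described in
    // the header shows up: updateWeight folds each (tagPair, 1.0) entry into the
    // per-key map (seqOp) and mergeMaps combines partial maps across partitions (combOp).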
    val result = newRdd.aggregateByKey(new HashMap[String, Double]())(
      { case (m, e) => updateWeight(m, e) },
      { case (m1, m2) => mergeMaps(m1, m2) }
    )

    result.take(10) foreach println
  }
}