Advertisement
Guest User

Aggregation

a guest
May 11th, 2019
104
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.29 KB | None | 0 0
  /**
    * Entry point for the Spark aggregation exercises.
    *
    * Creates a local Spark context named "Simple Application". Every experiment
    * below is currently commented out, so running this only starts the context
    * and shuts it down again.
    *
    * @param args command-line arguments (not read by this method)
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)

    // Always release the context, even if a re-enabled experiment throws.
    try {

      // Experiment 1 (disabled): total number of space-separated tokens in the
      // text file "szekspir".
      /*

      val source = sc.textFile("szekspir")
      val counts = source.map(line => line.split(" ").size)
          .reduce((a,b) => a+b)

      println(counts)

      */

      // Experiment 2 (disabled): unfinished draft that generates m random
      // (mu, std) groups; the `val()` line is incomplete, so this would not
      // compile if re-enabled as written.
      /*val m =1000
      val random = Random

      val groups = Map((for (i <- 0 to m) yield {
        val mu = random.nextDouble()*10-5
        val std = random.nextDouble()*10-5
        (i->(mu,std))}) : _*)

      val n=1000000

      val data = for (i <- 1 to n) yield
        {val g = random.nextInt(m)
        ;
          val()



        }*/

      // Experiment 3 (disabled): draws n samples from 7 fixed (mu, sigma) groups,
      // then computes count / mean / variance per group and over all records
      // from the running sums (count, sum of x, sum of x^2).
      /* val groups = Map(
        0 -> (0, 1),
        1 -> (1, 1),
        2 -> (2, 2),
        3 -> (3, 3),
        4 -> (4, 3),
        5 -> (5, 2),
        6 -> (6, 1)
      )
      val n = 1000000
      val random = Random
      val data = sc.parallelize(
        for (i <- 1 to n) yield {
          val g = random.nextInt(7)
          val (mu,sigma) = groups(g)

          (g, mu + sigma * random.nextGaussian())
        }
      )

      println("------------------ DATA : ----------------")
      data.take(20).foreach(println)
      println("---------------- END OF DATA ---------------")
      val records = (data
        .map(t => (t._1, (1, t._2, t._2 * t._2)))
        .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))
        )

      records.cache()

      records.take(10).foreach(println)
      // Aggregate in format (Group ID, (Sum count this group in all records, Average value, Variance))
      val groups_aggregated_stats = records.map(r => (r._1, (r._2._1, r._2._2 / r._2._1, ((r._2._3 - ((r._2._2 * r._2._2) / r._2._1)) / r._2._1))))
      // Print in new line each group stats
      groups_aggregated_stats.collect().foreach(println)

      // Aggregate results in all groups to single group "all records"
      val all_records_stats = records.reduce((a, b) => (0, (a._2._1 + b._2._1, a._2._2 + b._2._2, a._2._3 + b._2._3)))
      val results = (all_records_stats._2._1, all_records_stats._2._2 / all_records_stats._2._1, ((all_records_stats._2._3 - ((all_records_stats._2._2 * all_records_stats._2._2) / all_records_stats._2._1)) / all_records_stats._2._1))

      // Return result as (Total count, Total average, Total variance)
      println(results)*/

    } finally {
      sc.stop()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement