Guest User

Untitled

a guest
May 4th, 2016
52
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.07 KB | None | 0 0
  1. import spark.streaming.StreamingContext._
  2. import spark.streaming.{Seconds, StreamingContext}
  3. import spark.SparkContext._
  4. import spark.storage.StorageLevel
  5. import spark.streaming.examples.twitter.TwitterInputDStream
  6. import com.twitter.algebird.HyperLogLog._
  7. import com.twitter.algebird._
  8.  
  /**
   * Example of using the HyperLogLog monoid from Twitter's Algebird together with
   * Spark Streaming's TwitterInputDStream.
   *
   * For every batch the program prints an approximate (HyperLogLog) and an exact
   * (Set-based) count of distinct tweeting user ids, both for the batch and
   * accumulated since start-up, plus the relative error of the approximation.
   *
   * Usage: StreamingHLL <master> <twitter_username> <twitter_password> [filter1] ... [filter n]
   */
  object StreamingHLL {

    /**
     * Program entry point.
     *
     * @param args master URL, Twitter user name and Twitter password, followed by
     *             any number of optional stream filter terms
     */
    def main(args: Array[String]): Unit = {
      if (args.length < 3) {
        System.err.println("Usage: StreamingHLL <master> <twitter_username> <twitter_password>" +
          " [filter1] [filter2] ... [filter n]")
        System.exit(1)
      }

      val Array(master, username, password) = args.slice(0, 3)
      val filters = args.slice(3, args.length)

      // Precision (number of bits) shared by every HyperLogLog instance below, so
      // that the per-partition sketches can be merged into the global one.
      val hllBits = 12

      val ssc = new StreamingContext(master, "TwitterStreamingHLL", Seconds(2))
      val stream = new TwitterInputDStream(ssc, username, password, filters,
        StorageLevel.MEMORY_ONLY_SER)
      ssc.registerInputStream(stream)

      val users = stream.map(status => status.getUser.getId)

      val globalHll = new HyperLogLogMonoid(hllBits)

      // Running totals, updated from the per-batch callbacks registered below.
      var h = globalHll.zero
      var userSet: Set[Long] = Set()

      // One sketch per user id, built with a monoid created inside each partition,
      // then merged into a single sketch per batch.
      val approxUsers = users.mapPartitions(ids => {
        val hll = new HyperLogLogMonoid(hllBits)
        ids.map(id => hll(id))
      }).reduce(_ + _)

      val exactUsers = users.map(id => Set(id)).reduce(_ ++ _)

      approxUsers.foreach(rdd => {
        // After the reduce the RDD holds at most one element; take(1) fetches it
        // (or nothing for an empty batch) without a separate count() pass.
        rdd.take(1).headOption.foreach { partial =>
          h += partial
          println("Approx distinct users this batch: %d".format(partial.estimatedSize.toInt))
          println("Approx distinct users overall: %d".format(globalHll.estimateSize(h).toInt))
        }
      })

      exactUsers.foreach(rdd => {
        rdd.take(1).headOption.foreach { partial =>
          userSet ++= partial
          println("Exact distinct users this batch: %d".format(partial.size))
          println("Exact distinct users overall: %d".format(userSet.size))
          // NOTE(review): reads `h` as updated by the approximate callback above;
          // presumably that callback has already run for this batch - confirm.
          println("Error rate: %2.5f%%".format(((globalHll.estimateSize(h) / userSet.size.toDouble) - 1) * 100))
        }
      })

      ssc.start()
    }
  }
Add Comment
Please, Sign In to add comment