Advertisement
aiThanet

Untitled

Feb 9th, 2021
2,600
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 2.84 KB | None | 0 0
  1. package com.sundogsoftware.spark
  2.  
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.twitter._
  5.  
  /** Listens to a stream of Tweets and keeps track of the most popular
   *  hashtags over a 5 minute window, printing the current leaders every second.
   */
  object PopularHashtags {

    /** Makes sure only ERROR messages get logged to avoid log spam. */
    def setupLogging(): Unit = {
      import org.apache.log4j.{Level, Logger}
      val rootLogger = Logger.getRootLogger
      rootLogger.setLevel(Level.ERROR)
    }

    /** Configures Twitter service credentials from a whitespace-separated key/value file.
     *
     *  Each line of the form `<key> <value>` is stored as the JVM system property
     *  `twitter4j.oauth.<key>`. Lines that do not split into exactly two fields are ignored.
     *  The file is closed even if reading it fails.
     *
     *  @param path credentials file; defaults to `data/twitter.txt`, resolved against the
     *              current working directory.
     */
    def setupTwitter(path: String = "data/twitter.txt"): Unit = {
      import scala.io.Source

      val source = Source.fromFile(path)
      try {
        for (line <- source.getLines()) {
          // Tolerate surrounding/duplicated whitespace so a stray space doesn't drop a credential
          val fields = line.trim.split("\\s+")
          if (fields.length == 2) {
            System.setProperty("twitter4j.oauth." + fields(0), fields(1))
          }
        }
      } finally {
        source.close()
      }
    }

    /** Our main function where the action happens.
     *
     *  @param args optional; `args(0)` overrides the checkpoint directory
     *              (default `C:/checkpoint/`).
     */
    def main(args: Array[String]): Unit = {

      // Configure Twitter credentials using data/twitter.txt
      setupTwitter()

      // Set up a Spark streaming context named "PopularHashtags" that runs locally using
      // all CPU cores and one-second batches of data
      val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))

      // Get rid of log spam (should be called after the context is set up)
      setupLogging()

      // Create a DStream from Twitter using our streaming context
      val tweets = TwitterUtils.createStream(ssc, None)

      // Now extract the text of each status update into DStreams using map()
      val statuses = tweets.map(status => status.getText)

      // Blow out each word into a new DStream. Split on any whitespace run (spaces, tabs,
      // newlines) so a hashtag at the end of a line isn't glued to the next word.
      val tweetwords = statuses.flatMap(tweetText => tweetText.split("\\s+"))

      // Now eliminate anything that's not a hashtag (a lone "#" is not a tag)
      val hashtags = tweetwords.filter(word => word.length > 1 && word.startsWith("#"))

      // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
      val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

      // Now count them up over a 5 minute window sliding every one second.
      // The inverse function subtracts counts leaving the window; without the filter,
      // hashtags whose count has dropped to 0 would linger in the state (and the output) forever.
      val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x - y,
        Seconds(300),
        Seconds(1),
        ssc.sparkContext.defaultParallelism,
        (pair: (String, Int)) => pair._2 > 0
      )

      // Sort the results by the count values
      val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, ascending = false))

      // Print the top 10 (print() shows the first 10 elements of each batch)
      sortedResults.print()

      // Set a checkpoint directory (required by the inverse-function windowed reduce), and kick it all off
      ssc.checkpoint(args.headOption.getOrElse("C:/checkpoint/"))
      ssc.start()
      ssc.awaitTermination()
    }
  }
  78.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement