aiThanet

Untitled

Feb 9th, 2021
891
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package com.sundogsoftware.spark
  2.  
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.twitter._
  5.  
  /** Listens to a stream of Tweets and keeps track of the most popular
   *  hashtags over a 5 minute window.
   *
   *  Usage: `PopularHashtags [checkpointDir]`. The optional first argument
   *  overrides the default checkpoint directory (`C:/checkpoint/`), which is
   *  a Windows-style path.
   */
  object PopularHashtags {

    /** Makes sure only ERROR messages get logged to avoid log spam. */
    def setupLogging(): Unit = {
      import org.apache.log4j.{Level, Logger}
      val rootLogger = Logger.getRootLogger
      rootLogger.setLevel(Level.ERROR)
    }

    /** Configures Twitter service credentials from a `key value` text file.
     *
     *  Every line consisting of exactly two whitespace-separated fields is
     *  stored as the system property `twitter4j.oauth.<key>` = `<value>`;
     *  all other lines (blank lines, comments, malformed entries) are ignored.
     *
     *  @param path credentials file to read, relative to the working directory
     *              by default (`data/twitter.txt`)
     */
    def setupTwitter(path: String = "data/twitter.txt"): Unit = {
      import scala.io.Source

      val source = Source.fromFile(path, "UTF-8")
      // Close the file even if reading or setting a property throws.
      try {
        for (line <- source.getLines()) {
          // Tolerate surrounding whitespace, tabs, or repeated spaces between key and value.
          val fields = line.trim.split("\\s+")
          if (fields.length == 2) {
            System.setProperty("twitter4j.oauth." + fields(0), fields(1))
          }
        }
      } finally {
        source.close()
      }
    }

    /** Our main function where the action happens.
     *
     *  @param args optional; `args(0)` overrides the checkpoint directory
     *              (defaults to `C:/checkpoint/`)
     */
    def main(args: Array[String]): Unit = {

      // Configure Twitter credentials using data/twitter.txt
      setupTwitter()

      // Set up a Spark streaming context named "PopularHashtags" that runs locally using
      // all CPU cores and one-second batches of data
      val ssc = new StreamingContext("local[*]", "PopularHashtags", Seconds(1))

      // Get rid of log spam (should be called after the context is set up)
      setupLogging()

      // Create a DStream from Twitter using our streaming context
      val tweets = TwitterUtils.createStream(ssc, None)

      // Now extract the text of each status update into DStreams using map()
      val statuses = tweets.map(status => status.getText)

      // Blow out each word into a new DStream. Split on any run of whitespace, because
      // tweets can contain newlines and tabs, not only single spaces.
      val tweetwords = statuses.flatMap(tweetText => tweetText.split("\\s+"))

      // Now eliminate anything that's not a hashtag (a lone "#" is not a hashtag)
      val hashtags = tweetwords.filter(word => word.length > 1 && word.startsWith("#"))

      // Map each hashtag to a key/value pair of (hashtag, 1) so we can count them up by adding up the values
      val hashtagKeyValues = hashtags.map(hashtag => (hashtag, 1))

      // Now count them up over a 5 minute window sliding every one second.
      // The inverse function (x - y) subtracts the counts of batches that slide out of the
      // window instead of recomputing the whole window; this mode needs checkpointing (set below).
      // The filter drops hashtags whose windowed count has fallen back to 0; without it they
      // would be retained in the windowed state indefinitely.
      val hashtagCounts = hashtagKeyValues.reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x - y,
        Seconds(300),
        Seconds(1),
        ssc.sparkContext.defaultParallelism,
        (entry: (String, Int)) => entry._2 > 0
      )
      //  You will often see the first two arguments written in the following shorthand:
      //  hashtagKeyValues.reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

      // Sort the results by the count values, highest first
      val sortedResults = hashtagCounts.transform(rdd => rdd.sortBy(x => x._2, ascending = false))

      // Print the top 10
      sortedResults.print(10)

      // Set a checkpoint directory (required by the inverse-reduce window above), and kick it all off.
      // The default is a Windows path; pass a different directory as the first argument elsewhere.
      val checkpointDir = args.headOption.getOrElse("C:/checkpoint/")
      ssc.checkpoint(checkpointDir)
      ssc.start()
      ssc.awaitTermination()
    }
  }
  78.  
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sounds or pop-up ads; we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×