Advertisement
Guest User

Untitled

a guest
Dec 22nd, 2014
184
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.95 KB | None | 0 0
  1. import org.apache.spark.api.java.*;
  2. import org.apache.spark.api.java.function.*;
  3. import org.apache.spark.streaming.*;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.streaming.api.java.*;
  6. import org.apache.spark.streaming.twitter.*;
  7. import twitter4j.*;
  8. import org.apache.spark.streaming.kafka.*;
  9. import java.util.Arrays;
  10. import java.util.*;
  11. import scala.Tuple2;
  12. import java.util.Map;
  13. import java.util.HashMap;
  14. import java.util.regex.Pattern;
  15. import com.google.common.collect.Lists;
  16.  
  17. public class TwitterStreaming {
  18.  
  19. private static final Pattern SPACE = Pattern.compile(" ");
  20.  
  21. public static void main(String[] args) throws Exception {
  22. // Location of the Spark directory
  23. String sparkHome = "/usr/local/spark";
  24. // URL of the Spark cluster
  25. String sparkUrl = "local[4]";
  26. // Location of the required JAR files
  27. //String jarFile = "../streaming-1.0.jar";
  28.  
  29. SparkConf conf = new SparkConf().setMaster(sparkUrl).setAppName("Streaming");
  30. JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
  31. HashMap<String, Integer> topics = new HashMap();
  32. topics.put("tweets", 1);
  33. JavaPairReceiverInputDStream messages = KafkaUtils.createStream(ssc, "localhost:2181", "1", topics);
  34.  
  35.  
  36. JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  37. @Override
  38. public String call(Tuple2<String, String> tuple2) {
  39. return tuple2._2();
  40. }
  41. });
  42.  
  43. JavaDStream<String> words = lines.flatMap(
  44. new FlatMapFunction<String, String>() {
  45. public Iterable<String> call(String in) {
  46. return Arrays.asList(in.split(" "));
  47. }
  48. }
  49. );
  50.  
  51. // Seperate the hashtags
  52. JavaDStream<String> hashTags = words.filter(
  53. new Function<String, Boolean>() {
  54. public Boolean call(String word) { return word.startsWith("#"); }
  55. }
  56. );
  57.  
  58. // Map hashtags to their respective counts
  59. JavaPairDStream<String, Integer> tuples = hashTags.mapToPair(
  60. new PairFunction<String, String, Integer>() {
  61. public Tuple2<String, Integer> call(String in) {
  62. return new Tuple2<String, Integer>(in, 1);
  63. }
  64. }
  65. );
  66.  
  67. // Reduce hashtag maps to aggregate their counts in a sliding window fashion
  68. // reduceByKeyAndWindow counts hashtags in a 5-minute window that shifts every second
  69. JavaPairDStream<String, Integer> counts = tuples.reduceByKeyAndWindow(
  70. new Function2<Integer, Integer, Integer>() {
  71. public Integer call(Integer i1, Integer i2) { return i1 + i2; }
  72. },
  73. new Function2<Integer, Integer, Integer>() {
  74. public Integer call(Integer i1, Integer i2) { return i1 - i2; }
  75. },
  76. new Duration(60 * 10 * 1000),
  77. new Duration(60 * 1000)
  78. );
  79.  
  80. //Swap the key-value pairs for the counts (in order to sort hashtags by their counts)
  81. JavaPairDStream<Integer, String> swappedCounts = counts.mapToPair(
  82. new PairFunction<Tuple2<String, Integer>, Integer, String>() {
  83. public Tuple2<Integer, String> call(Tuple2<String, Integer> in) {
  84. return in.swap();
  85. }
  86. }
  87. );
  88.  
  89. //Sort swapped map from highest to lowest
  90. JavaPairDStream<Integer, String> sortedCounts = swappedCounts.transformToPair(
  91. new Function<JavaPairRDD<Integer, String>, JavaPairRDD<Integer, String>>() {
  92. public JavaPairRDD<Integer, String> call(JavaPairRDD<Integer, String> in) throws Exception {
  93. return in.sortByKey(false);
  94. }
  95. });
  96.  
  97. //Print top 10 hashtags
  98. sortedCounts.foreach(
  99. new Function<JavaPairRDD<Integer, String>, Void> () {
  100. public Void call(JavaPairRDD<Integer, String> rdd) {
  101. String out = "\nTop 10 hashtags:\n";
  102. for (Tuple2<Integer, String> t: rdd.take(10)) {
  103. out = out + t.toString() + "\n";
  104. }
  105. System.out.println(out);
  106. return null;
  107. }
  108. }
  109. );
  110. ssc.checkpoint("checkpoints");
  111. ssc.start();
  112. }
  113. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement