Advertisement
Guest User

Gnip Streamer

a guest
Apr 24th, 2016
124
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.07 KB | None | 0 0
  1. package org.css.java.gnipStreaming;
  2.  
  3. import java.util.Iterator;
  4. import java.util.Properties;
  5.  
  6. import kafka.javaapi.producer.Producer;
  7. import kafka.producer.KeyedMessage;
  8. import kafka.producer.ProducerConfig;
  9.  
  10. import org.apache.spark.SparkConf;
  11. import org.apache.spark.api.java.JavaRDD;
  12. import org.apache.spark.api.java.function.Function;
  13. import org.apache.spark.api.java.function.VoidFunction;
  14. import org.apache.spark.streaming.Durations;
  15. import org.apache.spark.streaming.api.java.JavaDStream;
  16. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  17.  
  18. public class GnipSparkStreamer {
  19.  
  20.     // args will contain the following
  21.     // 1- stream type
  22.     // 2- kafka hosts
  23.     // 3- kafka topic
  24.     // 4- period in seconds
  25.     @SuppressWarnings("deprecation")
  26.     public static void main(String[] args) {
  27.         if(args.length < 5){
  28.             System.out.println("Check the parameter list. The application takes exactly four parameters (stream type, kafka hosts, kafka topic, time period in seconds)");
  29.         }else{
  30.             String streamType = args[0];
  31.             String hosts = args[1];
  32.             String topic = args[2];
  33.             int seconds = Integer.parseInt(args[3]);
  34.             String maxNumberOfCores = args[4];
  35.            
  36.             SparkConf conf = new SparkConf().setAppName("GnipSparkStreamer["
  37.                     + streamType + "]");
  38.            
  39.             conf.set("spark.cores.max", maxNumberOfCores);
  40.            
  41.             JavaStreamingContext streamingContext = new JavaStreamingContext(conf,
  42.                     Durations.seconds(seconds));
  43.            
  44.             JavaDStream<String> tweets = streamingContext
  45.                     .receiverStream(new GnipReceiver(getStreamUrl(streamType),
  46.                             "USER_NAME", "PASSWORD"));
  47.            
  48.             tweets.foreachRDD(new Function<JavaRDD<String>, Void>() {
  49.                 @Override
  50.                 public Void call(JavaRDD<String> rdd) throws Exception {
  51.                     rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
  52.                        
  53.                         @Override
  54.                         public void call(Iterator<String> itr) throws Exception {
  55.                             Producer<String, String> producer = getProducer(hosts);
  56.                             while(itr.hasNext()){
  57.                                 try{
  58.                                     KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, itr.next());
  59.                                     producer.send(message);
  60.                                 } catch (Exception e) {
  61.                                     e.printStackTrace();
  62.                                 }
  63.                             }
  64.                         }
  65.                     });
  66.                     return null;
  67.                 }
  68.             });
  69.            
  70.             JavaDStream<Long> count = tweets.count();
  71.             count.print();
  72.            
  73.             streamingContext.start();
  74.            
  75.             streamingContext.awaitTermination();
  76.         }
  77.        
  78.     }
  79.  
  80.     public static Producer<String, String> getProducer(String hosts) {
  81.        
  82.         String brokers = hosts.replaceAll(",", ":9092,") + ":9092";
  83.        
  84.         Properties props = new Properties();
  85.  
  86.         props.put("metadata.broker.list", brokers);
  87.  
  88.         props.put("serializer.class", "kafka.serializer.StringEncoder");
  89.  
  90.         props.put("request.required.acks", "1");
  91.  
  92.         ProducerConfig config = new ProducerConfig(props);
  93.  
  94.         Producer<String, String> producer = new Producer<String, String>(config);
  95.  
  96.         return producer;
  97.     }
  98.    
  99.     public static String getStreamUrl(String streamType) {
  100.        
  101.         return "https://stream.gnip.com:443/accounts/Account_name/publishers/twitter/streams/track/prod.json";
  102.     }
  103. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement