Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package org.css.java.gnipStreaming;
- import java.util.Iterator;
- import java.util.Properties;
- import kafka.javaapi.producer.Producer;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.function.Function;
- import org.apache.spark.api.java.function.VoidFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
- public class GnipSparkStreamer {
- // args will contain the following
- // 1- stream type
- // 2- kafka hosts
- // 3- kafka topic
- // 4- period in seconds
- @SuppressWarnings("deprecation")
- public static void main(String[] args) {
- if(args.length < 5){
- System.out.println("Check the parameter list. The application takes exactly four parameters (stream type, kafka hosts, kafka topic, time period in seconds)");
- }else{
- String streamType = args[0];
- String hosts = args[1];
- String topic = args[2];
- int seconds = Integer.parseInt(args[3]);
- String maxNumberOfCores = args[4];
- SparkConf conf = new SparkConf().setAppName("GnipSparkStreamer["
- + streamType + "]");
- conf.set("spark.cores.max", maxNumberOfCores);
- JavaStreamingContext streamingContext = new JavaStreamingContext(conf,
- Durations.seconds(seconds));
- JavaDStream<String> tweets = streamingContext
- .receiverStream(new GnipReceiver(getStreamUrl(streamType),
- "USER_NAME", "PASSWORD"));
- tweets.foreachRDD(new Function<JavaRDD<String>, Void>() {
- @Override
- public Void call(JavaRDD<String> rdd) throws Exception {
- rdd.foreachPartition(new VoidFunction<Iterator<String>>() {
- @Override
- public void call(Iterator<String> itr) throws Exception {
- Producer<String, String> producer = getProducer(hosts);
- while(itr.hasNext()){
- try{
- KeyedMessage<String, String> message = new KeyedMessage<String, String>(topic, itr.next());
- producer.send(message);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- });
- return null;
- }
- });
- JavaDStream<Long> count = tweets.count();
- count.print();
- streamingContext.start();
- streamingContext.awaitTermination();
- }
- }
- public static Producer<String, String> getProducer(String hosts) {
- String brokers = hosts.replaceAll(",", ":9092,") + ":9092";
- Properties props = new Properties();
- props.put("metadata.broker.list", brokers);
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("request.required.acks", "1");
- ProducerConfig config = new ProducerConfig(props);
- Producer<String, String> producer = new Producer<String, String>(config);
- return producer;
- }
- public static String getStreamUrl(String streamType) {
- return "https://stream.gnip.com:443/accounts/Account_name/publishers/twitter/streams/track/prod.json";
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement