Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class test {
- public static void main(final String[] args) { // Entry point: obtains a streaming context for the checkpoint directory, starts it, and blocks until it terminates.
- final String checkPointDir="/test/chkDir"; // passed both to getOrCreate and to createContext below
- Function0<JavaStreamingContext> createContextFun=new Function0<JavaStreamingContext>() { // factory handed to getOrCreate; simply delegates to createContext
- private static final long serialVersionUID = 1L;
- @Override
- public JavaStreamingContext call() throws Exception {
- return createContext(checkPointDir,args); // forwards the command-line args unchanged
- }
- };
- JavaStreamingContext ssc =JavaStreamingContext.getOrCreate(checkPointDir, createContextFun) ; // per the Spark API: recreates the context from checkpoint data in checkPointDir if present, otherwise invokes the factory
- ssc.start();
- ssc.awaitTermination(); // blocks the main thread until the streaming context stops
- }
- protected static JavaStreamingContext createContext(String checkPointDir,String[] vals) { // Builds the Kafka -> enrich -> mapWithState pipeline and returns the streaming context with checkpointing set to checkPointDir; `vals` is not read in the visible body.
- // Initialize the Spark configuration with the receiver write-ahead log enabled
- SparkConf sparkConfiguration = new SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true");
- JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
- JavaStreamingContext streamingSparkContext = new JavaStreamingContext(sparkContext,new Duration(batchInterval)); // batchInterval is not defined in this snippet; NOTE(review): it is used unscaled here but multiplied by 1000 further down — confirm its units
- final HiveContext hiveContext = new HiveContext(sparkContext); // not referenced in the visible lines; presumably used inside the elided lambdas — verify
- // Pull data from Kafka as a direct stream of String key/value pairs
- JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingSparkContext,
- String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); // kafkaParams and topics are not defined in this snippet
- directKafkaStream.foreachRDD(---); // per-RDD action elided in the original paste
- // Apply the business transformation to the stream data and key it for the stateful join/enrichment
- JavaDStream<SMessage> enriched = directKafkaStream.map(--); // mapping function elided in the original paste
- JavaPairDStream<String, SMessage> pairSwitchMsg = enriched.mapToPair(--); // pairing function elided in the original paste
- Function3<String, Optional<SMessage>, State<dingState>, Tuple2<String, dingState>> mappingFunc--- // state-update function; its definition is elided in the original paste
- JavaMapWithStateDStream<String, SMessage, dingState, Tuple2<String, dingState>> sWithState = pairSwitchMsg.mapWithState(StateSpec.function(mappingFunc).initialState(crnStateMap)); // crnStateMap (the initial state) is not defined in this snippet
- JavaPairRDD<String, dingState> test = sWithState.stateSnapshots().compute(new Time(2*batchInterval*1000)); // NOTE(review): compute() is called directly while the graph is still being built, before the context is started — confirm this yields a usable RDD; a snapshot stream is normally consumed through an output operation such as foreachRDD
- test.saveAsObjectFile("/test/snap/"); // writes the RDD obtained above to the fixed path /test/snap/
- streamingSparkContext.checkpoint(checkPointDir); // set the checkpoint directory on the context before returning it
- return streamingSparkContext;
- }
Add Comment
Please, Sign In to add comment