Guest User

Untitled

a guest
Jul 19th, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.97 KB | None | 0 0
  1. public class test {
  2.  
  /**
   * Entry point. Obtains a JavaStreamingContext through JavaStreamingContext.getOrCreate, using the
   * checkpoint directory "/test/chkDir" and a factory that delegates to createContext, then starts
   * the context and blocks in awaitTermination().
   *
   * @param args command-line arguments; forwarded unchanged to createContext by the factory
   */
  3. public static void main(final String[] args) {
  4.  
  // The same directory is passed to getOrCreate here and, through the factory, to createContext.
  5. final String checkPointDir="/test/chkDir";
  6.  
  // Factory handed to getOrCreate; its call() builds the context via createContext.
  // NOTE(review): presumably getOrCreate only invokes this when no usable checkpoint exists in
  // checkPointDir - confirm against the Spark Streaming checkpointing documentation.
  7. Function0<JavaStreamingContext> createContextFun=new Function0<JavaStreamingContext>() {
  8. private static final long serialVersionUID = 1L;
  9. @Override
  10. public JavaStreamingContext call() throws Exception {
  11.  
  12. return createContext(checkPointDir,args);
  13. }
  14. };
  15.  
  16. JavaStreamingContext ssc =JavaStreamingContext.getOrCreate(checkPointDir, createContextFun) ;
  // Start the streaming computation, then block the main thread until it terminates.
  17. ssc.start();
  18. ssc.awaitTermination();
  19.  
  20. }
  21.  
  22.  
  /**
   * Builds the streaming pipeline and returns the (not yet started) streaming context.
   *
   * Steps visible in this listing: create the Spark, streaming and Hive contexts; open a direct
   * Kafka stream; map it to SMessage and then to a (String, SMessage) pair stream; apply
   * mapWithState; save a state snapshot to "/test/snap/"; set the checkpoint directory.
   *
   * Several arguments are elided with "--" / "---" placeholders in this listing, and
   * batchInterval, kafkaParams, topics and crnStateMap are not declared in the visible code.
   *
   * @param checkPointDir directory passed to streamingSparkContext.checkpoint(...)
   * @param vals not referenced anywhere in the visible body
   * @return the configured JavaStreamingContext; start() is not called here
   */
  23. protected static JavaStreamingContext createContext(String checkPointDir,String[] vals) {
  24. // initialize spark configuration
  // NOTE(review): this key is a receiver write-ahead-log setting, while the input below is
  // created with createDirectStream - confirm the setting has any effect for this pipeline.
  25. SparkConf sparkConfiguration = new SparkConf().set("spark.streaming.receiver.writeAheadLog.enable", "true");
  26.  
  27. JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
  // NOTE(review): batchInterval is passed to Duration as-is here but multiplied by 1000 in the
  // Time(...) call further down - confirm which unit batchInterval is expressed in.
  28. JavaStreamingContext streamingSparkContext = new JavaStreamingContext(sparkContext,new Duration(batchInterval));
  29.  
  30.  
  // hiveContext is not referenced again in the visible lines; presumably it is used inside the
  // elided transformation arguments - TODO confirm.
  31. final HiveContext hiveContext = new HiveContext(sparkContext);
  32.  
  33. // Pulling data from Kafka creating a Stream
  34. JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(streamingSparkContext,
  35. String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
  36.  
  // Argument elided in the listing.
  37. directKafkaStream.foreachRDD(---);
  38.  
  39. // Applying Business transformation on stream data and joining for enriching
  40. JavaDStream<SMessage> enriched = directKafkaStream.map(--);
  41.  
  42.  
  // Re-key the enriched messages as (String, SMessage) pairs; the mapping function is elided.
  43. JavaPairDStream<String, SMessage> pairSwitchMsg = enriched.mapToPair(--);
  44.  
  // State-update function for mapWithState; its definition is elided in the listing.
  45. Function3<String, Optional<SMessage>, State<dingState>, Tuple2<String, dingState>> mappingFunc---
  46.  
  47.  
  48.  
  // Stateful mapping keyed by String, seeded with crnStateMap as the initial state.
  49. JavaMapWithStateDStream<String, SMessage, dingState, Tuple2<String, dingState>> sWithState = pairSwitchMsg.mapWithState(StateSpec.function(mappingFunc).initialState(crnStateMap));
  50.  
  51.  
  // NOTE(review): compute(...) is called while the graph is still being built, before main()
  // calls start() - confirm this is valid on an unstarted stream; registering a foreachRDD on
  // stateSnapshots() may be what is intended.
  52. JavaPairRDD<String, dingState> test = sWithState.stateSnapshots().compute(new Time(2*batchInterval*1000));
  53. test.saveAsObjectFile("/test/snap/");
  54.  
  55.  
  // Set the checkpoint directory on the context before returning it to the caller.
  56. streamingSparkContext.checkpoint(checkPointDir);
  57. return streamingSparkContext;
  58.  
  59. }
Add Comment
Please, Sign In to add comment