package com.cisco.sdp.cdx.InventoryStreamingJob;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

import kafka.serializer.StringDecoder;

public class InventoryStreaming {

    private static final Logger logger = Logger.getLogger(InventoryStreaming.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Inventory-Streaming");
        // .setMaster("spark://192.168.56.1:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

        // Kafka direct-stream configuration (spark-streaming-kafka-0-8 API).
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "ip-1us-eastompute.internal:9092");
        Set<String> topics = Collections.singleton("inventory");
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Extract the JSON payload (the message value) from each Kafka record.
        JavaDStream<String> invStream = directKafkaStream.map(record -> {
            logger.info("Input JSON String ------->" + record._2);
            System.out.println("Input JSON String ------->" + record._2);
            return record._2;
        });

        invStream.foreachRDD(invrdd -> {
            if (!invrdd.isEmpty()) {
                SparkSession spark = JavaSparkSessionSingleton.getInstance(invrdd.context().getConf());
                Dataset<Row> invDataset = spark.read().option("mode", "PERMISSIVE").json(invrdd);

                // Create a temporary view over the inventory JSON.
                invDataset.createOrReplaceTempView("inventory");
                invDataset.printSchema();

                // Write the keys (partyId, sourceNeId, sourcePartyId) to the Hive table.
                Dataset<Row> reducedInventory = spark
                        .sql("select partyId,sourceNeId,sourcePartyId from inventory")
                        .withColumn("inv_devices_documentid",
                                concat(col("partyId"), lit("_"), col("sourceNeId"), lit("_"), col("sourcePartyId")));
                logger.info("Inserting data into the Hive table inv_devices_incr");
                reducedInventory.write().mode(SaveMode.Append).insertInto("cdx_network.inv_devices_incr");

                reducedInventory.toJavaRDD().foreach(row -> {
                    // Write the dataset to the per-key HDFS path.
                    StringBuilder pathBuilder = new StringBuilder();
                    pathBuilder.append(row.getString(0)).append("_").append(row.getString(1)).append("_")
                            .append(row.getString(2));
                    logger.info("Creating HDFS directory with the path /user/creando/cdx/inv_devices/"
                            + pathBuilder.toString());
                    System.out.println("Creating HDFS directory with the path /user/creando/cdx/inv_devices/"
                            + pathBuilder.toString());
                    // NOTE: this lambda runs on an executor, where invDataset's session
                    // state is unavailable; this write() is line 82 in the stack trace below.
                    invDataset.write().mode(SaveMode.Append)
                            .json("/user/creando/cdx/inv_devices/" + pathBuilder.toString());
                });
            }
        });

        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            ssc.stop();
        }
    }
}
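
The helper JavaSparkSessionSingleton is called above but not included in the paste. It is presumably the lazily initialized singleton from the official Spark streaming examples (JavaSqlNetworkWordCount); a sketch along those lines, assuming that shape:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

// Lazily instantiated singleton SparkSession, reused across micro-batches.
// Assumed shape of the helper referenced by the job above.
class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf).getOrCreate();
        }
        return instance;
    }
}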
2018-04-20 10:21:33,020 INFO [Executor task launch worker for task 95] InventoryStreamingJob.InventoryStreaming: Creating HDFS directory with the path /user/creando/cdx/inv_devices/700001#596970dba94c040001381a71#700001
2018-04-20 10:21:33,023 ERROR [Executor task launch worker for task 95] executor.Executor: Exception in task 0.2 in stage 93.0 (TID 95)
java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.isStreaming(Dataset.scala:485)
    at org.apache.spark.sql.Dataset.write(Dataset.scala:2660)
    at com.cisco.sdp.cdx.InventoryStreamingJob.InventoryStreaming.lambda$2(InventoryStreaming.java:82)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at o
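
The NullPointerException comes from calling invDataset.write() inside reducedInventory.toJavaRDD().foreach(...). That lambda is serialized and run on the executors, and a Dataset's SparkSession and query-plan state are transient, so on the executor the write() call (which first checks Dataset.isStreaming) dereferences null. Datasets, like RDDs, can only be operated on from the driver. Below is a driver-side rework of the loop; this is a minimal sketch, assuming the per-batch key set is small enough to collect, and writePerKey is a hypothetical helper, not part of the original job:

import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DriverSideWriteSketch {

    // Runs on the driver, where invDataset's SparkSession is still live.
    // Keeps the original behavior: the whole micro-batch is appended under
    // each key's directory.
    static void writePerKey(Dataset<Row> invDataset, Dataset<Row> reducedInventory) {
        List<Row> keys = reducedInventory.collectAsList(); // assumes a small key set per batch
        for (Row row : keys) {
            String docId = row.<String>getAs("inv_devices_documentid");
            invDataset.write().mode(SaveMode.Append)
                    .json("/user/creando/cdx/inv_devices/" + docId);
        }
    }
}

If the intent is instead one directory per key containing only that key's records, a single partitioned write (adding the inv_devices_documentid column to invDataset and calling .write().partitionBy("inv_devices_documentid").json(...)) replaces the loop entirely, at the cost of Spark's key=value subdirectory naming.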