import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.StringDecoder;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

public class InventoryStreaming {

    // Logging framework assumed to be log4j; adjust to whatever the project actually uses.
    private static final Logger logger = Logger.getLogger(InventoryStreaming.class);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Inventory-Streaming");
        // .setMaster("spark://192.168.56.1:7077");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "ip-1us-eastompute.internal:9092");
        Set<String> topics = Collections.singleton("inventory");

        // Direct (receiver-less) Kafka stream of <key, value> string pairs.
        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        // Keep only the message value, i.e. the inventory JSON string.
        JavaDStream<String> invStream = directKafkaStream.map(record -> {
            logger.info("Input JSON String -------> " + record._2());
            System.out.println("Input JSON String -------> " + record._2());
            return record._2();
        });

        invStream.foreachRDD(invRdd -> {
            if (!invRdd.isEmpty()) {
                // JavaSparkSessionSingleton is a helper class from the original job, not shown here.
                SparkSession spark = JavaSparkSessionSingleton.getInstance(invRdd.context().getConf());
                Dataset<Row> invDataset = spark.read().option("mode", "PERMISSIVE").json(invRdd);

                // Create a temporary view for the inventory JSON.
                invDataset.createOrReplaceTempView("inventory");
                invDataset.printSchema();

                // Write the keys (partyId, sourceNeId, sourcePartyId) to the Hive table.
                Dataset<Row> reducedInventory = spark
                        .sql("select partyId, sourceNeId, sourcePartyId from inventory")
                        .withColumn("inv_devices_documentid",
                                concat(col("partyId"), lit("_"), col("sourceNeId"), lit("_"), col("sourcePartyId")));

                logger.info("Inserting data into the Hive table inv_devices_incr");
                reducedInventory.write().mode(SaveMode.Append).insertInto("cdx_network.inv_devices_incr");

                reducedInventory.toJavaRDD().foreach(row -> {
                    // Write the dataset to the HDFS path built from the key columns.
                    StringBuilder pathBuilder = new StringBuilder();
                    pathBuilder.append(row.getString(0)).append("_").append(row.getString(1)).append("_")
                            .append(row.getString(2));
                    logger.info("Creating HDFS directory with the path /user/creando/cdx/inv_devices/" + pathBuilder.toString());
                    System.out.println("Creating HDFS directory with the path /user/creando/cdx/inv_devices/" + pathBuilder.toString());
                    invDataset.write().mode(SaveMode.Append).json("/user/creando/cdx/inv_devices/" + pathBuilder.toString());
                });
            }
        });

        ssc.start();
        try {
            ssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            ssc.stop();
        }
    }
}

2018-04-20 10:21:33,020 INFO [Executor task launch worker for task 95] InventoryStreamingJob.InventoryStreaming: Creating HDFS directory with the path /user/creando/cdx/inv_devices/700001#596970dba94c040001381a71#700001
2018-04-20 10:21:33,023 ERROR [Executor task launch worker for task 95] executor.Executor: Exception in task 0.2 in stage 93.0 (TID 95)
java.lang.NullPointerException
    at org.apache.spark.sql.Dataset.isStreaming(Dataset.scala:485)
    at org.apache.spark.sql.Dataset.write(Dataset.scala:2660)
    at com.cisco.sdp.cdx.InventoryStreamingJob.InventoryStreaming.lambda$2(InventoryStreaming.java:82)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreach$1.apply(JavaRDDLike.scala:351)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at o
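
The NullPointerException is thrown from Dataset.isStreaming, which Dataset.write calls first, inside the row-level foreach. That foreach runs on the executors, and invDataset is a driver-side Dataset captured in the closure; Datasets and the SparkSession behind them cannot be used from executor code. Below is a minimal driver-side sketch of the per-key JSON write, assuming the intent is to land each record under the directory for its own composite key; note that partitionBy produces inv_devices_documentid=<value> sub-directories, which differs from the plain <partyId>_<sourceNeId>_<sourcePartyId> layout used above.

// Sketch only, reusing invDataset and the same static functions imports as the job above.
// The write stays on the driver; Spark splits the JSON output into one
// sub-directory per distinct inv_devices_documentid value.
Dataset<Row> invWithKey = invDataset.withColumn("inv_devices_documentid",
        concat(col("partyId"), lit("_"), col("sourceNeId"), lit("_"), col("sourcePartyId")));

invWithKey.write()
        .mode(SaveMode.Append)
        .partitionBy("inv_devices_documentid")
        .json("/user/creando/cdx/inv_devices");

A sketch like this would replace the reducedInventory.toJavaRDD().foreach(...) block; the insertInto on the Hive table above is unaffected.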