Guest User

Untitled

a guest
Sep 2nd, 2018
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.78 KB | None | 0 0
  1. val spark = SparkSession.builder().master("local[*]")
  2. .appName("sample_data_testing").getOrCreate()
  3.  
  4.  
  5. val schema = new StructType()
  6. .add("name","String")
  7. .add("serial_number","Long")
  8.  
  9. import spark.implicits._
  10. spark.sparkContext.setLogLevel("ERROR")
  11.  
  12. val df = spark
  13. .readStream
  14. .format("kafka")
  15. .option("kafka.bootstrap.servers", "localhost:9092")
  16. .option("subscribe", "topic1")
  17. .option("startingOffsets", "earliest")
  18. .load()
  19.  
  20.  
  21. val data = df.select($"value" cast "string" as "json")
  22. .select(from_json($"json", schema) as "data")
  23. .select("data.*")
  24.  
  25. data.printSchema()
  26.  
  27.  
  28. val pgdata = data.writeStream
  29. .format("jdbc")
  30. .option("url", "jdbc:postgresql://localhost:5432/spark_db")
  31. .option("dbtable", "spark_data")
  32. .option("user", "username")
  33. .option("password", "password")
  34. .start().awaitTermination()
  35. }
  36. }
Add Comment
Please, Sign In to add comment