Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Streams JSON records from the Kafka topic "topic1", parses them against a fixed
// two-column schema, and appends every micro-batch to the PostgreSQL table "spark_data".

// Local session that uses all available cores.
val spark = SparkSession.builder().master("local[*]")
  .appName("sample_data_testing").getOrCreate()

// Schema applied to the JSON document carried in each Kafka record value.
val schema = new StructType()
  .add("name", "String")
  .add("serial_number", "Long")

import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

// Unbounded source: every record of "topic1", read from the earliest offset.
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

// The Kafka value column is cast to a string, parsed as JSON, and the parsed
// struct is flattened into top-level columns.
val data = df.select($"value" cast "string" as "json")
  .select(from_json($"json", schema) as "data")
  .select("data.*")
data.printSchema()

// Structured Streaming has no JDBC streaming sink: writeStream.format("jdbc") fails
// when the query is started. Each micro-batch is therefore written with the batch
// JDBC writer through foreachBatch.
// The function is declared with an explicit Scala function type so that the call
// below resolves to the Scala overload of foreachBatch rather than the Java one.
val writeBatchToPostgres: (org.apache.spark.sql.DataFrame, Long) => Unit =
  (batch, _) =>
    batch.write
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/spark_db")
      .option("dbtable", "spark_data")
      .option("user", "username")
      .option("password", "password")
      // Append: the default save mode raises an error once the table exists.
      .mode("append")
      .save()

// NOTE(review): no checkpointLocation is configured, so offsets are presumably not
// retained across restarts and "earliest" would replay the topic — confirm whether
// a durable checkpoint directory is wanted here.
data.writeStream
  .foreachBatch(writeBatchToPostgres)
  .start()
  .awaitTermination() // blocks until the streaming query stops or fails
}
}
Add Comment
Please, Sign In to add comment