package org.bianqi.spark

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by BQ
  * School www.qqhru.edu.cn
  * Date 2017/8/11
  *
  * Builds a small DataFrame from an in-memory RDD and appends it to a MySQL table
  * through the Spark SQL JDBC writer.
  */
object JdbcRDD {
  def main(args: Array[String]): Unit = {
    // Master is expected to be supplied at submit time (e.g. via spark-submit).
    val conf = new SparkConf().setAppName("MYSQL-DEMO")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Three whitespace-separated records of the form "id name age".
    val personRDD = sc.parallelize(Array("1 tom 4", "2 jerry 3", "3 kitty 6"))
      .map(_.split(" "))

    // Schema describing the target table columns.
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    // Convert each String array into a Row that matches the schema.
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // JDBC connection properties; the MySQL connector jar must be on the classpath.
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123")

    // Append the rows to the target table via the JDBC data source.
    personDataFrame.write.mode("append")
      .jdbc("jdbc:mysql://localhost:3306/user", "bigdata.person", prop)

    sc.stop()
  }
}
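A note on running it: the MySQL JDBC driver is not bundled with Spark, so the connector jar has to be visible to both the driver and the executors. A minimal submit command might look like the line below; the application jar name, connector version, and local[2] master are placeholder assumptions, not part of the original paste.

spark-submit --class org.bianqi.spark.JdbcRDD --master local[2] \
  --jars mysql-connector-java-5.1.44.jar \
  jdbc-demo.jar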