import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MySparkApp {
  def main(args: Array[String]): Unit = {
    println("Hello World!")
    val conf = new SparkConf().setAppName("MySparkApp")
    val sr = new StreamingContext(conf, Seconds(30))
    val sqlContext = new SQLContext(sr.sparkContext)
    import sqlContext.implicits._
    // val file = sr.textFileStream("hdfs:/test/")
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "emp003:9092")
    // val offset = KafkaUtil.getOffsets("emp004:2181,emp003:2181,emp001:2181", "myid", "mytopic")
    // KafkaUtil.fillInLatestOffsets(offset, kafka)
    val topic = Set("mytopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sr, kafkaParams, topic)
    val value = messages.map(_._2) // take the message value
    value.count().print()          // number of records received in each batch
    value.print()
    try {
      value.foreachRDD { rdd =>
        // foreachPartition (rather than foreachPartitionAsync) so each batch
        // finishes its writes; one connection per partition, not per record
        rdd.foreachPartition { records =>
          val mysql = new MysqlJdbc("cm_spark") // user-defined helper, sketched below
          records.foreach { line =>
            val data = line.split("\\|") // split takes a regex, so the pipe must be escaped
            val person = new Person(data(0), data(1), data(2), data(3))
            mysql.insertintoTable("insert into spark values(?,?,?,?)", person)
          }
        }
      }
      sr.start()
      sr.awaitTermination()
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
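The paste calls two classes it never defines, MysqlJdbc and Person. Below is a minimal sketch of what they could look like, assuming MysqlJdbc wraps a single JDBC connection and Person simply carries the four pipe-separated fields; the connection URL, credentials, and field names are placeholders inferred from the call sites above, not from the source.

import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical record type for the four pipe-separated fields; the field
// names are guesses, only the arity is fixed by the paste.
case class Person(f1: String, f2: String, f3: String, f4: String)

// Hypothetical JDBC wrapper matching the call sites above. Host, port,
// user, and password are placeholders.
class MysqlJdbc(db: String) {
  private val conn: Connection =
    DriverManager.getConnection(s"jdbc:mysql://localhost:3306/$db", "user", "password")

  def insertintoTable(sql: String, p: Person): Unit = {
    val stmt: PreparedStatement = conn.prepareStatement(sql)
    stmt.setString(1, p.f1)
    stmt.setString(2, p.f2)
    stmt.setString(3, p.f3)
    stmt.setString(4, p.f4)
    stmt.executeUpdate()
    stmt.close()
  }

  def close(): Unit = conn.close()
}

Opening the connection inside foreachPartition keeps the connection count proportional to partitions rather than records; a production version would also close the connection in a finally block or use a pooled data source.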