import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MySparkApp {
  def main(args: Array[String]): Unit = {
    println("Hello World!")
    val conf = new SparkConf().setAppName("MySparkApp")
    val sr = new StreamingContext(conf, Seconds(30))
    val sqlContext = new SQLContext(sr.sparkContext)
    import sqlContext.implicits._ // enables RDD-to-DataFrame conversions (not used below)

    //val file = sr.textFileStream("hdfs:/test/")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "emp003:9092")

    //val offset = KafkaUtil.getOffsets("emp004:2181,emp003:2181,emp001:2181", "myid", "mytopic")
    //KafkaUtil.fillInLatestOffsets(offset, kafka)
    val topic = Set("mytopic")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](sr, kafkaParams, topic)
    val value = messages.map(_._2) // keep only the message value
    value.count().print()          // print the number of records received in each batch
    value.print()

    value.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // open one connection per partition, not per record
        val mysql = new MysqlJdbc("cm_spark")
        try {
          records.foreach { line =>
            val data = line.split("\\|") // "|" is a regex metacharacter and must be escaped
            val person = new Person(data(0), data(1), data(2), data(3))
            mysql.insertintoTable("insert into spark values(?,?,?,?)", person)
          }
        } finally {
          mysql.close() // assumes MysqlJdbc exposes a close() method
        }
      }
    }

    // start() and awaitTermination() must be called exactly once, outside foreachRDD
    sr.start()
    sr.awaitTermination()
  }
}
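
The snippet calls two classes it never defines, MysqlJdbc and Person. A minimal sketch of what they might look like, assuming a plain JDBC connection to MySQL and four string fields; the field names, connection URL, and credentials here are invented for illustration:

import java.sql.{Connection, DriverManager}

// Hypothetical reconstruction: four string fields, names assumed
case class Person(id: String, name: String, age: String, city: String)

// Hypothetical reconstruction of the JDBC helper used above
class MysqlJdbc(database: String) {
  // Host, user, and password are placeholders; adjust for your environment
  private val conn: Connection =
    DriverManager.getConnection(s"jdbc:mysql://localhost:3306/$database", "user", "password")

  def insertintoTable(sql: String, p: Person): Unit = {
    val stmt = conn.prepareStatement(sql)
    try {
      stmt.setString(1, p.id)
      stmt.setString(2, p.name)
      stmt.setString(3, p.age)
      stmt.setString(4, p.city)
      stmt.executeUpdate()
    } finally {
      stmt.close()
    }
  }

  def close(): Unit = conn.close()
}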