Advertisement
hivefans

spark_kafka_json.scala

Aug 6th, 2020
2,914
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.83 KB | None | 0 0
  1. // spark读取kafka json嵌套数组数据
  2. // json数据格式
  3. //{"terminalId":4501109,"gps":[{"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52},{"move":2,"distance":22.65,"gpsId":0,"direct":236.65,"lon":112.581139,"terminalId":4501109,"speed":82.0,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854137000,"lat":23.031427,"height":12.99},{"move":2,"distance":22.77,"gpsId":0,"direct":236.06,"lon":112.580765,"terminalId":4501109,"speed":82.98,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854139000,"lat":23.031197,"height":12.47},{"move":2,"distance":21.41,"gpsId":0,"direct":236.95,"lon":112.580406,"terminalId":4501109,"speed":79.32,"acceleration":0.0,"satelliteNum":24,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854141000,"lat":23.030968,"height":12.03}]}
  4.  
  5. package com.empgo
  6.  
  7. import org.apache.hadoop.conf.Configuration
  8. import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  9. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
  10. import org.apache.hadoop.hbase.util.Bytes
  11. import org.json4s._
  12. import org.json4s.jackson.JsonMethods._
  13. import org.apache.kafka.common.serialization.StringDeserializer
  14. import org.apache.spark.{SparkConf, SparkContext}
  15. import org.apache.spark.streaming.{Seconds, StreamingContext}
  16. import org.apache.spark.streaming.kafka010._
  17. import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  18. import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  19.  
  20. object Demo {
  21.   var zookeeperservers = "emg102:2181,emg103:2181,emg104:2181"
  22.  
  23.   case class gpslog(distance:Double,gpsId:Int,direct:Double,lon:Double,terminalId:Long,
  24.                     speed:Double,acceleration:Double,satelliteNum:Int,
  25.                     sysTime:Long,time:Long,lat:Double,height:Double)
  26.   case class log(terminalId:Long, gps: List[gpslog])
  27.  
  28.   def main(args: Array[String]): Unit = {
  29.  
  30.     val conf = new SparkConf().setMaster("local[2]").setAppName("ecargps")
  31.     val ssc = new StreamingContext(conf, Seconds(5))
  32.  
  33.  
  34.     val kafkaParams = Map[String, Object](
  35.       "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
  36.       "key.deserializer" -> classOf[StringDeserializer],
  37.       "value.deserializer" -> classOf[StringDeserializer],
  38.       "group.id" -> "for_gps_stream",
  39.       "auto.offset.reset" -> "latest",
  40.       "enable.auto.commit" -> (false: java.lang.Boolean)
  41.     )
  42.  
  43.     val topics = Array("ecar-photo-gps")
  44.     val stream = KafkaUtils.createDirectStream[String, String](
  45.       ssc,
  46.       PreferConsistent,
  47.       Subscribe[String, String](topics, kafkaParams)
  48.     )
  49.  
  50.     stream.map(record => record.value)
  51.       .map(value => {
  52.         //  隐式转换,使用json4s的默认转化器
  53.         implicit val formats: DefaultFormats.type = DefaultFormats
  54.         val json = parse(value)
  55.         // 样式类从JSON对象中提取值
  56.         json.extract[log]
  57.       }).window( Seconds(5), Seconds(5))  // 设置窗口时间,这个为每分钟分析一次一小时内的内容
  58.       .foreachRDD(    // 这里请去了解RDD的概念
  59.         rdd => {
  60.           rdd.foreachPartition(partitionOfRecords => {    // 循环分区
  61.             // 获取Hbase连接,分区创建一个连接,分区不跨节点,不需要序列化
  62.             partitionOfRecords.foreach(logData => {
  63.               logData.gps.foreach(
  64.                 gpslog => {
  65.                   println(gpslog.terminalId + "===" + gpslog.height + "===" + gpslog.sysTime)
  66.                 }
  67.               )
  68.             })
  69.           })
  70.         }
  71.       )
  72.  
  73.     ssc.start()
  74.     ssc.awaitTermination()
  75.  
  76.   }
  77.  
  78. }
  79.  
  80.  
  81.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement