spark_kafka_json.scala
// Spark: read nested JSON array data from Kafka
// Sample JSON record:
//{"terminalId":4501109,"gps":[{"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52},{"move":2,"distance":22.65,"gpsId":0,"direct":236.65,"lon":112.581139,"terminalId":4501109,"speed":82.0,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854137000,"lat":23.031427,"height":12.99},{"move":2,"distance":22.77,"gpsId":0,"direct":236.06,"lon":112.580765,"terminalId":4501109,"speed":82.98,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854139000,"lat":23.031197,"height":12.47},{"move":2,"distance":21.41,"gpsId":0,"direct":236.95,"lon":112.580406,"terminalId":4501109,"speed":79.32,"acceleration":0.0,"satelliteNum":24,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854141000,"lat":23.030968,"height":12.03}]}

package com.empgo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object Demo {
  var zookeeperservers = "emg102:2181,emg103:2181,emg104:2181"

  case class gpslog(distance: Double, gpsId: Int, direct: Double, lon: Double, terminalId: Long,
                    speed: Double, acceleration: Double, satelliteNum: Int,
                    sysTime: Long, time: Long, lat: Double, height: Double)
  case class log(terminalId: Long, gps: List[gpslog])

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("ecargps")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "for_gps_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("ecar-photo-gps")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => record.value)
      .map(value => {
        // Implicit conversion: use json4s's default formats
        implicit val formats: DefaultFormats.type = DefaultFormats
        val json = parse(value)
        // Extract the JSON object into the case class
        json.extract[log]
      }).window(Seconds(5), Seconds(5))  // 5-second window sliding every 5 seconds (same as the batch interval)
      .foreachRDD(    // each batch arrives as an RDD; see Spark's RDD concept if unfamiliar
        rdd => {
          rdd.foreachPartition(partitionOfRecords => {    // iterate over each partition
            // Open an HBase connection here, one per partition; the partition stays on one node,
            // so the connection never needs to be serialized (a hypothetical write helper is sketched after the program)
            partitionOfRecords.foreach(logData => {
              logData.gps.foreach(
                gpslog => {
                  println(gpslog.terminalId + "===" + gpslog.height + "===" + gpslog.sysTime)
                }
              )
            })
          })
        }
      )

    ssc.start()
    ssc.awaitTermination()

  }

}
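The per-partition HBase comment inside foreachRDD is left unimplemented in the paste (the loop only prints). Below is a minimal sketch of what that write could look like as a helper on object Demo, invoked with rdd.foreachPartition(writePartitionToHBase) in place of the println loop; the table name "ecar_gps", the column family "info", and the row-key scheme (terminalId plus GPS time) are assumptions, not part of the original code.

  // Hypothetical helper, not in the original paste: write one partition of parsed records to HBase.
  // Relies on the HBase imports at the top of the file and the log/gpslog case classes above.
  def writePartitionToHBase(partitionOfRecords: Iterator[log]): Unit = {
    // One connection per partition: it is created on the executor, so nothing has to be serialized
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "emg102,emg103,emg104")        // hosts from zookeeperservers
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
    val table: Table = connection.getTable(TableName.valueOf("ecar_gps")) // assumed table name
    try {
      partitionOfRecords.foreach { logData =>
        logData.gps.foreach { g =>
          val put = new Put(Bytes.toBytes(s"${g.terminalId}_${g.time}"))  // assumed row key
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("lon"), Bytes.toBytes(g.lon))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("lat"), Bytes.toBytes(g.lat))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("speed"), Bytes.toBytes(g.speed))
          table.put(put)
        }
      }
    } finally {
      table.close()
      connection.close()
    }
  }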