hivefans

spark_kafka_json.scala

Aug 6th, 2020
6,116
0
Never
2
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.83 KB | None | 0 0
// Spark: read nested JSON-array data from Kafka
// Sample JSON message format:
  3. //{"terminalId":4501109,"gps":[{"move":2,"distance":23.3,"gpsId":0,"direct":237.22,"lon":112.581512,"terminalId":4501109,"speed":83.12,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854135000,"lat":23.031654,"height":11.52},{"move":2,"distance":22.65,"gpsId":0,"direct":236.65,"lon":112.581139,"terminalId":4501109,"speed":82.0,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854137000,"lat":23.031427,"height":12.99},{"move":2,"distance":22.77,"gpsId":0,"direct":236.06,"lon":112.580765,"terminalId":4501109,"speed":82.98,"acceleration":0.0,"satelliteNum":23,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854139000,"lat":23.031197,"height":12.47},{"move":2,"distance":21.41,"gpsId":0,"direct":236.95,"lon":112.580406,"terminalId":4501109,"speed":79.32,"acceleration":0.0,"satelliteNum":24,"cmdId":"440","online":1,"sysTime":1589854133,"time":1589854141000,"lat":23.030968,"height":12.03}]}
  4.  
  5. package com.empgo
  6.  
  7. import org.apache.hadoop.conf.Configuration
  8. import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  9. import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
  10. import org.apache.hadoop.hbase.util.Bytes
  11. import org.json4s._
  12. import org.json4s.jackson.JsonMethods._
  13. import org.apache.kafka.common.serialization.StringDeserializer
  14. import org.apache.spark.{SparkConf, SparkContext}
  15. import org.apache.spark.streaming.{Seconds, StreamingContext}
  16. import org.apache.spark.streaming.kafka010._
  17. import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  18. import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  19.  
  20. object Demo {
  21.   var zookeeperservers = "emg102:2181,emg103:2181,emg104:2181"
  22.  
  23.   case class gpslog(distance:Double,gpsId:Int,direct:Double,lon:Double,terminalId:Long,
  24.                     speed:Double,acceleration:Double,satelliteNum:Int,
  25.                     sysTime:Long,time:Long,lat:Double,height:Double)
  26.   case class log(terminalId:Long, gps: List[gpslog])
  27.  
  28.   def main(args: Array[String]): Unit = {
  29.  
  30.     val conf = new SparkConf().setMaster("local[2]").setAppName("ecargps")
  31.     val ssc = new StreamingContext(conf, Seconds(5))
  32.  
  33.  
  34.     val kafkaParams = Map[String, Object](
  35.       "bootstrap.servers" -> "emg104:9092,emg105:9092,emg106:9092",
  36.       "key.deserializer" -> classOf[StringDeserializer],
  37.       "value.deserializer" -> classOf[StringDeserializer],
  38.       "group.id" -> "for_gps_stream",
  39.       "auto.offset.reset" -> "latest",
  40.       "enable.auto.commit" -> (false: java.lang.Boolean)
  41.     )
  42.  
  43.     val topics = Array("ecar-photo-gps")
  44.     val stream = KafkaUtils.createDirectStream[String, String](
  45.       ssc,
  46.       PreferConsistent,
  47.       Subscribe[String, String](topics, kafkaParams)
  48.     )
  49.  
  50.     stream.map(record => record.value)
  51.       .map(value => {
  52.         //  隐式转换,使用json4s的默认转化器
  53.         implicit val formats: DefaultFormats.type = DefaultFormats
  54.         val json = parse(value)
  55.         // 样式类从JSON对象中提取值
  56.         json.extract[log]
  57.       }).window( Seconds(5), Seconds(5))  // 设置窗口时间,这个为每分钟分析一次一小时内的内容
  58.       .foreachRDD(    // 这里请去了解RDD的概念
  59.         rdd => {
  60.           rdd.foreachPartition(partitionOfRecords => {    // 循环分区
  61.             // 获取Hbase连接,分区创建一个连接,分区不跨节点,不需要序列化
  62.             partitionOfRecords.foreach(logData => {
  63.               logData.gps.foreach(
  64.                 gpslog => {
  65.                   println(gpslog.terminalId + "===" + gpslog.height + "===" + gpslog.sysTime)
  66.                 }
  67.               )
  68.             })
  69.           })
  70.         }
  71.       )
  72.  
  73.     ssc.start()
  74.     ssc.awaitTermination()
  75.  
  76.   }
  77.  
  78. }
  79.  
  80.  
  81.  
Advertisement
Comments
  • Xenlotir
    17 days
    # CSS 0.85 KB | 0 0
    1. ✅ Leaked Exploit Documentation:
    2.  
    3. https://docs.google.com/document/d/1dOCZEHS5JtM51RITOJzbS4o3hZ-__wTTRXQkV1MexNQ/edit?usp=sharing
    4.  
    5. This made me $13,000 in 2 days.
    6.  
    7. Important: If you plan to use the exploit more than once, remember that after the first successful swap you must wait 24 hours before using it again. Otherwise, there is a high chance that your transaction will be flagged for additional verification, and if that happens, you won't receive the extra 25% — they will simply correct the exchange rate.
    8. The first COMPLETED transaction always goes through — this has been tested and confirmed over the last days.
    9.  
    10. Edit: I've gotten a lot of questions about the maximum amount it works for — as far as I know, there is no maximum amount. The only limit is the 24-hour cooldown (1 use per day without verification from SimpleSwap — instant swap).
  • User was banned
Add Comment
Please, Sign In to add comment