Advertisement
aironman

Kafka

Apr 3rd, 2019
154
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package chapter9
  2.  
  3. import org.apache.kafka.clients.consumer.ConsumerConfig
  4. import org.apache.log4j.{Level, Logger}
  5. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  6. import org.apache.spark.sql.{Row, SQLContext}
  7. import org.apache.spark.streaming._
  8. import org.apache.spark.streaming.kafka010._
  9. import org.apache.spark.{SparkConf, TaskContext}
  10.  
  /**
    * Spark Streaming job that consumes comma-separated records from the Kafka
    * topic "pharma" through the Kafka 0-10 direct stream, prints the offset range
    * handled by each partition, aggregates every micro-batch with Spark SQL and
    * finally commits the processed offsets back to Kafka.
    *
    * Each record value is expected to carry at least seven comma-separated fields:
    * country, time, pc_healthxp, pc_gdp, usd_cap, flag_codes, total_spend, where
    * every field except the first must parse as an Int.
    */
  object KafkaAndSparkStreaming {

    import scala.util.{Failure, Success, Try}

    /*logger used to report records that cannot be parsed*/
    private val log = Logger.getLogger("KafkaAndSparkStreaming")

    /*name of the temporary view the per-batch DataFrame is registered under*/
    private val PharmaView = "pharmaAnalytics"

    /*schema of one record: a string country followed by six integer columns*/
    private val PharmaSchema: StructType = StructType(
      StructField("country", StringType, nullable = false) +:
        Seq("time", "pc_healthxp", "pc_gdp", "usd_cap", "flag_codes", "total_spend")
          .map(name => StructField(name, IntegerType, nullable = false))
    )

    /**
      * Converts one comma-separated line into a [[Row]] matching `PharmaSchema`.
      *
      * @param line raw record value read from Kafka
      * @return the parsed row, or `None` (after logging a warning) when the line is
      *         null, has fewer than seven fields, or a numeric field is not an Int
      */
    private def parseRecord(line: String): Option[Row] =
      Try {
        val fields = line.split(",")
        Row(
          fields(0),
          fields(1).toInt,
          fields(2).toInt,
          fields(3).toInt,
          fields(4).toInt,
          fields(5).toInt,
          fields(6).toInt
        )
      } match {
        case Success(parsed) => Some(parsed)
        case Failure(cause) =>
          /*a single bad record must not take the whole streaming job down*/
          log.warn(s"Skipping malformed record '$line': $cause")
          None
      }

    /**
      * Entry point: wires the Kafka direct stream to the per-batch processing,
      * starts the streaming context and blocks until it terminates.
      *
      * @param args command line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      /*logging at warning level*/
      Logger.getRootLogger.setLevel(Level.WARN)

      /*Since spark streaming is a consumer, we need to pass it topics*/
      val topics = Set("pharma")

      /*pass kafka configuration details; auto commit is disabled because the
       * offsets are committed manually once a batch has been processed*/
      val kafkaParameters = Map[String, String](
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
        ConsumerConfig.GROUP_ID_CONFIG -> "consumerGroup",
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
      )

      /*create spark configurations*/
      val sparkConf = new SparkConf().setAppName("KafkaAndSparkStreaming").setMaster("local[*]")
      /*create spark streaming context with a 30 second batch interval*/
      val ssc = new StreamingContext(sparkConf, Seconds(30))

      /*pull data from the topic using the direct stream approach*/
      val rawStream = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParameters)
      )

      /*Get metadata of the records and perform analytics on records*/
      rawStream.foreachRDD { rdd =>
        /*offset ranges of this batch: used to print partition/offset information
         * and, at the end, to commit the offsets. The cast has to be applied to the
         * RDD handed over by the direct stream, before any transformation.*/
        val rangeOfOffsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        /*print topic, partition, from-offset and until-offset of each partition*/
        rdd.foreachPartition { _ =>
          val metadata = rangeOfOffsets(TaskContext.get.partitionId)
          println(s"${metadata.topic} ${metadata.partition} ${metadata.fromOffset} ${metadata.untilOffset}")
        }

        /*using SQL on top of streams*/
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

        /*convert the batch into a DataFrame, dropping records that do not parse*/
        val data = rdd.flatMap(record => parseRecord(record.value()).toList)
        val pharmaAnalyticsDF = sqlContext.createDataFrame(data, PharmaSchema)
        /*create temporary view*/
        pharmaAnalyticsDF.createOrReplaceTempView(PharmaView)

        /*find total gdp spending per country; the summed column is pc_gdp because
         * the schema has no total_gdp column (total_gdp is only the output alias)*/
        val gdpByCountryDF = sqlContext.sql(
          s"select country, sum(pc_gdp) as total_gdp from $PharmaView group by country")
        gdpByCountryDF.show(10)

        /*commit the offset after all the processing is completed*/
        rawStream.asInstanceOf[CanCommitOffsets].commitAsync(rangeOfOffsets)
      }

      /*without start() no batch is ever scheduled and awaitTermination() would
       * simply block forever*/
      ssc.start()
      ssc.awaitTermination()
    }
  }
Advertisement
Advertisement
Advertisement
RAW Paste Data Copied
Advertisement