Advertisement
josephxsxn

spark hbase kafka

Feb 27th, 2018
125
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Bash 1.91 KB | None | 0 0
  1.  
  2. #had to get Apache Spark 2.2.1 to fix Kafka batch write issue
  3. #needed to ln -s all major hadoop jars into jars for spark, and added
  4. #spark.driver.extraJavaOptions -Dhdp.version=current
  5. #spark.yarn.am.extraJavaOptions -Dhdp.version=current
  6. #to the Spark defaults configuration (spark-defaults.conf)
  7. #ln hbase-site into spark conf to resolve zk issues
  8.  
  9. ./spark-shell --master yarn --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 --repositories http://repo.hortonworks.com/content/groups/public/ --files /etc/hbase/conf/hbase-site.xml --conf "spark.hadoop.yarn.timeline-service.enabled=false"
  10.  
  11.  
  12.  
  13. import org.apache.spark.sql.execution.datasources.hbase.{HBaseRelation, HBaseTableCatalog}
  14. import org.apache.spark.sql.{DataFrame, SparkSession}
  15. import org.apache.spark.sql.SQLContext
  16.  
  17. import org.apache.kafka.clients.consumer.ConsumerRecord
  18. import org.apache.kafka.common.serialization.StringDeserializer
  19.  
  // Take the SQLContext from the active SparkSession rather than calling the
  // SQLContext constructor, which has been deprecated since Spark 2.0.
  // getOrCreate() hands back the session that already exists (spark-shell starts one),
  // so no second session is created.
  val sqlContext: SQLContext = SparkSession.builder().getOrCreate().sqlContext
  21.  
  /**
   * Catalog (JSON) describing how the HBase table `TencentNews_Users` in the
   * `default` namespace is exposed as DataFrame columns: `key` is declared as the
   * row key, and `value` is mapped to column `tcUser` of column family `cfInfo`.
   * Both columns are typed as strings.
   */
  def catalog = {
    // No interpolation is needed, so a plain triple-quoted literal is used;
    // stripMargin removes everything up to and including the leading '|'.
    val schemaJson =
      """{
        |"table":{"namespace":"default", "name":"TencentNews_Users"},
        |"rowkey":"key",
        |"columns":{
        |"key":{"cf":"rowkey", "col":"key", "type":"string"},
        |"value":{"cf":"cfInfo", "col":"tcUser", "type":"string"}
        |}
        |}"""
    schemaJson.stripMargin
  }
  31.  
  /**
   * Loads the table described by a catalog as a DataFrame.
   *
   * @param cat catalog JSON, handed to the reader under the
   *            `HBaseTableCatalog.tableCatalog` option key
   * @return the DataFrame loaded through the
   *         `org.apache.spark.sql.execution.datasources.hbase` data source
   */
  def withCatalog(cat: String): DataFrame = {
    val readerOptions = Map(HBaseTableCatalog.tableCatalog -> cat)
    val hbaseReader = sqlContext.read
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .options(readerOptions)
    hbaseReader.load()
  }
  39.  
  // Read the HBase table through the catalog defined above.
  val df = withCatalog(catalog)
  // Fetch a single row to the driver; presumably a quick check that the HBase
  // read works before anything is written to Kafka.
  val oneDF = df.take(1)

  // Write key-value data from the DataFrame to the Kafka topic named in the "topic" option.
  // The Kafka sink expects string (or binary) `key` / `value` columns, hence the casts.
  val kafkaRecords = df.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
  val bootstrapServers = "hdp-prod-slave05:9092,hdp-prod-slave06:9092"
  val kafkaWriter = kafkaRecords.write.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("topic", "joe.spark.test")
  // save() returns Unit, so the result is not bound to a name. The previous
  // `query = ...` assigned to an identifier that was never declared and did not compile.
  kafkaWriter.save()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement