Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#Had to upgrade to Apache Spark 2.2.1 to fix a Kafka batch-write issue.
#Needed to `ln -s` all major Hadoop jars into Spark's jars/ directory, and added
#  spark.driver.extraJavaOptions -Dhdp.version=current
#  spark.yarn.am.extraJavaOptions -Dhdp.version=current
#to spark-defaults.conf.
#Linked hbase-site.xml into Spark's conf/ directory to resolve ZooKeeper (zk) connection issues.
# Launch spark-shell on YARN with two extra packages:
#   - SHC (Hortonworks Spark-HBase connector), resolved from the Hortonworks repo
#   - the Spark SQL Kafka 0.10 connector (needed for the Kafka batch write)
# hbase-site.xml is shipped with --files so it is placed in each executor's working dir,
# and the YARN timeline-service client is disabled for this application.
./spark-shell \
  --master yarn \
  --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11,org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.1 \
  --repositories http://repo.hortonworks.com/content/groups/public/ \
  --files /etc/hbase/conf/hbase-site.xml \
  --conf "spark.hadoop.yarn.timeline-service.enabled=false"
- import org.apache.spark.sql.execution.datasources.hbase.{HBaseRelation, HBaseTableCatalog}
- import org.apache.spark.sql.{DataFrame, SparkSession}
- import org.apache.spark.sql.SQLContext
- import org.apache.kafka.clients.consumer.ConsumerRecord
- import org.apache.kafka.common.serialization.StringDeserializer
// SQLContext for the reads below. Since Spark 2.0 the `new SQLContext(sc)` constructor is
// deprecated ("Use SparkSession.builder instead"); take the SQLContext from the shared
// SparkSession instead. In spark-shell, getOrCreate() returns the session the shell
// already created, so this context sits on the same session as `spark` and `sc`.
val sqlContext: SQLContext = SparkSession.builder().getOrCreate().sqlContext
// SHC (Spark-HBase connector) catalog: the table schema for the DataFrame.
// Maps HBase table default:TencentNews_Users to two string columns:
//   key   <- the HBase row key
//   value <- qualifier cfInfo:tcUser
// A plain triple-quoted string is enough (nothing is interpolated), and a val avoids
// rebuilding the string on every reference.
val catalog: String = """{
  |"table":{"namespace":"default", "name":"TencentNews_Users"},
  |"rowkey":"key",
  |"columns":{
  |"key":{"cf":"rowkey", "col":"key", "type":"string"},
  |"value":{"cf":"cfInfo", "col":"tcUser", "type":"string"}
  |}
  |}""".stripMargin
/** Loads an HBase table as a DataFrame through the SHC data source.
  *
  * @param cat SHC JSON catalog describing the HBase table and its column mapping
  * @return a DataFrame read via `sqlContext` using the given catalog
  */
def withCatalog(cat: String): DataFrame = {
  val hbaseReader = sqlContext.read.format("org.apache.spark.sql.execution.datasources.hbase")
  hbaseReader.option(HBaseTableCatalog.tableCatalog, cat).load()
}
// HBase-backed DataFrame built from the SHC catalog.
val df: DataFrame = withCatalog(catalog)
// Fetch a single row for the REPL to echo; note this is an Array[Row], not a DataFrame.
// Presumably a quick sanity check that the HBase read works.
val oneDF = df.head(1)
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option.
// This is a batch write: DataFrameWriter.save() returns Unit, so there is no query handle
// to keep (unlike writeStream...start(), which returns a StreamingQuery).
// Each statement is complete on its own line so it pastes cleanly into spark-shell.
val kafkaBootstrapServers = "hdp-prod-slave05:9092,hdp-prod-slave06:9092"
val kafkaTopic = "joe.spark.test"
val kafkaRecords = df.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
kafkaRecords.write.format("kafka").option("kafka.bootstrap.servers", kafkaBootstrapServers).option("topic", kafkaTopic).save()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement