import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object testDataFrame {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
    val sc = new SparkContext(conf)
    // Micro-batch interval of 20 seconds.
    val ssc = new StreamingContext(sc, Seconds(20))
    val sqlContext = new SQLContext(sc)

    // Schema for the word-count DataFrame written to Redshift.
    val schema = StructType(Seq(
      StructField("word", StringType, nullable = false),
      StructField("count", IntegerType, nullable = false)
    ))

    // S3 credentials for the Redshift staging directory (placeholder values).
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "accessKeyId")
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "accessKey")

    // Read lines of text from a local TCP socket.
    val lines = ssc.socketTextStream("127.0.0.1", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // For each non-empty batch, convert the (word, count) RDD to a DataFrame
    // and append it to the Redshift table via the S3 temp directory.
    wordCounts.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        sqlContext.createDataFrame(rdd.map(x => Row(x._1, x._2)), schema)
          .write
          .format("com.databricks.spark.redshift")
          .option("url", "jdbc:redshift://hostName:portNumber/dbName?user=#######&password=########")
          .option("dbtable", "streamingwc")
          .option("tempdir", "s3n://testBucket/spark/")
          .mode("append")
          .save()
      }
    }

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
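
To exercise this locally, one option (assuming netcat is installed) is to start a listener with `nc -lk 9999` and type words into it; each 20-second batch is then counted, printed, and appended to the `streamingwc` table. Note that the JDBC URL, the `#######` credentials, and `s3n://testBucket/spark/` are placeholders to be replaced with real values, and the job needs the spark-redshift library (the `com.databricks.spark.redshift` data source) plus the Redshift JDBC driver on the classpath.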