Advertisement
Guest User

Untitled

a guest
Jun 1st, 2016
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.33 KB | None | 0 0
  /**
   * Streaming word count that appends each batch's counts to a Redshift table.
   *
   * Reads text lines from a TCP socket on 127.0.0.1:9999, splits them on single
   * spaces, counts the words of every 20-second batch, and writes the
   * (word, count) pairs of each non-empty batch to the `streamingwc` table
   * through the `com.databricks.spark.redshift` data source. The per-batch
   * counts are also printed to stdout.
   */
  object testDataFrame {

    /**
     * Entry point: builds the streaming pipeline, starts it, and blocks until
     * the streaming context terminates.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")
      val sc = new SparkContext(conf)
      val ssc = new StreamingContext(sc, Seconds(20))
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)

      // Schema of the rows appended to the target table.
      val schema = StructType(Seq(
        StructField("word", StringType, nullable = false),
        StructField("count", IntegerType, nullable = false)
      ))

      // NOTE(review): these are placeholder credentials for the s3n "tempdir"
      // used below; real keys should come from the environment or a
      // credentials provider rather than from source code.
      sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "accessKeyId")
      sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "accessKey")

      val lines = ssc.socketTextStream("127.0.0.1", 9999, StorageLevel.MEMORY_AND_DISK_SER)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

      wordCounts.foreachRDD { rdd =>
        // Check for actual records, not for partitions: an RDD can have
        // partitions and still contain no data, and an empty batch must not
        // trigger a Redshift append.
        if (!rdd.isEmpty()) {
          sqlContext
            .createDataFrame(rdd.map { case (word, count) => Row(word, count) }, schema)
            .write
            .format("com.databricks.spark.redshift")
            .option("url", "jdbc:redshift://hostName:portNumber/dbName?user=#######&password=########")
            .option("dbtable", "streamingwc")
            .option("tempdir", "s3n://testBucket/spark/")
            .mode("append")
            .save()
        }
      }

      wordCounts.print()
      ssc.start()
      ssc.awaitTermination()
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement