case class LogFile(unixTimestamp: String, one: String, two: String, three: String)

// Spark 1.x: needed so an RDD of case classes can be used as a SchemaRDD / temp table
import sqlContext.createSchemaRDD

// keep only the message value from each (key, value) pair coming off Kafka
val lines = kafkaStream.map(_._2)

lines.foreachRDD { rdd =>
  // get particular columns only
  val logs = rdd.map(_.split(",")).map(log => LogFile(log(0), log(2), log(3), log(4)))
  logs.registerTempTable("logFile")

  // read the batch back through SQL; the commented-out insert would store it in the Parquet table
  val results: SchemaRDD = sqlContext.sql("SELECT * FROM logFile")

  /*if (results.count() > 1) {
    // hc is assumed to be a HiveContext
    hc.sql("INSERT OVERWRITE TABLE logs_parquet SELECT * FROM logFile")
  }*/
}

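If the goal at the "store in parquet" step is only to persist each micro-batch as Parquet files, without going through Hive at all, the Spark 1.x SchemaRDD API can write Parquet directly. A minimal sketch, assuming an HDFS output directory (the path below is illustrative, not from the original paste):

// inside the foreachRDD block, after building `results`
if (results.count() > 0) {
  // saveAsParquetFile writes the SchemaRDD out as Parquet; one directory per batch
  results.saveAsParquetFile("hdfs:///data/logs_parquet/batch-" + System.currentTimeMillis)
}
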
-- Hive target table; create_Datetime is the partition column
CREATE TABLE logs_parquet (
  unixTimestamp STRING,
  one STRING,
  two STRING,
  three STRING
)
PARTITIONED BY (create_Datetime TIMESTAMP)
STORED AS PARQUET;
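
Because logs_parquet is partitioned by create_Datetime, the commented-out "INSERT OVERWRITE TABLE logs_parquet SELECT * FROM logFile" would need a partition clause before it could run. A minimal sketch of a static-partition insert through a HiveContext, assuming hc is a HiveContext and using an illustrative partition value (both assumptions, not stated in the original):

// hypothetical: write the current batch into a fixed partition of logs_parquet
hc.sql(
  """INSERT OVERWRITE TABLE logs_parquet
    |PARTITION (create_Datetime = '2015-03-05 00:00:00')
    |SELECT unixTimestamp, one, two, three FROM logFile""".stripMargin)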