Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
// Schema for one parsed log record. Field names mirror the columns of the
// logs_parquet Hive table this job ultimately writes to.
// NOTE(review): unixTimestamp is kept as a String here — confirm whether it
// should be parsed to a numeric/timestamp type before the Parquet insert.
case class LogFile(unixTimestamp: String, one: String, two: String, three: String)

// Kafka direct-stream messages arrive as (key, value) pairs; only the value
// payload (the raw CSV log line) is needed.
val lines = kafkaStream.map(_._2)

lines.foreachRDD { rdd =>
  // Parse the CSV payload and keep only the columns of interest.
  // Indices 0, 2, 3, 4 are selected; index 1 is deliberately skipped.
  // Rows with fewer than 5 fields are dropped up front — without this guard a
  // single malformed line throws ArrayIndexOutOfBoundsException and fails the
  // whole micro-batch.
  val logs = rdd
    .map(_.split(","))
    .filter(_.length >= 5)
    .map(fields => LogFile(fields(0), fields(2), fields(3), fields(4)))

  // BUG FIX: the original called `httpLogs.registerTempTable(...)`, but no
  // `httpLogs` identifier exists in this snippet — the parsed RDD is `logs`.
  // Relies on `import sqlContext.createSchemaRDD` (Spark 1.x) being in scope
  // so the RDD of case classes is implicitly converted to a SchemaRDD.
  logs.registerTempTable("logFile")

  // Store in Parquet: read the temp table back and (once enabled) insert it
  // into the partitioned Hive Parquet table — see the logs_parquet DDL.
  val results: SchemaRDD = sqlContext.sql("SELECT * FROM logFile")
  /*if (results.count() > 1) {
    hc.sql("insert overwrite table logs_parquet select * from logFile")
  }*/
}
-- Target Hive table for the streamed log records. The column list matches the
-- LogFile case class produced by the Spark job; rows are stored as Parquet and
-- partitioned by ingestion timestamp (create_Datetime).
CREATE TABLE logs_parquet (
  unixTimestamp String,
  one           String,
  two           String,
  three         String
)
PARTITIONED BY (create_Datetime timestamp)
STORED AS PARQUET;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement