Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.xinpinget.bolt.etl
- import com.github.nscala_time.time.Imports._
- import com.xinpinget.bolt.util.DateUtil.SmartDateTime
- import com.xinpinget.bolt.util.EnvUtil
- import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * Template for migrating one Sensors Analytics model (table) into the warehouse.
 *
 * Subclasses name the source model via the constructor and implement
 * [[transformAndSave]] to define the per-model transformation and sink.
 *
 * @param modelName name of the source table in the `rawdata` database
 */
abstract class BaseMigrater(modelName: String) {
  // NOTE(review): hard-coded environment hosts — consider moving to configuration.
  val HOST = if (EnvUtil.isDebug()) "106.75.120.59" else "10.10.111.246"
  // Staging path / table name derived from the model, e.g. "sc_events".
  val saveTableName = "sc_" + modelName

  /** Builds a SparkSession.Builder with the resource settings shared by both
    * sessions; avoids duplicating the five config lines in each init method. */
  private def baseBuilder(appName: String): SparkSession.Builder =
    SparkSession.builder
      .master(EnvUtil.getMaster())
      .appName(appName)
      .config("spark.executor.cores", "5")
      .config("spark.cores.max", "10")
      .config("spark.executor.memory", "15g")

  /** Session used to read from the Sensors Analytics source over JDBC. */
  def initSCSpark(): SparkSession =
    baseBuilder("SensorsDataETL in sc").getOrCreate()

  /** Session with Hive support, used when writing to warehouse tables. */
  def initSpark(): SparkSession =
    baseBuilder("SensorsDataETL").enableHiveSupport().getOrCreate()

  /**
   * Reads the model from the Sensors Analytics store over the Hive JDBC driver.
   * In debug mode only 3 rows are pulled to keep iteration fast.
   *
   * The subquery-aliased form `(SELECT ...) modelName` is required because the
   * JDBC source's `dbtable` option must be usable inside a FROM clause.
   */
  def getModel(): DataFrame = {
    val scSpark = initSCSpark()
    val sql =
      if (EnvUtil.isDebug())
        s"(/*SA(default)*/ SELECT * FROM $modelName limit 3) $modelName"
      else
        s"(/*SA(default)*/ SELECT * FROM $modelName) $modelName"
    scSpark.sqlContext.read.format("jdbc")
      .option("url", s"jdbc:hive2://$HOST:21050/rawdata;auth=noSasl")
      .option("driver", "org.apache.hive.jdbc.HiveDriver")
      .option("dbtable", sql)
      .load()
  }

  /** Per-model transformation and final write; implemented by subclasses. */
  def transformAndSave(df: DataFrame): Unit

  /**
   * Runs the migration: pulls the model and stages it under /tmp.
   * The subsequent reload/transform steps are currently disabled (see below).
   */
  def migrate(): Unit = {
    val scDf = getModel()
    scDf.write.mode("overwrite").save("/tmp/" + saveTableName)
    // Removed a trailing `scDf.count()`: its result was discarded and, coming
    // after the save, it only forced a second full read of the JDBC source.
    // scDf.sparkSession.stop()
    // val df = initSpark().read.load("/tmp/" + saveTableName)
    // df.show()
    // transformAndSave(df)
    // df.sparkSession.stop()
  }
}
/** Migrater for the `events` model: casts the raw `date` column to a SQL date
  * and writes the frame as a date-partitioned Hive table `sc.sc_events`,
  * replacing any previous contents. */
class EventMigrater extends BaseMigrater("events") {
  override def transformAndSave(df: DataFrame): Unit = {
    val withDateColumn = df.withColumn("date", df("date").cast("date"))
    withDateColumn.write
      .partitionBy("date")
      .mode("overwrite")
      .saveAsTable("sc.sc_events")
  }
}
/** Entry point for the Sensors Analytics ETL job. Written as an explicit
  * `main` rather than `extends App`: the App trait's DelayedInit-based
  * initialization has ordering pitfalls and is discouraged. */
object SensorsDataETL {
  //  def castToDate(df: Dataset[Row], columns: String*): _root_.org.apache.spark.sql.Dataset[_root_.org.apache.spark.sql.Row] = {
  //    columns.foldLeft(df)((df, col) => {
  //      df.withColumn(col, to_date(from_unixtime(df(col) / 1000)))
  //    })
  //  }
  //
  //  def migrateUsers(): Unit = {
  //    var users = spark.read.option("inferSchema", true).option("header", true).csv("/tmp/sc_users.csv").repartition(200)
  //    users = castToDate(users, "$first_visit_time")
  //    users.write.partitionBy("$first_visit_time").mode("overwrite").saveAsTable("sc.users")
  //  }
  //
  //  def migrateEvents(): Unit = {
  //    var events = spark.read.option("inferSchema", true).option("header", true).csv("/tmp/sc_events.csv")
  //      .repartition(200)
  //    events = events.withColumn("date", events("date").cast("date"))
  //    events.write.partitionBy("date").mode("overwrite").saveAsTable("sc.events")
  //  }

  def main(args: Array[String]): Unit = {
    new EventMigrater().migrate()
  }
}
Add Comment
Please, Sign In to add comment