package com.xinpinget.bolt.etl

import com.github.nscala_time.time.Imports._
import com.xinpinget.bolt.util.DateUtil.SmartDateTime
import com.xinpinget.bolt.util.EnvUtil
import org.apache.spark.sql.{DataFrame, SparkSession}

abstract class BaseMigrater(modelName: String) {

  // Source host: public address when debugging, internal address in production.
  val HOST = if (EnvUtil.isDebug()) "106.75.120.59" else "10.10.111.246"

  val saveTableName = "sc_" + modelName

  // Session used to read from the SensorsData (sc) source over JDBC.
  def initSCSpark(): SparkSession = {
    val builder = SparkSession.builder
      .master(EnvUtil.getMaster())
      .appName("SensorsDataETL in sc")
      .config("spark.executor.cores", "5")
      .config("spark.cores.max", "10")
      .config("spark.executor.memory", "15g")
    builder.getOrCreate()
  }

  // Session used to write into Hive (note enableHiveSupport).
  def initSpark(): SparkSession = {
    val builder = SparkSession.builder
      .master(EnvUtil.getMaster())
      .appName("SensorsDataETL")
      .config("spark.executor.cores", "5")
      .config("spark.cores.max", "10")
      .config("spark.executor.memory", "15g")
      .enableHiveSupport()
    builder.getOrCreate()
  }

  // Loads the source table over JDBC; in debug mode only the first 3 rows.
  def getModel(): DataFrame = {
    val scSpark = initSCSpark()
    val sql =
      if (EnvUtil.isDebug()) s"(/*SA(default)*/ SELECT * FROM $modelName LIMIT 3) $modelName"
      else s"(/*SA(default)*/ SELECT * FROM $modelName) $modelName"
    scSpark.read.format("jdbc")
      .option("url", s"jdbc:hive2://$HOST:21050/rawdata;auth=noSasl")
      .option("driver", "org.apache.hive.jdbc.HiveDriver")
      .option("dbtable", sql)
      .load()
  }

  def transformAndSave(df: DataFrame): Unit

  def migrate(): Unit = {
    val scDf = getModel()
    // Stage the raw extract under /tmp, then force materialization with count().
    scDf.write.mode("overwrite").save("/tmp/" + saveTableName)
    scDf.count()
    // scDf.sparkSession.stop()

    // val df = initSpark().read.load("/tmp/" + saveTableName)
    // df.show()
    // transformAndSave(df)
    // df.sparkSession.stop()
  }

}
class EventMigrater extends BaseMigrater("events") {

  override def transformAndSave(df: DataFrame): Unit = {
    // Cast the string "date" column to DateType and partition the Hive table by it.
    df.withColumn("date", df("date").cast("date"))
      .write.partitionBy("date").mode("overwrite")
      .saveAsTable("sc.sc_events")
  }
}

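// A minimal sketch of a second migrater, assuming a "users" source table whose
// "$first_visit_time" column holds millisecond epochs, mirroring the commented-out
// migrateUsers() below; the UserMigrater name and the sc.sc_users target are hypothetical.
class UserMigrater extends BaseMigrater("users") {

  import org.apache.spark.sql.functions.{from_unixtime, to_date}

  override def transformAndSave(df: DataFrame): Unit = {
    // Convert epoch milliseconds to a date and partition the Hive table by it.
    df.withColumn("$first_visit_time", to_date(from_unixtime(df("$first_visit_time") / 1000)))
      .write.partitionBy("$first_visit_time").mode("overwrite")
      .saveAsTable("sc.sc_users")
  }
}
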
object SensorsDataETL extends App {

  // def castToDate(df: Dataset[Row], columns: String*): Dataset[Row] = {
  //   columns.foldLeft(df)((df, col) => {
  //     df.withColumn(col, to_date(from_unixtime(df(col) / 1000)))
  //   })
  // }
  //
  // def migrateUsers(): Unit = {
  //   var users = spark.read.option("inferSchema", true).option("header", true)
  //     .csv("/tmp/sc_users.csv").repartition(200)
  //   users = castToDate(users, "$first_visit_time")
  //   users.write.partitionBy("$first_visit_time").mode("overwrite").saveAsTable("sc.users")
  // }
  //
  // def migrateEvents(): Unit = {
  //   var events = spark.read.option("inferSchema", true).option("header", true)
  //     .csv("/tmp/sc_events.csv").repartition(200)
  //   events = events.withColumn("date", events("date").cast("date"))
  //   events.write.partitionBy("date").mode("overwrite").saveAsTable("sc.events")
  // }

  new EventMigrater().migrate()
}
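
// A possible invocation (the jar name is hypothetical; the master URL is supplied
// by EnvUtil.getMaster() inside the app):
//   spark-submit --class com.xinpinget.bolt.etl.SensorsDataETL bolt-etl.jar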