import java.nio.charset.{ Charset, StandardCharsets }

import org.apache.spark.sql._
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types._

object SparkDataLoad {

  /** Reads CSV files into a DataFrame using the schema derived from the encoder for `A`,
    * tagging each row with the file it was loaded from. */
  def fromCsv[A: Encoder](
      path: Set[String],
      encoding: Charset = StandardCharsets.UTF_8,
      useHeader: Boolean = false,
      delimiter: Char = '|',
      quote: Char = '"',
      escape: Char = '\\',
      skipLinesStartingWith: Option[Char] = None,
      dateFormat: String = "yyyyMMdd",
      timestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
      representEmptyValueAs: String = "",
      treatAsNull: String = "",
      treatAsNaN: String = "NaN",
      treatAsPositiveInf: String = "Inf",
      treatAsNegativeInf: String = "-Inf",
      ignoreLeadingWhiteSpace: Boolean = true,
      ignoreTrailingWhiteSpace: Boolean = true,
      inputFileNameColumn: String = "_source_file"
  )(implicit spark: SparkSession): DataFrame = {
    spark.read
      .option("mode", "PERMISSIVE")
      .option("encoding", encoding.name())
      .option("header", useHeader)
      .option("delimiter", delimiter.toString)
      .option("quote", quote.toString)
      .option("escape", escape.toString)
      .option("dateFormat", dateFormat)
      .option("timestampFormat", timestampFormat)
      .option("emptyValue", representEmptyValueAs)
      .option("nullValue", treatAsNull)
      .option("nanValue", treatAsNaN)
      .option("positiveInf", treatAsPositiveInf)
      .option("negativeInf", treatAsNegativeInf)
      .option("comment", skipLinesStartingWith.map(_.toString).orNull)
      .option("ignoreLeadingWhiteSpace", ignoreLeadingWhiteSpace)
      .option("ignoreTrailingWhiteSpace", ignoreTrailingWhiteSpace)
      .schema(implicitly[Encoder[A]].schema)
      .csv(path.toSeq: _*)
      // Record which input file each row came from.
      .withColumn(inputFileNameColumn, input_file_name())
  }

  /** Writes the dataset to Snowflake via a transient staging table, then swaps it in
    * place of the target table and grants read access to the given roles. */
  def toSnowflake(
      account: String = "*****.us-east-1.snowflakecomputing.com",
      user: String = "dev",
      password: String = "***************",
      warehouse: String = "dev",
      db: String = "dev",
      grantReads: Seq[String] = Seq("dev", "prod", "dashboards"), // roles to grant read access to after loading
      schema: String,
      table: String,
      clusterBy: Seq[String] = Nil,
      dataset: Dataset[_],
      isAppend: Boolean = false
  ): Unit = {
    // Maps a Spark field to the equivalent Snowflake column definition.
    def toSnowflakeColumn(field: StructField): String = {
      val col = field.dataType match {
        case _: BooleanType => "BOOLEAN"
        case _: ByteType | _: ShortType | _: IntegerType | _: LongType => "INTEGER"
        case _: DecimalType | _: FloatType | _: DoubleType => "REAL"
        case _: DateType => "DATE"
        case _: TimestampType => "TIMESTAMP_TZ"
        case _: StringType | _: VarcharType => "TEXT"
        case _: ArrayType => "ARRAY"
        case _ => throw new UnsupportedOperationException(s"Unsupported field = $field")
      }
      s"${field.name.toLowerCase} $col${if (field.nullable) "" else " NOT NULL"}"
    }

    val tempTable = s"${table}_stage"

    val clusterStmt = if (clusterBy.isEmpty) "" else clusterBy.mkString(" CLUSTER BY(", ", ", ")")

    val createTable = dataset.schema.fields
      .map(toSnowflakeColumn)
      .mkString(s"CREATE OR REPLACE TRANSIENT TABLE $schema.$tempTable(\n\t", ",\n\t", s") $clusterStmt")

    val preActions = Seq(
      s"USE DATABASE $db",
      s"USE WAREHOUSE $warehouse",
      s"CREATE SCHEMA IF NOT EXISTS $schema",
      s"USE SCHEMA $schema",
      createTable
    )

    // After the load, swap the staging table in and re-grant read access.
    val postActions = Seq(
      s"DROP TABLE IF EXISTS $schema.$table",
      s"ALTER TABLE $schema.$tempTable RENAME TO $table"
    ) ++ grantReads.flatMap { role =>
      Seq(
        s"GRANT USAGE ON SCHEMA $schema TO ROLE $role",
        s"GRANT SELECT ON ALL TABLES IN SCHEMA $schema TO ROLE $role",
        s"GRANT SELECT ON ALL VIEWS IN SCHEMA $schema TO ROLE $role"
      )
    }

    // Log the statements that will run before and after the load.
    println(((preActions :+ s"COPY DATAFRAME TO $schema.$tempTable") ++ postActions).mkString("", ";\n\n", ";"))

    dataset
      .write
      .format("snowflake")
      .options(Map(
        "sfUrl" -> account,
        "sfUser" -> user,
        "sfPassword" -> password,
        "sfDatabase" -> db,
        "sfWarehouse" -> warehouse,
        // Load into the staging table created in preactions; postactions rename it to the target table.
        "dbtable" -> tempTable,
        "preactions" -> preActions.mkString("", ";", ";"),
        "postactions" -> postActions.mkString("", ";", ";")
      ))
      .mode(if (isAppend) SaveMode.Append else SaveMode.Overwrite)
      .save()
  }
}
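
// Hypothetical usage sketch (not part of the original paste): shows one way fromCsv and
// toSnowflake might be called together. The Trade case class, the S3 path, and the
// Snowflake schema/table names below are illustrative assumptions, not values from the
// original code.
object SparkDataLoadExample {
  final case class Trade(id: Long, symbol: String, price: Double, tradeDate: java.sql.Date)

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("SparkDataLoadExample")
      .getOrCreate()
    import spark.implicits._ // provides the Encoder[Trade] required by fromCsv

    // Derive the CSV schema from the Trade encoder and tag each row with its source file.
    val trades = SparkDataLoad.fromCsv[Trade](
      path = Set("s3://example-bucket/trades/2019-10-19/"),
      useHeader = true,
      delimiter = ','
    )

    // Stage the data into analytics.trades_stage, then swap it in as analytics.trades.
    // Connection parameters fall back to the "dev" defaults declared in toSnowflake.
    SparkDataLoad.toSnowflake(
      schema = "analytics",
      table = "trades",
      clusterBy = Seq("symbol"),
      dataset = trades
    )
  }
}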