Advertisement
Guest User

Untitled

a guest
Jun 6th, 2017
111
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many useful features!
text 2.55 KB | None | 0 0
import java.io.File

import org.apache.spark.sql.{SaveMode, SparkSession}
  3.  
  4. case class FooBar(foo: Option[String], bar: Option[Int], baz: Option[Long], dou: Option[Double])
  5.  
  6. object MyThing extends App {
  7. val spark = createSparkSession ... from spark configuration
  8.  
  9. import spark.implicits._
  10.  
  11. val dataPath = "/my/path/to/file"
  12. val df = spark.read
  13. .option("header", true)
  14. .option("inferSchema", true)
  15. .option("delimiter", ";")
  16. .csv(dataPath)
  17. df.printSchema
  18. //df.show
  19. // check that driver is available
  20. Class.forName("oracle.jdbc.driver.OracleDriver")
  21.  
  22. val jdbcHostname = "host.foo.com"
  23. val jdbcPort = 1522
  24. val jdbcDatabase = "dbname"
  25. val jdbcUrl = s"jdbc:oracle:thin:@${jdbcHostname}:${jdbcPort}:${jdbcDatabase}"
  26. print(s"using JDBC URL: ${jdbcUrl}")
  27. val connectionProperties = new java.util.Properties()
  28.  
  29. connectionProperties.setProperty("user", "username")
  30. connectionProperties.setProperty("password", "password")
  31.  
  32. // find columns where the names are too long for oracle
  33. df.columns.map(c => (c, c.length)).filter(_._2 > 25).foreach(println)
  34. df
  35. .write
  36. .mode(SaveMode.Overwrite)
  37. .jdbc(jdbcUrl, "schema.fulltablename", connectionProperties)
  38. val minimal = Seq(
  39. FooBar(Some("first"), Some(1), Some(1L), Some(1.0)),
  40. FooBar(Some("second"), Some(2), Some(2L), Some(2.0))
  41. ).toDS
  42. val minimalIntLong = Seq(
  43. FooBar(Some("first"), Some(1), Some(1), Some(1)),
  44. FooBar(Some("second"), Some(2), Some(2), Some(2))
  45. ).toDS
  46. val minimalNulls = Seq(
  47. FooBar(Some("first"), null, null, null),
  48. FooBar(Some("second"), Some(2), null, Some(2.0))
  49. ).toDS
  50. val minimalNullsNone = Seq(FooBar(None, None, None, None)).toDS
  51. val minimalIntLongNulls = Seq(
  52. FooBar(Some("first"), Some(1), null, null),
  53. FooBar(Some("second"), Some(2), Some(2), null)
  54. ).toDS
  55.  
  56. minimal.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename1", connectionProperties)
  57. minimalIntLong.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename2", connectionProperties)
  58.  
  59. minimalNulls.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename3", connectionProperties)
  60. minimalNullsNone.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename4", connectionProperties)
  61. minimalIntLongNulls.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename5", connectionProperties)
  62.  
  63. // now try to find the problematic column
  64. for (col <- df.columns) {
  65. println(s"using column: ${col}")
  66. df.select(col).write
  67. .mode(SaveMode.Overwrite)
  68. .jdbc(jdbcUrl, "HADOOPDATA.sampleDataxx", connectionProperties)
  69. }
  70.  
  71. spark.stop
  72. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement