Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- import java.io.File
- import org.apache.spark.sql.SaveMode
/** Minimal row type used to reproduce the JDBC write issue: one optional
  * column of each primitive type that CSV schema inference can produce.
  *
  * @param foo optional string column
  * @param bar optional 32-bit integer column
  * @param baz optional 64-bit long column
  * @param dou optional double column
  */
final case class FooBar(foo: Option[String], bar: Option[Int], baz: Option[Long], dou: Option[Double])
object MyThing {

  /** Entry point: reads a ;-delimited CSV, attempts a full JDBC write to
    * Oracle, then writes a series of minimal datasets and finally each
    * column on its own to narrow down which column breaks the JDBC write.
    *
    * Uses an explicit `main` instead of `extends App` to avoid the App
    * trait's delayed-initialization pitfalls.
    */
  def main(args: Array[String]): Unit = {
    // TODO(review): the original paste had pseudocode here — construct the
    // session from the spark configuration (e.g. SparkSession.builder...getOrCreate()).
    val spark = createSparkSession()
    import spark.implicits._

    val dataPath = "/my/path/to/file"

    // Read the CSV with a header row and inferred schema; ';' separates fields.
    val df = spark.read
      .option("header", true)
      .option("inferSchema", true)
      .option("delimiter", ";")
      .csv(dataPath)
    df.printSchema()
    //df.show

    // Fail fast if the Oracle JDBC driver is not on the classpath.
    Class.forName("oracle.jdbc.driver.OracleDriver")

    val jdbcHostname = "host.foo.com"
    val jdbcPort = 1522
    val jdbcDatabase = "dbname"
    // Thin-driver SID-style URL: jdbc:oracle:thin:@host:port:sid
    val jdbcUrl = s"jdbc:oracle:thin:@${jdbcHostname}:${jdbcPort}:${jdbcDatabase}"
    println(s"using JDBC URL: ${jdbcUrl}")

    val connectionProperties = new java.util.Properties()
    // NOTE(review): credentials are hard-coded placeholders; move to configuration.
    connectionProperties.setProperty("user", "username")
    connectionProperties.setProperty("password", "password")

    // List columns whose names may be too long for Oracle identifiers
    // (25 looks like a conservative cut-off below Oracle's classic 30-char
    // limit — TODO confirm intent).
    df.columns.map(c => (c, c.length)).filter(_._2 > 25).foreach(println)

    // Full write — the operation under investigation.
    df
      .write
      .mode(SaveMode.Overwrite)
      .jdbc(jdbcUrl, "schema.fulltablename", connectionProperties)

    // Minimal reproduction datasets, one per suspected failure mode.
    val minimal = Seq(
      FooBar(Some("first"), Some(1), Some(1L), Some(1.0)),
      FooBar(Some("second"), Some(2), Some(2L), Some(2.0))
    ).toDS

    // Int literals in Long/Double positions exercise numeric widening.
    val minimalIntLong = Seq(
      FooBar(Some("first"), Some(1), Some(1), Some(1)),
      FooBar(Some("second"), Some(2), Some(2), Some(2))
    ).toDS

    // NOTE(review): `null` in Option-typed fields mixes null with Option —
    // normally a smell, but the nulls are the point of this repro; kept as-is.
    val minimalNulls = Seq(
      FooBar(Some("first"), null, null, null),
      FooBar(Some("second"), Some(2), null, Some(2.0))
    ).toDS

    val minimalNullsNone = Seq(FooBar(None, None, None, None)).toDS

    val minimalIntLongNulls = Seq(
      FooBar(Some("first"), Some(1), null, null),
      FooBar(Some("second"), Some(2), Some(2), null)
    ).toDS

    minimal.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename1", connectionProperties)
    minimalIntLong.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename2", connectionProperties)
    minimalNulls.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename3", connectionProperties)
    minimalNullsNone.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename4", connectionProperties)
    minimalIntLongNulls.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "schema.fulltablename5", connectionProperties)

    // Write each column by itself (same target table, overwritten each pass)
    // to identify the problematic column.
    for (col <- df.columns) {
      println(s"using column: ${col}")
      df.select(col).write
        .mode(SaveMode.Overwrite)
        .jdbc(jdbcUrl, "HADOOPDATA.sampleDataxx", connectionProperties)
    }

    spark.stop()
  }
}
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement