Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Imports
- import org.apache.spark.sql.hive.HiveContext
- import org.apache.spark.{SparkConf, SparkContext}
- import org.apache.spark.sql.SaveMode
- import scala.concurrent.ExecutionContext.Implicits.global
- import java.util.Properties
- import scala.concurrent.Future
- // Set up spark on local with 2 threads
- val conf = new SparkConf().setMaster("local[2]").setAppName("app").set("spark.sql.parquet.cacheMetadata", "false")
- val sc = new SparkContext(conf)
- val sqlCtx = new HiveContext(sc)
- sqlCtx.setConf("spark.sql.parquet.cacheMetadata", "false")
- // Create fake dataframe
- import sqlCtx.implicits._
- var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2)
- // Write it as a parquet file
- df.write.parquet("/tmp/parquet1")
- df = sqlCtx.read.parquet("/tmp/parquet1")
- // JDBC connection
- val url = s"jdbc:postgresql://localhost:5432/temple"
- val prop = new Properties()
- prop.setProperty("user", "admin")
- prop.setProperty("password", "")
- // 4 futures - at least one of them has been consistently failing for
- val x1 = Future { df.write.mode(SaveMode.Overwrite).jdbc(url, "tmp5", prop)}
- val x2 = Future { df.write.mode(SaveMode.Overwrite).jdbc(url, "tmp5", prop)}
- val x3 = Future { df.write.mode(SaveMode.Overwrite).jdbc(url, "tmp5", prop)}
- val x4 = Future { df.write.mode(SaveMode.Overwrite).jdbc(url, "tmp5", prop)}
Add Comment
Please, Sign In to add comment