Guest User

Untitled

a guest
Jan 10th, 2016
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.34 KB | None | 0 0
  // Script: round-trips a small DataFrame through Parquet, then writes the
  // Parquet-backed DataFrame to PostgreSQL over JDBC from four concurrent futures
  // and reports the outcome of each write.

  // Imports
  import org.apache.spark.sql.hive.HiveContext
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.SaveMode
  import scala.concurrent.ExecutionContext.Implicits.global
  import java.util.Properties
  import scala.concurrent.Future
  import scala.concurrent.Await
  import scala.concurrent.duration.Duration
  import scala.util.{Failure, Success}

  // Set up Spark on local with 2 threads. Parquet metadata caching is switched
  // off both on the SparkConf and on the SQL context.
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("app")
    .set("spark.sql.parquet.cacheMetadata", "false")
  val sc = new SparkContext(conf)
  val sqlCtx = new HiveContext(sc)
  sqlCtx.setConf("spark.sql.parquet.cacheMetadata", "false")

  // Create a fake DataFrame: 50000 rows, 7 identical integer columns, 2 partitions
  import sqlCtx.implicits._
  val parquetPath = "/tmp/parquet1"
  val generated = sc.parallelize(1 to 50000)
    .map { i => (i, i, i, i, i, i, i) }
    .toDF("a", "b", "c", "d", "e", "f", "g")
    .repartition(2)

  // Write it as a Parquet file. Overwrite is used so the script can be re-run:
  // the default save mode raises an error when the target path already exists.
  generated.write.mode(SaveMode.Overwrite).parquet(parquetPath)

  // From here on `df` is the Parquet-backed copy, not the in-memory original.
  val df = sqlCtx.read.parquet(parquetPath)

  // JDBC connection
  val url = "jdbc:postgresql://localhost:5432/temple"
  val prop = new Properties()
  prop.setProperty("user", "admin")
  prop.setProperty("password", "")

  /** Starts an asynchronous overwrite of the `tmp5` table with the contents of `df`. */
  def overwriteTmp5(): Future[Unit] =
    Future { df.write.mode(SaveMode.Overwrite).jdbc(url, "tmp5", prop) }

  // 4 concurrent writes - at least one of them has been failing consistently.
  // NOTE(review): all four target the same table with SaveMode.Overwrite, so they
  // presumably race with each other on replacing `tmp5` - confirm whether that is
  // the intended repro or whether each write should go to its own table.
  val x1 = overwriteTmp5()
  val x2 = overwriteTmp5()
  val x3 = overwriteTmp5()
  val x4 = overwriteTmp5()

  // Wait for every write and report its outcome; without this a failed future is
  // silently discarded. Await.ready does not rethrow the future's own failure, so
  // one failed write does not hide the results of the others.
  Seq(x1, x2, x3, x4).zipWithIndex.foreach { case (write, idx) =>
    Await.ready(write, Duration.Inf).value.foreach {
      case Success(_) => println(s"JDBC write ${idx + 1} succeeded")
      case Failure(e) => println(s"JDBC write ${idx + 1} failed: $e")
    }
  }
Add Comment
Please, Sign In to add comment