// Imports
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.Properties
import scala.concurrent.Future

// Set up Spark locally with 2 threads
val conf = new SparkConf().setMaster("local[2]").setAppName("app")
val sc = new SparkContext(conf)
val sqlCtx = new HiveContext(sc)

// Create a fake DataFrame with 50,000 rows and 7 columns, in 2 partitions
import sqlCtx.implicits._
var df = sc.parallelize(1 to 50000).map { i => (i, i, i, i, i, i, i) }.toDF("a", "b", "c", "d", "e", "f", "g").repartition(2)
// Write it out as a Parquet file, then read it back
df.write.parquet("/tmp/parquet1")
df = sqlCtx.read.parquet("/tmp/parquet1")

// JDBC connection properties for a local PostgreSQL instance
val url = "jdbc:postgresql://localhost:5432/tempdb"
val prop = new Properties()
prop.setProperty("user", "admin")
prop.setProperty("password", "")

// 4 concurrent JDBC writes on the global ExecutionContext -
// at least one of them has been consistently failing
val x1 = Future { df.write.jdbc(url, "temp1", prop) }
val x2 = Future { df.write.jdbc(url, "temp2", prop) }
val x3 = Future { df.write.jdbc(url, "tem3", prop) }
val x4 = Future { df.write.jdbc(url, "temp4", prop) }

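// Not part of the original paste: a sketch that attaches completion handlers
// so that whichever write fails logs its table name and full stack trace
// (the handlers run on the global ExecutionContext imported above).
import scala.util.{Failure, Success}
Seq("temp1" -> x1, "temp2" -> x2, "temp3" -> x3, "temp4" -> x4).foreach {
  case (table, fut) =>
    fut.onComplete {
      case Success(_)  => println(s"write to $table succeeded")
      case Failure(ex) => println(s"write to $table failed"); ex.printStackTrace()
    }
}
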
// Stack trace from the failing write (excerpt):
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1482) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:247) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:306) ~[org.apache.spark.spark-sql_2.11-1.6.0.jar:1.6.0]
at writer.SQLWriter$.writeDf(Writer.scala:75) ~[temple.temple-1.0-sans-externalized.jar:na]
at writer.Writer$.writeDf(Writer.scala:33) ~[temple.temple-1.0-sans-externalized.jar:na]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:460) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at controllers.Api$$anonfun$downloadTable$1$$anonfun$apply$25.apply(Api.scala:452) ~[temple.temple-1.0-sans-externalized.jar:2.4.6]
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) ~[org.scala-lang.scala-library-2.11.7.jar:na]

// Workaround attempt: run the futures on a dedicated fixed-size thread pool
// instead of the default global ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
implicit val ec = ExecutionContext.fromExecutorService(executorService)
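
// Not part of the original paste: a sketch that re-runs the four writes on
// the dedicated pool above, blocks until all of them complete, and then
// releases the pool. SaveMode.Overwrite is an assumption here, since the
// temp tables may already exist from the earlier attempt.
import scala.concurrent.Await
import scala.concurrent.duration.Duration

val rerun = Future.sequence(Seq("temp1", "temp2", "temp3", "temp4").map { table =>
  Future { df.write.mode(SaveMode.Overwrite).jdbc(url, table, prop) }
})
Await.result(rerun, Duration.Inf)
executorService.shutdown()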