SHARE
TWEET

Untitled

a guest Jun 17th, 2019 87 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  // Sends every record of each RDD in the stream to an external system,
  // borrowing one pooled connection per partition instead of one per record.
  dstream.foreachRDD { rdd =>
    rdd.foreachPartition { partitionOfRecords =>
      // ConnectionPool is a static, lazily initialized pool of connections
      val connection = ConnectionPool.getConnection()
      try {
        partitionOfRecords.foreach(record => connection.send(record))
      } finally {
        // Return to the pool for future reuse even when a send throws; without
        // the finally, a failed partition would never hand its connection back.
        ConnectionPool.returnConnection(connection)
      }
    }
  }
  9.      
  /**
   * Serializes every event in `rdd` to CSV and loads the result into `myTable`
   * with a single `copyIn` call.
   *
   * NOTE(review): `rdd.collect()` materializes the whole RDD in this process and
   * the CSV text is buffered in one in-memory `StringBuilder`; confirm the data
   * is small enough for that, or move the write into `foreachPartition`.
   *
   * @param rdd groups of events; each event writes its own CSV row into the
   *            shared buffer through `createCSV`
   */
  def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
    val sb = new mutable.StringBuilder
    val now = System.currentTimeMillis()

    for {
      events <- rdd.collect()
      event <- events
    } {
      // Terminate each row with a real newline. The previous literal "n" glued
      // all rows together on one line, which COPY ... WITH CSV cannot split.
      event.createCSV(sb, now).append("\n")
    }

    // NOTE(review): "statement" is passed as the column clause of the COPY
    // command; it looks like a placeholder - confirm the intended column list.
    copyIn("myTable", new StringReader(sb.toString), "statement")
  }
  21.  
  22.  
  /**
   * Streams CSV data from `reader` into `tableName` using PostgreSQL's
   * `COPY ... FROM STDIN WITH CSV`.
   *
   * Non-fatal failures are logged as a warning and not rethrown, so callers
   * cannot tell from an exception whether the load succeeded. Fatal JVM errors
   * (for example `OutOfMemoryError`) are no longer swallowed and propagate.
   *
   * `tableName` and `columnStmt` are interpolated into the SQL text without
   * escaping; pass only trusted, program-defined values.
   *
   * @param tableName  target table name, inserted verbatim after `COPY`
   * @param reader     source of the CSV text; it is not closed by this method
   * @param columnStmt text inserted between the table name and `FROM`
   *                   (presumably a parenthesized column list - confirm);
   *                   defaults to the empty string
   * @return whatever the driver's `copyIn` returns on success, or the result of
   *         `logWarning` when a non-fatal error was caught
   */
  def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
    import scala.util.control.NonFatal

    val conn = connectionPool.getConnection()
    try {
      conn
        .unwrap(classOf[PGConnection])
        .getCopyAPI
        .copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
    } catch {
      // Covers SQLException as well; both former cases logged the same message.
      case NonFatal(e) => logWarning(e.getMessage)
    } finally {
      conn.close()
    }
  }
  34.      
  # Write jdbcDF2 to the "schema.tablename" table through the JDBC data source.
  # NOTE(review): the user/password values look like placeholders - confirm.
  connection_properties = {"user": "username", "password": "password"}
  jdbcDF2.write.jdbc(
      "jdbc:postgresql:dbserver",
      "schema.tablename",
      properties=connection_properties,
  )
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top