Advertisement
Guest User

Untitled

a guest
Jun 22nd, 2016
119
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.25 KB | None | 0 0
  1. import org.apache.flink.api.common.io.RichInputFormat
  2. import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
  3. import org.apache.flink.api.java.typeutils.ResultTypeQueryable
  4. import org.apache.flink.api.scala._
  5. import org.apache.flink.api.common.typeinfo.BasicTypeInfo
  6. import org.apache.flink.api.table.Row
  7. import org.apache.flink.api.table.typeutils.RowTypeInfo
  8. import org.apache.flink.core.io.InputSplit
  9.  
  10. /**
  11. * Created by ms on 21/06/16.
  12. */
  13. object JDBCIssue {
  14. val DB_DRIVER = "org.postgresql.Driver"
  15. val DB_URI = "jdbc:postgresql://localhost:5432/postgres"
  16. val DB_USER = "postgres"
  17. val DB_PASS = ""
  18. val DB_QUERY = "SELECT id FROM events LIMIT 10;"
  19. val DB_ROWTYPE = new RowTypeInfo(
  20. Seq(BasicTypeInfo.INT_TYPE_INFO),
  21. Seq("id"))
  22.  
  23. def jdbcIssue = JDBCInputFormat.buildJDBCInputFormat()
  24. .setDrivername(DB_DRIVER)
  25. .setDBUrl(DB_URI)
  26. .setUsername(DB_USER)
  27. .setPassword(DB_PASS)
  28. .setQuery(DB_QUERY)
  29. .setRowTypeInfo(DB_ROWTYPE)
  30. .finish()
  31.  
  32.  
  33. class MyRow extends Row(1)
  34.  
  35. def jdbcNoIssue =
  36. jdbcIssue
  37. .asInstanceOf[RichInputFormat[MyRow, InputSplit] with ResultTypeQueryable[Row]]
  38.  
  39. def main(args: Array[String]): Unit = {
  40. val env = ExecutionEnvironment.getExecutionEnvironment
  41. env.createInput(jdbcIssue).print()
  42. }
  43.  
  44. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement