Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.flink.api.common.io.RichInputFormat
- import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
- import org.apache.flink.api.java.typeutils.ResultTypeQueryable
- import org.apache.flink.api.scala._
- import org.apache.flink.api.common.typeinfo.BasicTypeInfo
- import org.apache.flink.api.table.Row
- import org.apache.flink.api.table.typeutils.RowTypeInfo
- import org.apache.flink.core.io.InputSplit
- /**
- * Created by ms on 21/06/16.
- */
- object JDBCIssue {
- val DB_DRIVER = "org.postgresql.Driver"
- val DB_URI = "jdbc:postgresql://localhost:5432/postgres"
- val DB_USER = "postgres"
- val DB_PASS = ""
- val DB_QUERY = "SELECT id FROM events LIMIT 10;"
- val DB_ROWTYPE = new RowTypeInfo(
- Seq(BasicTypeInfo.INT_TYPE_INFO),
- Seq("id"))
- def jdbcIssue = JDBCInputFormat.buildJDBCInputFormat()
- .setDrivername(DB_DRIVER)
- .setDBUrl(DB_URI)
- .setUsername(DB_USER)
- .setPassword(DB_PASS)
- .setQuery(DB_QUERY)
- .setRowTypeInfo(DB_ROWTYPE)
- .finish()
- class MyRow extends Row(1)
- def jdbcNoIssue =
- jdbcIssue
- .asInstanceOf[RichInputFormat[MyRow, InputSplit] with ResultTypeQueryable[Row]]
- def main(args: Array[String]): Unit = {
- val env = ExecutionEnvironment.getExecutionEnvironment
- env.createInput(jdbcIssue).print()
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement