Advertisement
Guest User

Apache Flink JDBC Sink

a guest
Oct 15th, 2019
193
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 2.65 KB | None | 0 0
  1. public class MyFlinkJDBCJob {
  2.  
  3.     public static void main(String[] args) throws Exception {
  4.         FlinkKafkaConsumer<JsonObject> flinkKafkaConsumer = KafkaConsumerFactory.createVertxJsonConsumer(kafkaTopic, kafkaBoostrapServers, kafkaConsumerGroup);
  5.         flinkKafkaConsumer.setStartFromLatest();
  6.  
  7.         JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
  8.                 .setDrivername("...")
  9.                 .setDBUrl("...")
  10.                 .setUsername("...")
  11.                 .setPassword("...")
  12.                 .setQuery("INSERT INTO flink_test (foo, bar, baz) VALUES (?, ?, ?)")
  13.                 .setParameterTypes(STRING_TYPE_INFO,STRING_TYPE_INFO,STRING_TYPE_INFO)
  14.                 .build();
  15.  
  16.         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  17.         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(jobRestartAttempts, Time.of(jobRestartDelay, TimeUnit.valueOf(jobRestartTimeUnit))));
  18.  
  19.         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  20.         tableEnv.registerTableSink(
  21.                 "FlinkTest",
  22.                 // specify table schema
  23.                 new String[]{"foo", "bar", "baz"},
  24.                 new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING},
  25.                 sink);
  26.  
  27.         DataStream<MobileOriginated> moStream = env.addSource(flinkKafkaConsumer)
  28.                 .setParallelism(kafkaParallelism)
  29.                 .name(kafkaTopic)
  30.                 .flatMap(new MapToMyPojo());
  31.  
  32.         Table table = tableEnv.fromDataStream(moStream);
  33.         // This doesn't seem to work
  34.         table.insertInto("FlinkTest");
  35.  
  36.         DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
  37.         // This prints out ok.
  38.         dsRow.print("row");
  39.  
  40.         env.execute(jobName);
  41.     }
  42.  
  43.     public static class MapToMyPojo extends RichFlatMapFunction<JsonObject, MobileOriginated> {
  44.         private static Logger logger = LoggerFactory.getLogger(MapToMobileOriginated.class);
  45.  
  46.         @Override
  47.         public void flatMap(JsonObject value, Collector<MobileOriginated> out) {
  48.             try {
  49.                 JsonObject record = value.getJsonObject("record");
  50.  
  51.                 MapToMyPojo myPojo = new MapToMyPojo();
  52.                 mo.setFoo(value.getString("foo"));
  53.                 mo.setBar(value.getString("bar"));
  54.                 mo.setBaz(record.getString("baz"));
  55.  
  56.                 out.collect(myPojo);
  57.             } catch (Exception ex) {
  58.                 // If we are here it's because the Json is bad.
  59.                 logger.warn("Invalid Json record. Skipping it.", ex);
  60.             }
  61.         }
  62.     }
  63. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement