Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class MyFlinkJDBCJob {
- public static void main(String[] args) throws Exception {
- FlinkKafkaConsumer<JsonObject> flinkKafkaConsumer = KafkaConsumerFactory.createVertxJsonConsumer(kafkaTopic, kafkaBoostrapServers, kafkaConsumerGroup);
- flinkKafkaConsumer.setStartFromLatest();
- JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
- .setDrivername("...")
- .setDBUrl("...")
- .setUsername("...")
- .setPassword("...")
- .setQuery("INSERT INTO flink_test (foo, bar, baz) VALUES (?, ?, ?)")
- .setParameterTypes(STRING_TYPE_INFO,STRING_TYPE_INFO,STRING_TYPE_INFO)
- .build();
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRestartStrategy(RestartStrategies.fixedDelayRestart(jobRestartAttempts, Time.of(jobRestartDelay, TimeUnit.valueOf(jobRestartTimeUnit))));
- final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
- tableEnv.registerTableSink(
- "FlinkTest",
- // specify table schema
- new String[]{"foo", "bar", "baz"},
- new TypeInformation[]{Types.STRING, Types.STRING, Types.STRING},
- sink);
- DataStream<MobileOriginated> moStream = env.addSource(flinkKafkaConsumer)
- .setParallelism(kafkaParallelism)
- .name(kafkaTopic)
- .flatMap(new MapToMyPojo());
- Table table = tableEnv.fromDataStream(moStream);
- // This doesn't seem to work
- table.insertInto("FlinkTest");
- DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
- // This prints out ok.
- dsRow.print("row");
- env.execute(jobName);
- }
- public static class MapToMyPojo extends RichFlatMapFunction<JsonObject, MobileOriginated> {
- private static Logger logger = LoggerFactory.getLogger(MapToMobileOriginated.class);
- @Override
- public void flatMap(JsonObject value, Collector<MobileOriginated> out) {
- try {
- JsonObject record = value.getJsonObject("record");
- MapToMyPojo myPojo = new MapToMyPojo();
- mo.setFoo(value.getString("foo"));
- mo.setBar(value.getString("bar"));
- mo.setBaz(record.getString("baz"));
- out.collect(myPojo);
- } catch (Exception ex) {
- // If we are here it's because the Json is bad.
- logger.warn("Invalid Json record. Skipping it.", ex);
- }
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement