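final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);

// Expects --topic plus the Kafka consumer properties (e.g. --bootstrap.servers, --group.id) as program arguments.
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("topic"),
        new SimpleStringSchema(),
        parameterTool.getProperties()));

messageStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // outputBuilder was not declared in the paste; assumed here to be a StringBuilder local to map().
                StringBuilder outputBuilder = new StringBuilder();
                //--- do something --//
                return "KafkaJob output: " + outputBuilder.toString();
            }
        })
        .rebalance().print();

// The pipeline is only built above; nothing runs until execute() is called. The job name is an assumption.
env.execute("KafkaJob");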
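The body of map() above can be hard to test while it lives inside an anonymous MapFunction. One possible approach, sketched here with plain Java only, is to move the per-record logic into a static helper. The class name KafkaJobFormatter and the trimming step are hypothetical: the trimming merely stands in for the elided "do something" part, which the paste does not show.

```java
// Hypothetical helper, not part of the original paste: holds the per-record
// logic as a plain static method so it can be exercised without Flink or Kafka.
final class KafkaJobFormatter {
    // Prefix taken from the return statement in the pasted map() body.
    static final String PREFIX = "KafkaJob output: ";

    private KafkaJobFormatter() {
        // Utility class, not meant to be instantiated.
    }

    // Builds the output line for one Kafka record.
    // Assumption: trimming stands in for the elided "do something" step.
    static String format(String value) {
        StringBuilder outputBuilder = new StringBuilder();
        if (value != null) {
            outputBuilder.append(value.trim());
        }
        return PREFIX + outputBuilder.toString();
    }
}
```

With such a helper in place, the map() body could reduce to `return KafkaJobFormatter.format(value);`, and the formatting can be checked in an ordinary unit test.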