Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- public class Example {
- public static void main(String... args){
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStreamSource<Tuple2> source = env.addSource(new SourceFunction<Tuple2>() {
- private static final long serialVersionUID = 1L;
- private volatile boolean running = true;
- @Override
- public void run(SourceContext<Tuple2> ctx) throws Exception {
- for (int i = 0; i < 20 && running; i++) {
- Tuple2 t = new Tuple2<>();
- t.f0 = i;
- t.f1 = "message #" + i;
- ctx.collect(t);
- }
- }
- @Override
- public void cancel() {
- running = false;
- }
- });
- JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
- source.addSink(new RedisSink1<>(builder.build(), new RedisSinkMapper<Tuple2>() {
- @Override
- public RedisDataTypeDescription getDataTypeDescription() {
- return new RedisDataTypeDescriptionSink(RedisDataType.HASH, "database");
- }
- @Override
- public String getKeyFromTuple(Tuple2 tuple) {
- return String.valueOf(tuple.f0);
- }
- @Override
- public String getValueFromTuple(Tuple2 tuple) {
- return String.valueOf(tuple.f1);
- }
- }));
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement