Advertisement
Guest User

Untitled

a guest
Feb 8th, 2016
49
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.14 KB | None | 0 0
  1. public class Example {
  2.  
  3. public static void main(String... args){
  4. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  5.  
  6. DataStreamSource<Tuple2> source = env.addSource(new SourceFunction<Tuple2>() {
  7. private static final long serialVersionUID = 1L;
  8.  
  9. private volatile boolean running = true;
  10.  
  11. @Override
  12. public void run(SourceContext<Tuple2> ctx) throws Exception {
  13. for (int i = 0; i < 20 && running; i++) {
  14. Tuple2 t = new Tuple2<>();
  15. t.f0 = i;
  16. t.f1 = "message #" + i;
  17. ctx.collect(t);
  18. }
  19. }
  20.  
  21. @Override
  22. public void cancel() {
  23. running = false;
  24. }
  25. });
  26. JedisPoolConfig.Builder builder = new JedisPoolConfig.Builder();
  27. source.addSink(new RedisSink1<>(builder.build(), new RedisSinkMapper<Tuple2>() {
  28. @Override
  29. public RedisDataTypeDescription getDataTypeDescription() {
  30. return new RedisDataTypeDescriptionSink(RedisDataType.HASH, "database");
  31. }
  32.  
  33. @Override
  34. public String getKeyFromTuple(Tuple2 tuple) {
  35. return String.valueOf(tuple.f0);
  36. }
  37.  
  38. @Override
  39. public String getValueFromTuple(Tuple2 tuple) {
  40. return String.valueOf(tuple.f1);
  41. }
  42. }));
  43. }
  44. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement