Advertisement
Guest User

Untitled

a guest
Apr 28th, 2016
51
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.50 KB | None | 0 0
  1.  
  2. import org.apache.flink.api.common.ExecutionConfig;
  3. import org.apache.flink.api.common.functions.MapFunction;
  4. import org.apache.flink.api.java.tuple.Tuple;
  5. import org.apache.flink.api.java.tuple.Tuple1;
  6. import org.apache.flink.streaming.api.TimeCharacteristic;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  10. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  11. import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
  12. import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
  13. import org.apache.flink.streaming.api.windowing.triggers.Trigger;
  14. import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
  15. import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
  16. import org.apache.flink.streaming.api.windowing.windows.Window;
  17. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
  18. import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
  19. import org.apache.flink.util.Collector;
  20. import org.apache.log4j.ConsoleAppender;
  21.  
  22. import java.util.Properties;
  23. import java.util.Random;
  24.  
  25. public abstract class StreamSkeleton {
  26.  
  27.  
  28. public static void main(String[] args) throws Exception {
  29.  
  30. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  31.  
  32. DataStream<Tuple1<Integer>> stream = env.addSource(new SourceFunction<Tuple1<Integer>>() {
  33. private volatile boolean running = true;
  34.  
  35. private Random rand = new Random();
  36.  
  37. @Override
  38. public void run(SourceContext <Tuple1<Integer>> ctx) throws Exception {
  39. while(running) {
  40. int i = rand.nextInt(100);
  41. ctx.collect(new Tuple1<>(i));
  42. }
  43. }
  44.  
  45. @Override
  46. public void cancel() {
  47. running = false;
  48. }
  49. });
  50.  
  51. stream.map(new MapFunction<Tuple1<Integer>, Tuple1<Integer>>() {
  52. @Override
  53. public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
  54. return new Tuple1<Integer>(1);
  55. }
  56. }).keyBy(0).sum(0).print();
  57.  
  58. stream.keyBy(0)
  59. .window(GlobalWindows.create())
  60. .trigger(new Trigger<Tuple1<Integer>, GlobalWindow>() {
  61.  
  62. long oldTimestamp = 0;
  63.  
  64. @Override
  65. public TriggerResult onElement(Tuple1<Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
  66. long newTimestamp = System.currentTimeMillis() + 1000;
  67. ctx.deleteProcessingTimeTimer(oldTimestamp);
  68. ctx.registerProcessingTimeTimer(newTimestamp);
  69. oldTimestamp = newTimestamp;
  70. return TriggerResult.CONTINUE;
  71. }
  72.  
  73. @Override
  74. public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
  75. return TriggerResult.FIRE;
  76. }
  77.  
  78. @Override
  79. public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
  80. return null;
  81. }
  82. })
  83. .sum(0);
  84. .print();
  85. env.execute(); //Start processing
  86. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement