Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.flink.api.common.ExecutionConfig;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.tuple.Tuple;
- import org.apache.flink.api.java.tuple.Tuple1;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.source.SourceFunction;
- import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
- import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
- import org.apache.flink.streaming.api.windowing.triggers.Trigger;
- import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
- import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
- import org.apache.flink.streaming.api.windowing.windows.Window;
- import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
- import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
- import org.apache.flink.util.Collector;
- import org.apache.log4j.ConsoleAppender;
- import java.util.Properties;
- import java.util.Random;
- public abstract class StreamSkeleton {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- DataStream<Tuple1<Integer>> stream = env.addSource(new SourceFunction<Tuple1<Integer>>() {
- private volatile boolean running = true;
- private Random rand = new Random();
- @Override
- public void run(SourceContext <Tuple1<Integer>> ctx) throws Exception {
- while(running) {
- int i = rand.nextInt(100);
- ctx.collect(new Tuple1<>(i));
- }
- }
- @Override
- public void cancel() {
- running = false;
- }
- });
- stream.map(new MapFunction<Tuple1<Integer>, Tuple1<Integer>>() {
- @Override
- public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
- return new Tuple1<Integer>(1);
- }
- }).keyBy(0).sum(0).print();
- stream.keyBy(0)
- .window(GlobalWindows.create())
- .trigger(new Trigger<Tuple1<Integer>, GlobalWindow>() {
- long oldTimestamp = 0;
- @Override
- public TriggerResult onElement(Tuple1<Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
- long newTimestamp = System.currentTimeMillis() + 1000;
- ctx.deleteProcessingTimeTimer(oldTimestamp);
- ctx.registerProcessingTimeTimer(newTimestamp);
- oldTimestamp = newTimestamp;
- return TriggerResult.CONTINUE;
- }
- @Override
- public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
- return TriggerResult.FIRE;
- }
- @Override
- public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
- return null;
- }
- })
- .sum(0);
- .print();
- env.execute(); //Start processing
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement