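- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.time.Time;
- import org.joda.time.DateTime;
- import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
- public static void main(String[] args) throws Exception {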
- DataGetter dg = new DataGetter();
- final int maxEventDelay = 60; // events are out of order by max 60 seconds
- final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
- // set up streaming execution environment
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
- // start the data generator
- DataStream<DataContainer> datacontainers = env.addSource(
- new DataContainerSource(maxEventDelay, servingSpeedFactor));
- // print the filtered stream
- //filteredMeasurements.print();
- DataStream<DataContainer> average = datacontainers
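- // NOTE: keyBy partitions the stream, so keying by eventTime gives each distinct timestamp its own key and its own windows
- .keyBy("eventTime")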
- //.timeWindow(Time.minutes(1), Time.seconds(30)) // sliding time window of 1 minute length and 30 secs slide interval
- .timeWindow(Time.minutes(30)) // tumbling time window of 30 minutes, no slide
- .allowedLateness(Time.minutes(5))
- .apply(new GenericWindowFunction());
- average.print();
- env.execute("Executing");
- }
- __________________
- Adding items to prioQueue;
- QUEUE Adding DataContainer: 2016-10-14 09:00:00
- QUEUE Adding DataContainer: 2016-10-14 09:30:00
- QUEUE Adding DataContainer: 2016-10-14 16:00:00
- QUEUE Adding DataContainer: 2016-10-14 17:00:00
- QUEUE Adding DataContainer: 2016-10-14 17:30:00
- QUEUE Adding DataContainer: 2016-10-15 13:30:00
- QUEUE Adding DataContainer: 2016-10-15 14:00:00
- QUEUE Adding DataContainer: 2016-10-15 14:30:00
- Output from Flink:
- 8> measurements/-internal-measurements-20161019-0530-.txt Files: 1 1476855000000 printCount: 0
- 4> measurements/-internal-measurements-20161014-0900-.txt Files: 1 1476435600000 printCount: 1
- 6> measurements/-internal-measurements-20161015-1530-.txt Files: 1 1476545400000 printCount: 2
- 6> measurements/-internal-measurements-20161020-1500-.txt Files: 1 1476975600000 printCount: 3
- 6> measurements/-internal-measurements-20161020-1800-.txt Files: 1 1476986400000 printCount: 4
- 6> measurements/-internal-measurements-20161021-1330-.txt Files: 1 1477056600000 printCount: 5
- 6> measurements/-internal-measurements-20161022-0700-.txt Files: 1 1477119600000 printCount: 6
- 6> measurements/-internal-measurements-20161022-1500-.txt Files: 1 1477148400000 printCount: 7
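To help read the epoch-millisecond values printed above, here is a minimal sketch of how a timestamp maps to its 30-minute window. It assumes the windows are aligned to the epoch, as Flink's default tumbling event-time windows are. The class name `WindowStartSketch` and the method `windowStart` are hypothetical and not part of the paste, and the arithmetic is plain Java rather than a call into Flink.

```java
import java.time.Instant;

// Hypothetical helper, not part of the original job.
public class WindowStartSketch {

    // Start of the epoch-aligned tumbling window that contains timestampMillis.
    // Assumes a non-negative timestamp and a positive window size.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    public static void main(String[] args) {
        long windowSize = 30L * 60L * 1000L; // 30 minutes, as in timeWindow(Time.minutes(30))
        long eventTime = 1476435600000L;     // value printed next to the 20161014-0900 file
        long lateInWindow = eventTime + 7L * 60L * 1000L; // an event 7 minutes later

        long start = windowStart(lateInWindow, windowSize);
        long end = start + windowSize;       // end of the window, exclusive

        // Print both bounds as UTC instants for comparison with the file names.
        System.out.println("window start: " + Instant.ofEpochMilli(start));
        System.out.println("window end:   " + Instant.ofEpochMilli(end));
    }
}
```

Under that assumption, an event at 09:00 and one at 09:07 fall into the same window only if they also share the same key.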