Advertisement
Guest User

Untitled

a guest
May 1st, 2017
127
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.54 KB | None | 0 0
  1. public static void main(String[] args) throws Exception {
  2. DataGetter dg = new DataGetter();
  3.  
  4. final int maxEventDelay = 60; // events are out of order by max 60 seconds
  5. final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
  6.  
  7. // set up streaming execution environment
  8. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  9. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  10.  
  11.  
  12. env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
  13.  
  14.  
  15. // start the data generator
  16. DataStream<DataContainer> datacontainers = env.addSource(
  17. new DataContainerSource(maxEventDelay, servingSpeedFactor));
  18.  
  19. // print the filtered stream
  20. //filteredMeasurements.print();
  21.  
  22.  
  23. DataStream<DataContainer> average = datacontainers
  24. .keyBy("eventTime")
  25. //.timeWindow(minutes(1), Time.seconds(30)) // sliding time window of 1 minute length and 30 secs trigger interval
  26. .timeWindow(minutes(30)) // exactly 1 minute no slider
  27. .allowedLateness(Time.minutes(5))
  28. .apply(new GenericWindowFunction());
  29.  
  30. average.print();
  31. env.execute("Executing");
  32.  
  33. __________________
  34. Adding items to prioQueue;
  35. QUEUE Adding DataContainer: 2016-10-14 09:00:00
  36. QUEUE Adding DataContainer: 2016-10-14 09:30:00
  37. QUEUE Adding DataContainer: 2016-10-14 16:00:00
  38. QUEUE Adding DataContainer: 2016-10-14 17:00:00
  39. QUEUE Adding DataContainer: 2016-10-14 17:30:00
  40. QUEUE Adding DataContainer: 2016-10-15 13:30:00
  41. QUEUE Adding DataContainer: 2016-10-15 14:00:00
  42. QUEUE Adding DataContainer: 2016-10-15 14:30:00
  43.  
  44. Output from Flink
  45.  
  46. 8> measurements/-internal-measurements-20161019-0530-.txt Files: 1 1476855000000 printCount: 0
  47. 4> measurements/-internal-measurements-20161014-0900-.txt Files: 1 1476435600000 printCount: 1
  48. 6> measurements/-internal-measurements-20161015-1530-.txt Files: 1 1476545400000 printCount: 2
  49. 6> measurements/-internal-measurements-20161020-1500-.txt Files: 1 1476975600000 printCount: 3
  50. 6> measurements/-internal-measurements-20161020-1800-.txt Files: 1 1476986400000 printCount: 4
  51. 6> measurements/-internal-measurements-20161021-1330-.txt Files: 1 1477056600000 printCount: 5
  52. 6> measurements/-internal-measurements-20161022-0700-.txt Files: 1 1477119600000 printCount: 6
  53. 6> measurements/-internal-measurements-20161022-1500-.txt Files: 1 1477148400000 printCount: 7
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement