Advertisement
Guest User

Untitled

a guest
Jan 16th, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.25 KB | None | 0 0
  // Set up the streaming execution environment and switch it to event time,
  // so that windows are driven by the timestamps extracted from the records.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  val mapper = new ObjectMapper()
  // NOTE(review): Count is not referenced anywhere in the visible snippet;
  // kept in case code outside this view relies on it.
  case class Count (id: Int, count: Int)

  // Parse every input line as JSON into the tuple
  // (id, type, payload, repo, created_at, 1) and keep only the three event
  // types of interest.
  val file = env.readTextFile("C:/Users/Boris/Documents/flink-scala-project/git.gz").map(x => {
    val reader = mapper.readTree(x)
    Tuple6(
      reader.get("id").asInt(),
      reader.get("type").textValue(),
      reader.get("payload"),
      reader.get("repo"),
      reader.get("created_at").textValue(),
      1
    )
  }).filter(x => {
    x._2 == "PushEvent" || x._2 == "IssuesEvent" || x._2 == "PullRequestEvent"
  })

  // For PushEvents only: key by the repo "name", sum the payload "size" over
  // tumbling one-day event-time windows, and print each window result.
  file.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(Int, String, JsonNode, JsonNode, String, Int)](Time.seconds(10)) {
    // Flink expects event timestamps in milliseconds since the epoch.
    // Returning epoch seconds (as before) shrinks every event time by a
    // factor of 1000, so the one-day windows and the 10-second
    // out-of-orderness bound no longer line up with real time.
    override def extractTimestamp(t: (Int, String, JsonNode, JsonNode, String, Int)): Long = {
      Instant.parse(t._5).toEpochMilli
    }
  }).filter(x => x._2 == "PushEvent")
    .map(x => (x._4.get("name").textValue(), x._3.get("size").asInt()))
    .keyBy(0)
    .window(TumblingEventTimeWindows.of(Time.days(1)))
    .sum(1)
    .print()

  // execute program
  env.execute("Flink Streaming Scala API Skeleton")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement