Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Flink job: read GitHub events (one JSON object per line), and for PushEvents
// print, per repository name and per 1-day event-time window, the sum of the
// payload's "size" field.

// Set up the streaming execution environment. Event time is enabled so the daily
// windows below are driven by each event's `created_at` value rather than by the
// wall clock of the machine running the job.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Jackson parser captured by the parsing closure below.
val mapper = new ObjectMapper()

// Parsed event: (id, type, payload, repo, created_at, 1).
// The trailing constant 1 is not used by the pipeline below.
type GitEvent = (Int, String, JsonNode, JsonNode, String, Int)

// NOTE(review): hard-coded local path; consider passing it in as a program argument.
val inputPath = "C:/Users/Boris/Documents/flink-scala-project/git.gz"

// Event types retained after parsing; everything else is dropped early.
val trackedTypes = Set("PushEvent", "IssuesEvent", "PullRequestEvent")

// One JSON document per input line. A missing field will currently fail the job
// (NullPointerException), same as before.
val events = env.readTextFile(inputPath)
  .map { line =>
    val node = mapper.readTree(line)
    (
      node.get("id").asInt(),
      node.get("type").textValue(),
      node.get("payload"),
      node.get("repo"),
      node.get("created_at").textValue(),
      1
    )
  }
  .filter(event => trackedTypes.contains(event._2))

events
  .assignTimestampsAndWatermarks(
    // Watermarks trail the highest timestamp seen by 10 seconds, so events that
    // arrive up to 10 seconds out of order are still assigned to their window.
    new BoundedOutOfOrdernessTimestampExtractor[GitEvent](Time.seconds(10)) {
      // Flink event-time timestamps are epoch *milliseconds*; returning seconds
      // here would make every 1-day window span roughly 1000 real days.
      override def extractTimestamp(event: GitEvent): Long =
        Instant.parse(event._5).toEpochMilli
    })
  .filter(_._2 == "PushEvent")
  // (repo.name, payload.size). For PushEvents, "size" is presumably the number of
  // commits in the push (GitHub event schema) — confirm against the data.
  .map(event => (event._4.get("name").textValue(), event._3.get("size").asInt()))
  // Key by repository name using a key selector (positional keyBy(Int) is deprecated).
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.days(1)))
  .sum(1)
  .print()

// Execute the program; nothing above runs until this call.
env.execute("Flink Streaming Scala API Skeleton")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement