Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- ## Test Case
- @Test
- def userSource(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val userEvents:DataStream[UserEvent] = env.fromCollection(Seq(
- UserEvent(id = "customer-1", lat = 1.0 , lng = 0.0 , ts = "2018-10-16T00:00:00.01Z"),
- UserEvent(id = "customer-1", lat = 2.0 , lng = 0.0 , ts = "2018-10-16T03:00:00.01Z"),
- UserEvent(id = "customer-1", lat = 3.0 , lng = 0.0, ts = "2018-10-16T04:00:00.01Z"),
- UserEvent(id = "customer-1", lat = 4.0 , lng = 0.0 , ts = "2018-10-16T06:00:00.01Z")
- )).assignTimestampsAndWatermarks(EventMessage.TimestampExtractor)
- val activeUsers = userEvents.keyBy(_.id).process(new TimeTriggerProc())
- activeUsers.print()
- env.execute()
- val output = Seq(("customer-1", "2018-09-11T00:00:00.01Z"), ("customer-1", "2018-09-11T12:00:00.01Z"))
- }
- ## Process Function
- import org.apache.flink.streaming.api.functions.KeyedProcessFunction
- import org.apache.flink.streaming.api.functions.ProcessFunction._
- import org.apache.flink.util.Collector
- import com.zoi.iot.models.{SchedulerEvent, UserEvent}
- import org.apache.flink.api.common.state.ValueStateDescriptor
- import org.apache.flink.api.common.time.Time
- import java.time.Instant
- case class UserWithTimestamp(lastModified: Long, userMeta: UserEvent)
- class TimeTriggerProc extends KeyedProcessFunction[String, UserEvent, (String, UserEvent)] {
- private lazy val usersState = getRuntimeContext.getState(new ValueStateDescriptor("usersState", classOf[UserWithTimestamp]))
- private lazy val rollingThresholdTime = getRuntimeContext.getState(new ValueStateDescriptor("thresholdTime", classOf[Long]))
- private val fourHours = Time.hours(4).toMilliseconds
- def updateState(value: UserEvent, eventTime: Long) = {
- if (usersState.value != null) {
- }
- else{
- usersState.update(UserWithTimestamp(eventTime, value))
- rollingThresholdTime.update(eventTime + fourHours)
- }
- }
- override def processElement(value: UserEvent, ctx: KeyedProcessFunction[String, UserEvent, (String, UserEvent)]#Context,
- out: Collector[(String, UserEvent)]): Unit = {
- updateState(value, ctx.timestamp)
- val eventTime = ctx.timestamp
- if(eventTime < rollingThresholdTime.value){
- usersState.update(UserWithTimestamp(eventTime, value))
- }else{
- rollingThresholdTime.update(rollingThresholdTime.value + fourHours)
- }
- ctx.timerService.registerEventTimeTimer(rollingThresholdTime.value)
- }
- override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, UserEvent, (String, UserEvent)]#OnTimerContext,
- out: Collector[(String, UserEvent)]): Unit = {
- out.collect((timestamp.toString, usersState.value.userMeta) )
- ctx.timerService.registerEventTimeTimer(timestamp + fourHours)
- }
- }
Add Comment
Please, Sign In to add comment