Guest User

Untitled

a guest
Oct 16th, 2018
101
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.78 KB | None | 0 0
  1. ## Test Case
  2.  
  3. @Test
  4. def userSource(): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6.  
  7. val userEvents:DataStream[UserEvent] = env.fromCollection(Seq(
  8. UserEvent(id = "customer-1", lat = 1.0 , lng = 0.0 , ts = "2018-10-16T00:00:00.01Z"),
  9. UserEvent(id = "customer-1", lat = 2.0 , lng = 0.0 , ts = "2018-10-16T03:00:00.01Z"),
  10. UserEvent(id = "customer-1", lat = 3.0 , lng = 0.0, ts = "2018-10-16T04:00:00.01Z"),
  11. UserEvent(id = "customer-1", lat = 4.0 , lng = 0.0 , ts = "2018-10-16T06:00:00.01Z")
  12. )).assignTimestampsAndWatermarks(EventMessage.TimestampExtractor)
  13.  
  14. val activeUsers = userEvents.keyBy(_.id).process(new TimeTriggerProc())
  15.  
  16. activeUsers.print()
  17. env.execute()
  18.  
  19. val output = Seq(("customer-1", "2018-09-11T00:00:00.01Z"), ("customer-1", "2018-09-11T12:00:00.01Z"))
  20.  
  21. }
  22.  
  23.  
  24. ## Process Function
  25.  
  26. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  27. import org.apache.flink.streaming.api.functions.ProcessFunction._
  28. import org.apache.flink.util.Collector
  29. import com.zoi.iot.models.{SchedulerEvent, UserEvent}
  30. import org.apache.flink.api.common.state.ValueStateDescriptor
  31. import org.apache.flink.api.common.time.Time
  32. import java.time.Instant
  33.  
  34.  
  35. case class UserWithTimestamp(lastModified: Long, userMeta: UserEvent)
  36. class TimeTriggerProc extends KeyedProcessFunction[String, UserEvent, (String, UserEvent)] {
  37.  
  38. private lazy val usersState = getRuntimeContext.getState(new ValueStateDescriptor("usersState", classOf[UserWithTimestamp]))
  39. private lazy val rollingThresholdTime = getRuntimeContext.getState(new ValueStateDescriptor("thresholdTime", classOf[Long]))
  40. private val fourHours = Time.hours(4).toMilliseconds
  41.  
  42.  
  43. def updateState(value: UserEvent, eventTime: Long) = {
  44. if (usersState.value != null) {
  45.  
  46. }
  47. else{
  48. usersState.update(UserWithTimestamp(eventTime, value))
  49. rollingThresholdTime.update(eventTime + fourHours)
  50. }
  51.  
  52. }
  53.  
  54.  
  55. override def processElement(value: UserEvent, ctx: KeyedProcessFunction[String, UserEvent, (String, UserEvent)]#Context,
  56. out: Collector[(String, UserEvent)]): Unit = {
  57.  
  58. updateState(value, ctx.timestamp)
  59. val eventTime = ctx.timestamp
  60.  
  61. if(eventTime < rollingThresholdTime.value){
  62. usersState.update(UserWithTimestamp(eventTime, value))
  63. }else{
  64. rollingThresholdTime.update(rollingThresholdTime.value + fourHours)
  65. }
  66.  
  67. ctx.timerService.registerEventTimeTimer(rollingThresholdTime.value)
  68. }
  69.  
  70. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, UserEvent, (String, UserEvent)]#OnTimerContext,
  71. out: Collector[(String, UserEvent)]): Unit = {
  72.  
  73. out.collect((timestamp.toString, usersState.value.userMeta) )
  74. ctx.timerService.registerEventTimeTimer(timestamp + fourHours)
  75. }
  76.  
  77. }
Add Comment
Please, Sign In to add comment