Guest User

Untitled

a guest
Jun 25th, 2018
71
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.75 KB | None | 0 0
  1. import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
  2. import org.apache.flink.util.Collector
  3. import scala.collection.JavaConversions._
  4.  
  5. class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {
  6. override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = {
  7. val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
  8. if (currentBroadcastState.isEmpty) {
  9. out.collect(value)
  10. } else {
  11. out.collect(currentBroadcastState.last.getValue * value)
  12. }
  13. }
  14.  
  15. override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = {
  16. // Keep only last state
  17. ctx.getBroadcastState(State.mapState).clear()
  18. // Add state
  19. ctx.getBroadcastState(State.mapState).put("key", value)
  20. }
  21. }
  22.  
  23. import org.apache.flink.api.common.state.MapStateDescriptor
  24. import org.apache.flink.api.scala._
  25.  
  26. object State {
  27. val mapState: MapStateDescriptor[String, Int] =
  28. new MapStateDescriptor(
  29. "State",
  30. createTypeInformation[String],
  31. createTypeInformation[Int]
  32. )
  33. }
  34.  
  35. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  36. import org.apache.flink.api.scala._
  37.  
  38. object Broadcast {
  39. def main(args: Array[String]): Unit = {
  40. val numberElements = 100
  41. val env = StreamExecutionEnvironment.getExecutionEnvironment
  42. env.setParallelism(1)
  43. val broadcastStream = env.fromElements(2).broadcast(State.mapState)
  44. val input = (1 to numberElements).toList
  45. val inputStream = env.fromCollection(input)
  46. val outputStream = inputStream
  47. .connect(broadcastStream)
  48. .process(new BroadcastProcess())
  49. outputStream.print()
  50. env.execute()
  51. }
  52. }
Add Comment
Please, Sign In to add comment