Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
- import org.apache.flink.util.Collector
- import scala.collection.JavaConversions._
/**
 * Scales each element of the data stream by the most recently broadcast factor.
 * Until the first broadcast element arrives, data elements pass through unchanged.
 */
class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] {

  /**
   * Emits `value * factor` where `factor` is the last broadcast element,
   * or `value` itself when no broadcast element has been seen yet.
   */
  override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]): Unit = {
    val state = ctx.getBroadcastState(State.mapState)
    // Only the single key "key" is ever stored (see processBroadcastElement),
    // so look it up directly instead of iterating immutableEntries() and taking
    // .last — the old iteration also depended on the deprecated
    // scala.collection.JavaConversions implicits to treat the Java iterable as Scala.
    if (state.contains("key")) {
      out.collect(state.get("key") * value)
    } else {
      out.collect(value)
    }
  }

  /** Replaces the stored factor with the newest broadcast element. */
  override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]): Unit = {
    val state = ctx.getBroadcastState(State.mapState)
    state.clear()           // keep only the most recent broadcast value
    state.put("key", value)
  }
}
- import org.apache.flink.api.common.state.MapStateDescriptor
- import org.apache.flink.api.scala._
- object State {
- val mapState: MapStateDescriptor[String, Int] =
- new MapStateDescriptor(
- "State",
- createTypeInformation[String],
- createTypeInformation[Int]
- )
- }
- import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
- import org.apache.flink.api.scala._
- object Broadcast {
- def main(args: Array[String]): Unit = {
- val numberElements = 100
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- val broadcastStream = env.fromElements(2).broadcast(State.mapState)
- val input = (1 to numberElements).toList
- val inputStream = env.fromCollection(input)
- val outputStream = inputStream
- .connect(broadcastStream)
- .process(new BroadcastProcess())
- outputStream.print()
- env.execute()
- }
- }
Add Comment
Please, Sign In to add comment