Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- final DataStream<ObjectNode> inputStream = env
- .addSource(new RMQSource<ObjectNode>(
- connectionConfig, // config for the RabbitMQ connection
- "start", // name of the RabbitMQ queue to consume
- true, // use correlation ids; can be false if only at-least-once is required
- new JSONDeserializationSchema())) // deserialization schema to turn messages into Java objects
- .setParallelism(1); // non-parallel source is only required for exactly-once
- stream.addSink(new RMQSink<ObjectNode>(
- connectionConfig,
- "stop",
- new JSONSerializationSchema()
- ));
Add Comment
Please, Sign In to add comment