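import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import scala.util.{Failure, Success}

// Assumes an implicit Materializer (or ActorSystem) and ExecutionContext are in scope.
Source
  .single(enrichedJson)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](kafkaTopic, value))
  // Deliberate failure for testing. It must be a function: a bare `throw` passed as the
  // argument is evaluated while the stream is being built, so it never reaches watchTermination.
  .map[ProducerRecord[String, String]](_ => throw new RuntimeException("Simulated failure"))
  .watchTermination() { (_, done) =>
    done.onComplete {
      case Success(_) => logger.debug("Payload successfully streamed to Kafka sink")
      case Failure(e) =>
        logger.error(s"Something went wrong while streaming payload to Kafka sink: ${e.toString}")
    }
  }
  .runWith(kafkaSink)
  .map(_ => committableMessage)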