Untitled

a guest
Nov 20th, 2018
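// Stream the enriched payload to Kafka, then return the message to commit.
Source
  .single(enrichedJson)
  .map(_.toString)
  .map(value => new ProducerRecord[String, String](kafkaTopic, value))
  // Simulated failure inside the stream. The original `.map(throw new RuntimeException)`
  // evaluated the throw eagerly, while the stream was still being built, so the
  // exception escaped before anything ran and the handler below never saw it.
  .map[ProducerRecord[String, String]](_ => throw new RuntimeException("Simulated failure"))
  .watchTermination() { (_, done) =>
    // `done` completes when the stream terminates at this point;
    // onComplete needs an implicit ExecutionContext in scope.
    done.onComplete {
      case Success(_) => logger.debug("Payload successfully streamed to Kafka sink")
      case Failure(e) =>
        logger.error(s"Something went wrong while streaming payload to Kafka sink: ${e.toString}")
    }
  }
  .runWith(kafkaSink)
  .map(_ => committableMessage)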