Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// Stream pipeline: consume messages from `source`, enrich each JSON payload with a
// UTC timestamp, hand it to `publishToKafka`, and acknowledge the original message.
//
// `source` is wrapped in a RestartSource so that it is re-created with backoff
// (3s up to 30s, 20% jitter) at most 20 times. Only `source` is inside the
// restart wrapper; the stages below it are not.
RestartSource
  .withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2,
    maxRestarts = 20
  ) { () =>
    source
  }
  // Log every element, completion and failure under the name "INCOMING" at INFO level.
  .log("INCOMING")
  .withAttributes(
    Attributes.logLevels(
      onElement = Logging.InfoLevel,
      onFinish = Logging.InfoLevel,
      onFailure = Logging.InfoLevel))
  // Parallelism 1: one message is enriched/published at a time.
  .mapAsync(1) { cm =>
    // NOTE(review): Json.parse / .as[JsObject] throw on a malformed or non-object
    // payload; no recovery is defined here, so such a message is never acked —
    // confirm that failing the stream is the intended behaviour.
    val json: JsObject = Json.parse(cm.message.bytes.utf8String).as[JsObject]
    logger.debug(s"Original payload=$json")

    // Current instant rendered in UTC; the trailing "[UTC]" zone-id suffix that
    // ZonedDateTime.toString appends is stripped, leaving the "...Z" form.
    val timestamp = ZonedDateTime.now(ZoneId.of("UTC")).toString.replace("[UTC]", "")
    val enrichedJson = json ++ Json.obj("timestamp" -> timestamp)
    logger.debug(s"Enriched payload=$enrichedJson")

    // Pass the original message downstream once publishing has completed, so the
    // ack stage below only runs for messages whose publish future succeeded.
    publishToKafka(sink, enrichedJson).map(_ => cm)
  }
  // The original had a dangling "." before this stage, which did not parse.
  .mapAsync(1) { _.ack() }
  .runWith(Sink.ignore)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement