Untitled

a guest
Nov 14th, 2018
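// Imports inferred from the identifiers used below (Akka Streams, Play JSON, java.time).
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.{RestartSource, Sink}
import play.api.libs.json.{JsObject, Json}
import java.time.{ZoneId, ZonedDateTime}
import scala.concurrent.duration._

// `source`, `sink`, `publishToKafka` and `logger` are defined elsewhere and are not part of this paste.
// An implicit Materializer and ExecutionContext must also be in scope.
RestartSource
  // Restart the wrapped source on failure or completion, backing off from 3s up to 30s,
  // with 20% random jitter, for at most 20 restarts.
  .withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 30.seconds,
    randomFactor = 0.2,
    maxRestarts = 20
  ) { () =>
    source
  }
  // Log every element, completion and failure at INFO level.
  .log("INCOMING")
  .withAttributes(Attributes
    .logLevels(onFinish = Logging.InfoLevel, onFailure = Logging.InfoLevel, onElement = Logging.InfoLevel))
  // Parse the message body as JSON, add a UTC timestamp, publish it, then pass the original message on.
  .mapAsync(1) { cm =>
    val json: JsObject = Json.parse(cm.message.bytes.utf8String).as[JsObject]
    logger.debug(s"Original payload=$json")
    val enrichedJson = json ++ Json.obj(
      "timestamp" -> ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).toString.replace("[UTC]", ""))
    logger.debug(s"Enriched payload=$enrichedJson")
    publishToKafka(sink, enrichedJson).map(_ => cm)
  }
  // Acknowledge the message only after the publish step has completed.
  .mapAsync(1) { _.ack() }
  .runWith(Sink.ignore)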
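
A note on the "timestamp" field: the expression in the paste relies on `ZonedDateTime.toString`, which drops the seconds when both seconds and nanoseconds are zero, so the string's shape can vary from message to message. The sketch below isolates that expression so its output can be checked, and shows `Instant.toString` as one possible alternative that always prints seconds. `TimestampSketch`, `pasteStyle` and `fixedSeconds` are hypothetical names used for illustration only; they are not part of the original snippet.

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}

object TimestampSketch {
  // Same formatting as the paste, but taking the instant as a parameter
  // (instead of calling `now`) so the result is reproducible.
  def pasteStyle(at: Instant): String =
    ZonedDateTime.ofInstant(at, ZoneId.of("UTC")).toString.replace("[UTC]", "")

  // Assumed alternative: ISO-8601 instant format, seconds always present.
  def fixedSeconds(at: Instant): String = at.toString

  def main(args: Array[String]): Unit = {
    val onTheMinute = Instant.parse("2018-11-14T10:15:00Z")
    println(pasteStyle(onTheMinute))   // 2018-11-14T10:15Z
    println(fixedSeconds(onTheMinute)) // 2018-11-14T10:15:00Z
  }
}
```

Whether the difference matters depends on how consumers of the Kafka topic parse the field; the two forms agree whenever the seconds or the fractional part are non-zero.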