Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package com.alefeducation.dataconnector
- import akka.Done
- import akka.actor.ActorSystem
- import akka.kafka.ProducerSettings
- import akka.kafka.scaladsl.Producer
- import akka.stream.ActorMaterializer
- import akka.stream.alpakka.amqp.{
- AmqpDetailsConnectionProvider,
- AmqpSinkSettings,
- NamedQueueSourceSettings,
- QueueDeclaration
- }
- import akka.stream.alpakka.amqp.scaladsl.{AmqpSink, AmqpSource}
- import akka.stream.scaladsl.{RestartSink, RestartSource, Sink, Source}
- import akka.http.scaladsl.server.Directives._
- import akka.http.scaladsl.Http
- import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
- import akka.util.ByteString
- import com.typesafe.scalalogging.StrictLogging
- import org.apache.kafka.clients.producer.ProducerRecord
- import org.apache.kafka.common.serialization.StringSerializer
- import play.api.libs.json.{JsObject, Json}
- import java.time._
- import scala.concurrent.{ExecutionContextExecutor, Future}
- import scala.concurrent.duration._
/**
 * Entry point of the Alef data connector.
 *
 * Two things are started by [[Application.main]]:
 *
 *  - an HTTP endpoint (`POST /api/statements`) that publishes the raw request
 *    body to the AMQP queue named after the service;
 *  - a consumer stream that reads that queue, enriches every JSON statement
 *    (see `enrichStatement`), writes it to the Kafka topic named after the
 *    service and acknowledges the AMQP message afterwards.
 */
object Application extends StrictLogging {

  // Imported locally so that this object does not rely on additions to the
  // file-level import list.
  import akka.kafka.ProducerMessage
  import scala.util.{Failure, Success, Try}

  private val SERVICE_NAME = "alef-data-connector-akka-streams"

  /** Key of the Alef extension object inside `context.extensions`. */
  private val AlefExtensionKey = "http://alefeducation.com"

  private implicit val system: ActorSystem = ActorSystem(SERVICE_NAME)
  private implicit val ec: ExecutionContextExecutor = system.dispatcher
  private implicit val materializer: ActorMaterializer = ActorMaterializer()

  /**
   * Starts the AMQP -> Kafka consumer stream and binds the HTTP endpoint.
   *
   * The method returns as soon as both have been started; their outcome is
   * reported through the logger.
   *
   * @param args command line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Connection-level recovery is switched off, see
    // https://github.com/akka/alpakka/issues/1270
    // NOTE(review): presumably reconnection is meant to be handled by the
    // RestartSource below instead — confirm against the linked issue.
    val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)
      .withAutomaticRecoveryEnabled(false)
      .withTopologyRecoveryEnabled(false)

    val queueName = SERVICE_NAME
    val kafkaTopic = SERVICE_NAME
    val queueDeclaration = QueueDeclaration(queueName).withDurable(true)

    val kafkaProducerSetting = ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

    // The whole consume -> enrich -> produce -> ack chain lives inside one
    // restartable source. The AMQP message travels through the Kafka producer
    // flow as pass-through, so it is acknowledged only after the producer
    // flow has emitted the result for its record. A failure anywhere in the
    // chain restarts the chain with backoff instead of silently stopping it.
    val connector = RestartSource
      .withBackoff(
        minBackoff = 5.seconds,
        maxBackoff = 60.seconds,
        randomFactor = .2
      ) { () =>
        AmqpSource
          .committableSource(
            NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration),
            bufferSize = 1)
          .mapAsync(1) { committableMessage =>
            Try(toKafkaValue(committableMessage.message.bytes)) match {
              case Success(value) =>
                val record = new ProducerRecord[String, String](kafkaTopic, value)
                Future.successful(List(ProducerMessage.Message(record, committableMessage)))
              case Failure(cause) =>
                // A payload that cannot be parsed or enriched will never
                // succeed, so it is rejected without requeueing rather than
                // failing the stream and being redelivered forever.
                logger.error("Rejecting AMQP message that could not be parsed or enriched", cause)
                committableMessage.nack(requeue = false).map(_ => Nil)
            }
          }
          .mapConcat(identity)
          .via(Producer.flow(kafkaProducerSetting))
          .mapAsync(1)(_.message.passThrough.ack())
      }

    connector.runWith(Sink.ignore).onComplete {
      case Success(_)     => logger.warn("AMQP -> Kafka connector stream completed")
      case Failure(cause) => logger.error("AMQP -> Kafka connector stream failed", cause)
    }

    val amqpSinkSettings =
      AmqpSinkSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration)

    val route = path("api" / "statements") {
      post {
        entity(as[ByteString]) { payload =>
          // The sink's own materialized future is used, so the response is
          // only 200 once the sink has finished with the payload; a failed
          // publish fails the future and therefore the request.
          complete(
            Source
              .single(payload)
              .runWith(AmqpSink.simple(amqpSinkSettings))
              .map(_ => HttpResponse(StatusCodes.OK))
          )
        }
      }
    }

    Http().bindAndHandle(route, "localhost", 8080).onComplete {
      case Success(binding) => logger.info(s"HTTP endpoint listening on ${binding.localAddress}")
      case Failure(cause)   => logger.error("Failed to bind HTTP endpoint to localhost:8080", cause)
    }
  }

  /**
   * Parses an AMQP payload as a JSON object, enriches it and renders it as
   * the string value of the Kafka record.
   *
   * @param bytes raw message body, decoded as UTF-8
   * @return the enriched statement serialised as a JSON string
   * @throws Exception if the payload is not a JSON object or cannot be enriched
   */
  private def toKafkaValue(bytes: ByteString): String = {
    val json: JsObject = Json.parse(bytes.utf8String).as[JsObject]
    logger.debug(s"Original payload=$json")
    val enrichedJson = enrichStatement(json)
    logger.debug(s"Enriched payload=$enrichedJson")
    enrichedJson.toString
  }

  /** Current time in UTC, rendered by `ZonedDateTime.toString` (ends in `Z`). */
  private def timestamp: String =
    ZonedDateTime.now(ZoneOffset.UTC).toString

  /**
   * Adds the tenant to the Alef extension of the statement's context and
   * stamps the statement with the current time.
   *
   * Only `context.extensions["http://alefeducation.com"].tenant` and the
   * top-level `timestamp` are written; every other field of the statement,
   * of its context and of its extensions is kept as it was.
   *
   * @param json the statement; must contain the Alef extension object
   * @return a copy of the statement with `tenant` and `timestamp` set
   * @throws play.api.libs.json.JsResultException if `context`, `extensions`
   *         or the Alef extension is missing or is not a JSON object
   */
  private def enrichStatement(json: JsObject): JsObject = {
    val context = (json \ "context").as[JsObject]
    val extensions = (context \ "extensions").as[JsObject]
    val alefExtension = (extensions \ AlefExtensionKey).as[JsObject]

    // `++` is a shallow merge, so each level is rebuilt on top of its
    // original object to avoid discarding sibling fields.
    val enrichedAlefExtension = alefExtension ++ Json.obj("tenant" -> Json.obj("id" -> "31337"))
    val enrichedExtensions = extensions ++ Json.obj(AlefExtensionKey -> enrichedAlefExtension)
    val enrichedContext = context ++ Json.obj("extensions" -> enrichedExtensions)

    json ++ Json.obj("context" -> enrichedContext, "timestamp" -> timestamp)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement