Advertisement
Guest User

Untitled

a guest
Nov 19th, 2018
85
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 4.30 KB | None | 0 0
  1. package com.alefeducation.dataconnector
  2.  
  3. import akka.Done
  4. import akka.actor.ActorSystem
  5. import akka.kafka.ProducerSettings
  6. import akka.kafka.scaladsl.Producer
  7. import akka.stream.ActorMaterializer
  8. import akka.stream.alpakka.amqp.{
  9.   AmqpDetailsConnectionProvider,
  10.   AmqpSinkSettings,
  11.   NamedQueueSourceSettings,
  12.   QueueDeclaration
  13. }
  14. import akka.stream.alpakka.amqp.scaladsl.{AmqpSink, AmqpSource}
  15. import akka.stream.scaladsl.{RestartSink, RestartSource, Sink, Source}
  16. import akka.http.scaladsl.server.Directives._
  17. import akka.http.scaladsl.Http
  18. import akka.http.scaladsl.model.{HttpResponse, StatusCodes}
  19. import akka.util.ByteString
  20. import com.typesafe.scalalogging.StrictLogging
  21. import org.apache.kafka.clients.producer.ProducerRecord
  22. import org.apache.kafka.common.serialization.StringSerializer
  23. import play.api.libs.json.{JsObject, Json}
  24. import java.time._
  25.  
  26. import scala.concurrent.{ExecutionContextExecutor, Future}
  27. import scala.concurrent.duration._
  28.  
  /**
    * Data connector service.
    *
    * Two independent pipelines are started from [[Application.main]]:
    *
    *  - HTTP `POST /api/statements` publishes the raw request body to the durable AMQP queue
    *    named after the service.
    *  - A consumer reads that queue, enriches each JSON statement (see `enrichStatement`),
    *    writes it to the Kafka topic named after the service and only then acknowledges the
    *    AMQP message.
    */
  object Application extends StrictLogging {
    import scala.util.{Failure, Success, Try}

    private val SERVICE_NAME = "alef-data-connector-akka-streams"

    /** Key of the service-specific object inside `context.extensions`. */
    private val AlefExtensionKey = "http://alefeducation.com"

    /** Tenant id stamped onto every enriched statement. */
    private val TenantId = "31337"

    private implicit val system: ActorSystem = ActorSystem(SERVICE_NAME)
    private implicit val ec: ExecutionContextExecutor = system.dispatcher
    private implicit val materializer: ActorMaterializer = ActorMaterializer()

    /**
      * Starts the AMQP -> Kafka consumer and the HTTP ingestion endpoint.
      *
      * @param args command-line arguments (unused)
      */
    def main(args: Array[String]): Unit = {
      // Client-side recovery is switched off; reconnection is left to the restart wrapper below.
      // https://github.com/akka/alpakka/issues/1270
      val connectionProvider = AmqpDetailsConnectionProvider("localhost", 5672)
        .withAutomaticRecoveryEnabled(false)
        .withTopologyRecoveryEnabled(false)
      val queueName = SERVICE_NAME
      val queueDeclaration = QueueDeclaration(SERVICE_NAME).withDurable(true)
      val amqpSourceSettings =
        NamedQueueSourceSettings(connectionProvider, queueName).withDeclaration(queueDeclaration)
      val amqpSinkSettings =
        AmqpSinkSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration)

      val kafkaProducerSetting = ProducerSettings(system, new StringSerializer, new StringSerializer)
        .withBootstrapServers("localhost:9092")
      // One producer shared by every write: Producer.plainSink(settings) alone would create
      // (and close) a new KafkaProducer for each materialization, i.e. for each message.
      val kafkaProducer = kafkaProducerSetting.createKafkaProducer()
      system.registerOnTermination(kafkaProducer.close())

      // The whole consume -> enrich -> produce -> ack chain lives INSIDE the restart wrapper.
      // RestartSource only restarts what it wraps; a failure in a stage attached after it
      // would cancel the wrapper and stop consumption for good.
      RestartSource
        .withBackoff(
          minBackoff = 5.seconds,
          maxBackoff = 60.seconds,
          randomFactor = .2
        ) { () =>
          AmqpSource
            .committableSource(amqpSourceSettings, bufferSize = 1)
            .mapAsync(1) { committableMessage =>
              val enriched = Try {
                val json: JsObject = Json.parse(committableMessage.message.bytes.utf8String).as[JsObject]
                logger.debug(s"Original payload=$json")
                val enrichedJson = enrichStatement(json)
                logger.debug(s"Enriched payload=$enrichedJson")
                enrichedJson
              }
              enriched match {
                case Success(enrichedJson) =>
                  // The sink's future completes once the record has been handed to Kafka and the
                  // send finished, so the AMQP ack really happens after the write. A failed write
                  // fails this stream; the unacked message is redelivered after the restart.
                  Source
                    .single(new ProducerRecord[String, String](SERVICE_NAME, enrichedJson.toString))
                    .runWith(Producer.plainSink(kafkaProducerSetting, kafkaProducer))
                    .flatMap(_ => committableMessage.ack())
                case Failure(error) =>
                  // A payload that cannot be parsed will never succeed: reject it without
                  // requeueing instead of failing the stream and receiving it again forever.
                  logger.error("Rejecting AMQP message that is not a valid JSON object statement", error)
                  committableMessage.nack(requeue = false)
              }
            }
        }
        .runWith(Sink.ignore)
        .onComplete(result => logger.warn(s"AMQP consumer stream terminated: $result"))

      val route = path("api" / "statements") {
        post {
          entity(as[ByteString]) { payload =>
            // Publish through a plain AmqpSink so the materialized future reflects the real
            // outcome; the response is sent only after the publish stage has finished.
            // NOTE(review): this opens an AMQP connection per request - consider a long-lived
            // publisher stream if request volume is significant.
            onComplete(Source.single(payload).runWith(AmqpSink.simple(amqpSinkSettings))) {
              case Success(_) =>
                complete(HttpResponse(StatusCodes.OK))
              case Failure(error) =>
                logger.error("Failed to publish statement to AMQP", error)
                complete(HttpResponse(StatusCodes.ServiceUnavailable))
            }
          }
        }
      }
      Http().bindAndHandle(route, "localhost", 8080).onComplete {
        case Success(binding) => logger.info(s"HTTP server bound to ${binding.localAddress}")
        case Failure(error)   => logger.error("Failed to bind HTTP server to localhost:8080", error)
      }
    }

    /**
      * Current instant as an ISO-8601 UTC string (e.g. `2018-11-19T10:15:30.123Z`).
      *
      * `Instant.toString` always includes the seconds field, unlike `ZonedDateTime.toString`.
      */
    private def timestamp: String = Instant.now.toString

    /** Returns the JSON object stored under `key`, or an empty object if absent or not an object. */
    private def childObject(parent: JsObject, key: String): JsObject =
      (parent \ key).asOpt[JsObject].getOrElse(Json.obj())

    /**
      * Adds the tenant to `context.extensions["http://alefeducation.com"]` and sets a top-level
      * `timestamp`.
      *
      * Every level is rebuilt from the existing object, so sibling fields of `context`, other
      * extensions and other fields of the Alef extension are preserved. Missing levels are
      * created rather than causing an exception.
      *
      * @param json the statement to enrich
      * @return a copy of `json` with the tenant and timestamp applied
      */
    private def enrichStatement(json: JsObject): JsObject = {
      val context = childObject(json, "context")
      val extensions = childObject(context, "extensions")
      val alefExtension = childObject(extensions, AlefExtensionKey)

      val enrichedAlefExtension = alefExtension ++ Json.obj("tenant" -> Json.obj("id" -> TenantId))
      val enrichedExtensions = extensions ++ Json.obj(AlefExtensionKey -> enrichedAlefExtension)
      val enrichedContext = context ++ Json.obj("extensions" -> enrichedExtensions)

      json ++ Json.obj("context" -> enrichedContext, "timestamp" -> timestamp)
    }

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement