Advertisement
mitrakov

AMQP on Alpakka

Aug 14th, 2019
491
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.94 KB | None | 0 0
  1. // Simple AMQP App on RabbitMQ
  2.  
  3. // Consumer.scala:
  4. import akka.NotUsed
  5. import akka.stream.{ClosedShape, Materializer}
  6. import akka.stream.alpakka.amqp._
  7. import akka.stream.alpakka.amqp.scaladsl.AmqpSource
  8. import akka.stream.scaladsl.{GraphDSL, RestartSource, RunnableGraph, Sink, Source}
  9. import akka.util.ByteString
  10.  
  11. import scala.concurrent.duration._
  12. import scala.language.postfixOps
  13.  
  14. class Consumer {
  15.   def start(implicit mat: Materializer): NotUsed = {
  16.     RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  17.       import GraphDSL.Implicits._
  18.  
  19.       source ~> Sink.foreach(println)
  20.  
  21.       ClosedShape
  22.     }).run()
  23.   }
  24.  
  25.   private def source: Source[ByteString, NotUsed] = {
  26.     val qos = 128
  27.     val host = "127.0.0.1"
  28.     val port = 5672
  29.     val user = "guest"
  30.     val pass = "guest"
  31.     val connectionTimeoutSec = 10
  32.     val exchangeName = "fox"
  33.     val queueName = "fox-queue"
  34.  
  35.     val connection = AmqpDetailsConnectionProvider(host, port)
  36.       .withCredentials(AmqpCredentials(user, pass))
  37.       .withConnectionTimeout(connectionTimeoutSec)
  38.       .withAutomaticRecoveryEnabled(false)
  39.       .withTopologyRecoveryEnabled(false)
  40.     val exchangeDeclaration = ExchangeDeclaration(exchangeName, exchangeType = "direct")
  41.       .withDurable(true).withInternal(false).withAutoDelete(false)
  42.     val queueDeclaration = QueueDeclaration(queueName)
  43.       .withDurable(true).withAutoDelete(false).withArguments(Map[String, AnyRef]("x-message-ttl" -> Integer.valueOf(10000)))
  44.     val bindingDeclaration = BindingDeclaration(queueName, exchangeName)
  45.  
  46.     val settings = NamedQueueSourceSettings(connection, queueName).withDeclarations(List(exchangeDeclaration, queueDeclaration, bindingDeclaration))
  47.  
  48.     val source = AmqpSource.atMostOnceSource(settings, qos).map(_.bytes)
  49.     RestartSource.withBackoff(minBackoff = 5 seconds, maxBackoff = 1 minute, randomFactor = 0.2)(() => source)
  50.   }
  51. }
  52.  
  53.  
  54.  
  55. // Producer.scala:
  56. import akka.NotUsed
  57. import akka.stream.alpakka.amqp._
  58. import akka.stream.alpakka.amqp.scaladsl.AmqpSink
  59. import akka.stream.scaladsl.{GraphDSL, RestartSink, RunnableGraph, Sink, Source}
  60. import akka.stream.{ClosedShape, Materializer}
  61. import akka.util.ByteString
  62.  
  63. import scala.concurrent.duration._
  64. import scala.language.postfixOps
  65.  
  66. class Producer {
  67.   def publish(msg: String)(implicit mat: Materializer): NotUsed = {
  68.     RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  69.       import GraphDSL.Implicits._
  70.  
  71.       Source.single(ByteString(msg)) ~> sink
  72.  
  73.       ClosedShape
  74.     }).run()
  75.   }
  76.  
  77.   private lazy val sink: Sink[ByteString, NotUsed] = {
  78.     val host = "127.0.0.1"
  79.     val port = 5672
  80.     val user = "guest"
  81.     val pass = "guest"
  82.     val connectionTimeoutSec = 10
  83.     val exchangeName = "fox"
  84.  
  85.     val connection = AmqpDetailsConnectionProvider(host, port)
  86.       .withCredentials(AmqpCredentials(user, pass))
  87.       .withConnectionTimeout(connectionTimeoutSec)
  88.       .withAutomaticRecoveryEnabled(false)
  89.       .withTopologyRecoveryEnabled(false)
  90.  
  91.     val settings = AmqpWriteSettings(connection).withExchange(exchangeName)
  92.  
  93.     val sink = AmqpSink.simple(settings)
  94.     RestartSink.withBackoff(minBackoff = 5 seconds, maxBackoff = 1 minute, randomFactor = 0.2)(() => sink)
  95.   }
  96. }
  97.  
  98.  
  99.  
  100. // Starter.scala:
  101. import akka.actor.ActorSystem
  102. import akka.stream.{ActorMaterializer, Materializer}
  103.  
  104. import scala.io.StdIn
  105.  
  106. object Starter extends App {
  107.   if (args.isEmpty) {
  108.     Console.err.println("Usage java -jar <program> <consumer|producer>")
  109.     System.exit(8)
  110.   }
  111.  
  112.   implicit val as: ActorSystem = ActorSystem()
  113.   implicit val mat: Materializer = ActorMaterializer()
  114.  
  115.   args.head.toLowerCase.trim match {
  116.     case "consumer" =>
  117.       new Consumer().start
  118.       println("Consumer started...")
  119.     case "producer" =>
  120.       val producer = new Producer
  121.       println("Producer started...")
  122.       while (true) {
  123.         print("> ")
  124.         producer.publish(StdIn.readLine())
  125.       }
  126.   }
  127. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement