Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // Simple AMQP App on RabbitMQ
- // Consumer.scala:
- import akka.NotUsed
- import akka.stream.{ClosedShape, Materializer}
- import akka.stream.alpakka.amqp._
- import akka.stream.alpakka.amqp.scaladsl.AmqpSource
- import akka.stream.scaladsl.{GraphDSL, RestartSource, RunnableGraph, Sink, Source}
- import akka.util.ByteString
- import scala.concurrent.duration._
- import scala.language.postfixOps
- class Consumer {
- def start(implicit mat: Materializer): NotUsed = {
- RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
- import GraphDSL.Implicits._
- source ~> Sink.foreach(println)
- ClosedShape
- }).run()
- }
- private def source: Source[ByteString, NotUsed] = {
- val qos = 128
- val host = "127.0.0.1"
- val port = 5672
- val user = "guest"
- val pass = "guest"
- val connectionTimeoutSec = 10
- val exchangeName = "fox"
- val queueName = "fox-queue"
- val connection = AmqpDetailsConnectionProvider(host, port)
- .withCredentials(AmqpCredentials(user, pass))
- .withConnectionTimeout(connectionTimeoutSec)
- .withAutomaticRecoveryEnabled(false)
- .withTopologyRecoveryEnabled(false)
- val exchangeDeclaration = ExchangeDeclaration(exchangeName, exchangeType = "direct")
- .withDurable(true).withInternal(false).withAutoDelete(false)
- val queueDeclaration = QueueDeclaration(queueName)
- .withDurable(true).withAutoDelete(false).withArguments(Map[String, AnyRef]("x-message-ttl" -> Integer.valueOf(10000)))
- val bindingDeclaration = BindingDeclaration(queueName, exchangeName)
- val settings = NamedQueueSourceSettings(connection, queueName).withDeclarations(List(exchangeDeclaration, queueDeclaration, bindingDeclaration))
- val source = AmqpSource.atMostOnceSource(settings, qos).map(_.bytes)
- RestartSource.withBackoff(minBackoff = 5 seconds, maxBackoff = 1 minute, randomFactor = 0.2)(() => source)
- }
- }
- // Producer.scala:
- import akka.NotUsed
- import akka.stream.alpakka.amqp._
- import akka.stream.alpakka.amqp.scaladsl.AmqpSink
- import akka.stream.scaladsl.{GraphDSL, RestartSink, RunnableGraph, Sink, Source}
- import akka.stream.{ClosedShape, Materializer}
- import akka.util.ByteString
- import scala.concurrent.duration._
- import scala.language.postfixOps
- class Producer {
- def publish(msg: String)(implicit mat: Materializer): NotUsed = {
- RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
- import GraphDSL.Implicits._
- Source.single(ByteString(msg)) ~> sink
- ClosedShape
- }).run()
- }
- private lazy val sink: Sink[ByteString, NotUsed] = {
- val host = "127.0.0.1"
- val port = 5672
- val user = "guest"
- val pass = "guest"
- val connectionTimeoutSec = 10
- val exchangeName = "fox"
- val connection = AmqpDetailsConnectionProvider(host, port)
- .withCredentials(AmqpCredentials(user, pass))
- .withConnectionTimeout(connectionTimeoutSec)
- .withAutomaticRecoveryEnabled(false)
- .withTopologyRecoveryEnabled(false)
- val settings = AmqpWriteSettings(connection).withExchange(exchangeName)
- val sink = AmqpSink.simple(settings)
- RestartSink.withBackoff(minBackoff = 5 seconds, maxBackoff = 1 minute, randomFactor = 0.2)(() => sink)
- }
- }
- // Starter.scala:
- import akka.actor.ActorSystem
- import akka.stream.{ActorMaterializer, Materializer}
- import scala.io.StdIn
- object Starter extends App {
- if (args.isEmpty) {
- Console.err.println("Usage java -jar <program> <consumer|producer>")
- System.exit(8)
- }
- implicit val as: ActorSystem = ActorSystem()
- implicit val mat: Materializer = ActorMaterializer()
- args.head.toLowerCase.trim match {
- case "consumer" =>
- new Consumer().start
- println("Consumer started...")
- case "producer" =>
- val producer = new Producer
- println("Producer started...")
- while (true) {
- print("> ")
- producer.publish(StdIn.readLine())
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement