Advertisement
Guest User

Untitled

a guest
May 25th, 2019
77
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.04 KB | None | 0 0
  import java.nio.charset.StandardCharsets

  import com.rabbitmq.client.{AMQP, Channel, ConnectionFactory, DefaultConsumer, Envelope}
  import scalaz.zio._
  import scalaz.zio.console.Console
  4.  
  /**
   * Small RabbitMQ consumer written with ZIO.
   *
   * The program opens a channel, declares a queue, registers a consumer that
   * copies every delivery into an in-memory ZIO queue, and then loops forever
   * taking messages from that queue, printing them and acknowledging them.
   */
  object Runner extends App with DefaultRuntime {

    /**
     * One delivery handed over by the RabbitMQ client.
     *
     * @param consumerTag tag of the consumer that received the delivery
     * @param envelope    delivery envelope passed to `handleDelivery`
     * @param properties  AMQP properties passed to `handleDelivery`
     * @param body        raw message payload
     * @param ack         effect that acknowledges this single delivery on the channel
     */
    final case class AmqpMessage(
      consumerTag: String,
      envelope: Envelope,
      properties: AMQP.BasicProperties,
      body: Array[Byte],
      ack: Task[Unit]
    )

    /** Loop state: the number printed in front of each message, incremented per message. */
    type State = Int

    /** The in-memory hand-off queue between the RabbitMQ callback and the processing loop. */
    type MessageQueue = ZQueue[Any, Nothing, Any, Nothing, AmqpMessage, AmqpMessage]

    /** Capacity of the in-memory hand-off queue created by `init`. */
    private val QueueCapacity = 32

    /**
     * Registers a consumer on `channel` that forwards every delivery into `queue`.
     *
     * @param channel channel to consume on; also used to acknowledge deliveries
     * @param name    name of the broker queue to consume from
     * @param queue   in-memory queue that receives one [[AmqpMessage]] per delivery
     * @return the value returned by `basicConsume` (the consumer tag, per the client API)
     */
    def consumerUsing(channel: Channel, name: String, queue: MessageQueue): Task[String] =
      ZIO.effect(
        channel.basicConsume(
          name,
          new DefaultConsumer(channel) {
            override def handleDelivery(
              consumerTag: String,
              envelope: Envelope,
              properties: AMQP.BasicProperties,
              body: Array[Byte]
            ): Unit = {
              // Second argument `false`: acknowledge only this delivery tag, not all earlier ones.
              val ack = Task.effect(channel.basicAck(envelope.getDeliveryTag, false))
              // The callback runs outside of ZIO, so the offer has to be executed synchronously
              // here; the Boolean result of `offer` is intentionally discarded.
              unsafeRun(queue.offer(AmqpMessage(consumerTag, envelope, properties, body, ack)))
            }
          }
        )
      )

    /**
     * Endless processing loop: take a message, run [[businessLogic]] on it, acknowledge it,
     * then continue with the updated state.
     *
     * The message is acknowledged whether or not the business logic completes normally, and a
     * failing acknowledgement is ignored (`.option`), exactly as before. The `take` now happens
     * outside of any bracket: the acquire step of a bracket is uninterruptible, so waiting on an
     * empty queue there made the loop impossible to interrupt while idle.
     *
     * @param n     state passed to the business logic for the next message
     * @param queue in-memory queue to take messages from
     */
    def processMessage(n: State, queue: MessageQueue): ZIO[Console, Throwable, State] =
      queue.take
        .flatMap(msg => businessLogic(n, msg).ensuring(msg.ack.option))
        .flatMap(processMessage(_, queue))

    /**
     * Prints `n` and the message body, then yields `n + 1`.
     *
     * The body is decoded as UTF-8 instead of the platform default charset so the output does
     * not depend on the host configuration.
     */
    def businessLogic(n: State, msg: AmqpMessage): ZIO[Console, Nothing, State] =
      console.putStrLn(s"$n - ${new String(msg.body, StandardCharsets.UTF_8)}") *> Task.succeed(n + 1)

    /**
     * Opens a connection with a default-configured `ConnectionFactory` and creates a channel on it.
     *
     * If the channel cannot be created the freshly opened connection is closed again before the
     * error is propagated, so a failed start-up does not leak the connection. The connection of a
     * successfully created channel stays reachable through `channel.getConnection`.
     */
    def makeChannel: Task[Channel] =
      for {
        connFactory <- Task.effect(new ConnectionFactory)
        connection  <- Task.effect(connFactory.newConnection())
        channel <- Task
                    .effect(
                      // createChannel is documented to return null when no channel is available.
                      Option(connection.createChannel()).getOrElse(
                        throw new IllegalStateException("RabbitMQ connection has no channel available")
                      )
                    )
                    .catchAll(error => Task.effect(connection.close()).option *> ZIO.fail(error))
      } yield channel

    /** Declares a queue on `channel` using the client's no-argument `queueDeclare()`. */
    def createQueue(channel: Channel): Task[com.rabbitmq.client.AMQP.Queue.DeclareOk] =
      Task.effect(channel.queueDeclare())

    /** Closes the connection that owns `channel`; a failure while closing is ignored. */
    private def closeConnectionOf(channel: Channel): ZIO[Any, Nothing, Option[Unit]] =
      Task.effect(channel.getConnection.close()).option

    /**
     * Whole program: open a channel, declare a queue, start consuming into a bounded in-memory
     * queue and process messages forever.
     *
     * The channel is acquired through `ZIO.bracket`, so its connection is closed when the
     * processing loop ends for any reason (failure, defect or interruption). The yielded `0` is
     * only reached if the loop ever returns.
     */
    val init: ZIO[Console, Throwable, Int] =
      ZIO.bracket(makeChannel)(closeConnectionOf(_)) { channel =>
        for {
          declareOk <- createQueue(channel)
          queue     <- Queue.bounded[AmqpMessage](QueueCapacity)
          _         <- consumerUsing(channel, declareOk.getQueue, queue)
          _         <- processMessage(0, queue)
        } yield 0
      }

    /** Entry point: runs [[init]], turning any failure into a fiber death. */
    override def run(args: List[String]): ZIO[Runner.Environment, Nothing, State] =
      init.orDie

  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement