Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.rabbitmq.client.{AMQP, Channel, ConnectionFactory, DefaultConsumer, Envelope}
- import scalaz.zio._
- import scalaz.zio.console.Console
- object Runner extends App with DefaultRuntime {
- case class AmqpMessage(consumerTag: String,
- envelope: Envelope,
- properties: AMQP.BasicProperties,
- body: Array[Byte],
- ack: Task[Unit])
- type State = Int
- def consumerUsing(channel:Channel, name:String, queue:ZQueue[Any,Nothing,Any,Nothing,AmqpMessage,AmqpMessage]): Task[String] =
- ZIO.effect(channel.basicConsume(name, new DefaultConsumer(channel) {
- override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]):Unit =
- unsafeRun(queue.offer(AmqpMessage(consumerTag,envelope,properties,body,Task.effect(channel.basicAck(envelope.getDeliveryTag,false)))))
- }))
- def processMessage(n:State, queue: ZQueue[Any,Nothing,Any,Nothing, AmqpMessage, AmqpMessage]):ZIO[Console, Throwable, State] =
- ZIO.bracket(queue.take)(_.ack.option)(businessLogic(n,_)).flatMap(processMessage(_, queue))
- def businessLogic(n:State, msg:AmqpMessage): ZIO[Console, Nothing, State] =
- console.putStrLn(s"$n - ${new String(msg.body)}") *> Task.succeed(n + 1)
- def makeChannel:Task[Channel] =
- for {
- connFactory <- Task.effect(new ConnectionFactory)
- connection <- Task.effect(connFactory.newConnection())
- channel <- Task.effect(connection.createChannel())
- } yield channel
- def createQueue(channel:Channel): Task[com.rabbitmq.client.AMQP.Queue.DeclareOk] =
- Task.effect(channel.queueDeclare())
- val init: ZIO[Console, Throwable, Int] =
- for {
- channel <- makeChannel
- declareOk <- createQueue(channel)
- queue <- Queue.bounded[AmqpMessage](32)
- _ <- consumerUsing(channel, declareOk.getQueue, queue)
- _ <- processMessage(0,queue)
- } yield 0
- override def run(args: List[String]): ZIO[Runner.Environment, Nothing, State] =
- init.orDie
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement