Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.rabbitmq.client._
- import monix.eval.Task
- import monix.execution.Scheduler
- import monix.reactive.Observable
- import monix.reactive.observers.Subscriber
- import org.slf4j.Logger
/**
 * Error used to signal that a RabbitMQ consumer was cancelled.
 *
 * Extends `Exception` rather than `Throwable` directly, so that handlers written as
 * `case e: Exception` also see it; it remains a `Throwable` for existing callers.
 *
 * @param message detail message passed to the `Exception` constructor
 */
class RabbitCancellationException(message: String) extends Exception(message)
/**
 * A delivery paired with the channel handed over alongside it.
 *
 * @param message the delivery received from the broker
 * @param channel the channel stored next to the delivery
 *                (presumably used later to ack/nack it; confirm against callers)
 */
case class RabbitAckableMessage(
  message: Delivery,
  channel: Channel
)
/** Helpers for opening a RabbitMQ connection. */
object RabbitConnection {

  /**
   * Configures a fresh `ConnectionFactory` and opens a connection immediately.
   *
   * Nothing is caught here, so anything thrown by the client propagates to the caller.
   *
   * @param host     broker host name
   * @param port     broker port
   * @param user     user name set on the factory
   * @param password password set on the factory
   * @return the connection returned by `ConnectionFactory.newConnection()`
   */
  def connectUnsafe(host: String, port: Int, user: String, password: String): Connection = {
    val factory = new ConnectionFactory()
    // Credentials
    factory.setUsername(user)
    factory.setPassword(password)
    // Endpoint
    factory.setHost(host)
    factory.setPort(port)
    factory.newConnection()
  }

  /**
   * Same as [[connectUnsafe]], but wrapped in `Task.eval` so the connection attempt is
   * described as a `Task` rather than performed at the call site.
   */
  def connect(host: String, port: Int, user: String, password: String): Task[Connection] =
    Task.eval {
      connectUnsafe(host, port, user, password)
    }
}
/** Factory for channels with a bounded number of unacknowledged deliveries. */
object RabbitChannel {

  /**
   * Creates a channel on `connection` and applies a per-consumer prefetch limit.
   *
   * @param connection        open connection to create the channel on
   * @param maxUnacknowledged prefetch count passed to `basicQos`
   * @param logger            not used by this method; kept so existing call sites still compile
   * @return a task that, when run, yields the configured channel; it fails if no channel
   *         could be created or if `basicQos` throws
   */
  def apply(connection: Connection, maxUnacknowledged: Int = 3000)(implicit logger: Logger): Task[Channel] =
    Task.eval {
      // createChannel() is a Java API that may return null when no channel is available;
      // fail with a descriptive error instead of a NullPointerException on the next line.
      val channel = Option(connection.createChannel()).getOrElse(
        throw new IllegalStateException("RabbitMQ connection returned no channel (none available)")
      )
      // prefetchSize must be 0 ("no byte limit"): RabbitMQ does not implement a non-zero
      // prefetch size and rejects the request. Only the message count is limited.
      channel.basicQos(0, maxUnacknowledged, false)
      channel
    }
}
/**
 * Builds an `Observable` that is fed by a RabbitMQ consumer pushing deliveries for one queue.
 */
object RabbitQueuePushBasedObservable {
  import scala.util.control.NonFatal

  // we should only ack after successful sink to a persistent store to guarantee message delivery
  val autoAck: Boolean = false

  /**
   * Registers a consumer on `channel` that forwards every delivery to `subscriber`.
   *
   * The returned task stays pending while the consumer is registered. It fails with a
   * [[RabbitCancellationException]] when the broker cancels the consumer, and with the
   * original error when `basicConsume` itself throws. Cancelling the task cancels the
   * consumer on the broker.
   *
   * @param subscriber downstream subscriber receiving each delivery wrapped with `channel`
   * @param channel    channel to consume on
   * @param queue      name of the queue to consume from
   */
  def createPushBasedConsumer(subscriber: Subscriber[RabbitAckableMessage], channel: Channel, queue: String)(implicit logger: Logger): Task[Unit] =
    Task.cancelable[Unit] { cb =>
      // New message handler
      val onDelivery = new DeliverCallback {
        override def handle(consumerTag: String, message: Delivery): Unit = {
          // NOTE(review): the acknowledgement future returned by onNext is not awaited, so
          // downstream back-pressure is bounded only by the channel's prefetch limit.
          subscriber.onNext(RabbitAckableMessage(message, channel))
          ()
        }
      }

      // Consumer cancelled handler
      val onCancel = new CancelCallback {
        override def handle(consumerTag: String): Unit = {
          logger.warn(s"Observable channel closed by rabbit. Will attempt to restart. Consumer tag: ${consumerTag}")
          // Fail the task through its callback; merely constructing a failed Task here
          // would be discarded and nobody would ever observe the cancellation.
          cb.onError(new RabbitCancellationException(consumerTag))
        }
      }

      try {
        val consumerTag = channel.basicConsume(queue, autoAck, onDelivery, onCancel)
        // Cancel token: stop the broker-side consumer when the task is cancelled.
        Task.eval(if (channel.isOpen) channel.basicCancel(consumerTag))
      } catch {
        case NonFatal(e) =>
          // Registration failed, so there is nothing to cancel; surface the error.
          cb.onError(e)
          Task.unit
      }
    }

  /**
   * Opens a channel on `connection` and starts a consumer on it, starting over with a new
   * channel each time the consumer fails with a [[RabbitCancellationException]].
   * Any other error is left to propagate through the returned task.
   *
   * @param subscriber downstream subscriber receiving the deliveries
   * @param connection connection used to create a channel for every (re)start
   * @param queue      name of the queue to consume from
   */
  def createRecoverableConsumer(subscriber: Subscriber[RabbitAckableMessage], connection: Connection, queue: String)(implicit logger: Logger): Task[Unit] =
    RabbitChannel(connection)
      .flatMap(channel => createPushBasedConsumer(subscriber, channel, queue))
      .onErrorRecoverWith {
        case cancelled: RabbitCancellationException =>
          logger.warn("Recovering from channel closed: ", cancelled)
          createRecoverableConsumer(subscriber, connection, queue)
      }

  /**
   * Creates the observable. Each subscription starts its own recoverable consumer on the
   * subscriber's scheduler; the running consumer task is returned as the subscription's
   * cancelable.
   *
   * @param queue      name of the queue to consume from
   * @param connection connection used to create consumer channels
   * @return an observable emitting each delivery together with the channel it arrived on
   */
  def apply(queue: String, connection: Connection)(implicit logger: Logger): Observable[RabbitAckableMessage] =
    Observable.unsafeCreate { subscriber =>
      implicit val s: Scheduler = subscriber.scheduler
      logger.info("Subscribing to observable")
      createRecoverableConsumer(subscriber, connection, queue)
        // Errors that are not recovered would otherwise end up in an unobserved failed
        // future; hand them to the subscriber so downstream sees the stream fail.
        .onErrorHandle(error => subscriber.onError(error))
        .runToFuture
    }
}
Add Comment
Please, Sign In to add comment