Guest User

Untitled

a guest
Feb 3rd, 2019
142
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.78 KB | None | 0 0
  1. import com.rabbitmq.client._
  2. import monix.eval.Task
  3. import monix.execution.Scheduler
  4. import monix.reactive.Observable
  5. import monix.reactive.observers.Subscriber
  6. import org.slf4j.Logger
  7.  
  /**
    * Raised when a RabbitMQ consumer is cancelled by the broker.
    *
    * Extends `Exception` (not `Throwable` directly) so that it is treated as an
    * ordinary, recoverable application error by handlers that match on
    * `Exception`; it is still a `Throwable`, so existing handlers keep working.
    *
    * @param message detail message; the consumer code in this file passes the
    *                consumer tag of the cancelled consumer
    */
  class RabbitCancellationException(message: String) extends Exception(message)
  9.  
  /**
    * A delivered RabbitMQ message bundled with the channel it was consumed on.
    *
    * @param message the delivery handed to the consumer callback
    * @param channel the channel the consumer was registered on
    *                (presumably kept so the receiver can ack on it — confirm with callers)
    */
  case class RabbitAckableMessage(
    message: Delivery,
    channel: Channel
  )
  11.  
  /** Factory helpers for opening RabbitMQ connections. */
  object RabbitConnection {

    /**
      * Opens a connection eagerly on the calling thread.
      *
      * Whatever the RabbitMQ client throws while configuring the factory or
      * connecting propagates straight to the caller.
      *
      * @param host     broker host name
      * @param port     broker port
      * @param user     user name to authenticate with
      * @param password password to authenticate with
      * @return the newly opened connection
      */
    def connectUnsafe(host: String, port: Int, user: String, password: String): Connection = {
      val factory = new ConnectionFactory()
      factory.setHost(host)
      factory.setPort(port)
      factory.setUsername(user)
      factory.setPassword(password)
      factory.newConnection()
    }

    /**
      * Lazy variant of [[connectUnsafe]]: nothing happens until the returned
      * task is run, and a new connection is opened on every run.
      */
    def connect(host: String, port: Int, user: String, password: String): Task[Connection] =
      Task.eval {
        connectUnsafe(host, port, user, password)
      }
  }
  25.  
  /** Factory for channels with a bounded number of unacknowledged deliveries. */
  object RabbitChannel {

    /**
      * Creates a channel on `connection` and limits how many messages the broker
      * may deliver to each consumer on it before it receives acknowledgements.
      *
      * The channel is created lazily, each time the returned task is run.
      *
      * @param connection        open connection to create the channel on
      * @param maxUnacknowledged prefetch count: maximum number of unacknowledged
      *                          messages in flight
      * @param logger            currently unused; kept so existing call sites compile
      * @return a task yielding the configured channel
      */
    def apply(connection: Connection, maxUnacknowledged: Int = 3000)(implicit logger: Logger): Task[Channel] =
      Task.eval {
        val channel = connection.createChannel()
        // basicQos(prefetchSize, prefetchCount, global): prefetchSize is a limit in
        // octets and must be 0 ("unlimited") — RabbitMQ does not implement a
        // non-zero prefetch size and rejects it with NOT_IMPLEMENTED. Only the
        // message-count limit is applied, per consumer (global = false).
        channel.basicQos(0, maxUnacknowledged, false)
        channel
      }
  }
  33.  
  /**
    * Exposes a RabbitMQ queue as a push-based Monix `Observable` of
    * [[RabbitAckableMessage]], re-registering the consumer on a fresh channel
    * whenever the broker cancels it.
    */
  object RabbitQueuePushBasedObservable {
    import scala.util.control.NonFatal

    // we should only ack after successful sink to a persistent store to guarantee message delivery
    val autoAck: Boolean = false

    /**
      * Registers a consumer for `queue` on `channel` that pushes every delivery
      * to `subscriber`.
      *
      * The returned task does not complete while the consumer is active. It fails
      * with [[RabbitCancellationException]] when the broker cancels the consumer,
      * and with the underlying error if registering the consumer throws.
      *
      * @param subscriber downstream observer receiving the messages
      * @param channel    channel to consume on; also attached to every emitted message
      * @param queue      name of the queue to consume from
      */
    def createPushBasedConsumer(subscriber: Subscriber[RabbitAckableMessage], channel: Channel, queue: String)(implicit logger: Logger): Task[Unit] =
      Task.async[Unit] { cb =>
        try {
          channel.basicConsume(queue, autoAck,
            // New message handler
            new DeliverCallback {
              override def handle(consumerTag: String, message: Delivery): Unit = {
                // NOTE(review): the Future[Ack] returned by onNext is ignored, so
                // downstream back-pressure and Stop are not honoured here; the
                // channel's prefetch limit is the only flow control.
                subscriber.onNext(RabbitAckableMessage(message, channel))
                ()
              }
            },
            // Consumer cancelled handler
            new CancelCallback {
              override def handle(consumerTag: String): Unit = {
                logger.warn(s"Observable channel closed by rabbit. Will attempt to restart. Consumer tag: ${consumerTag}")
                // Fail the task itself: merely constructing Task.raiseError(...)
                // here would build a value that nobody runs.
                cb.onError(new RabbitCancellationException(consumerTag))
              }
            }
          )
          ()
        } catch {
          // Registration failed synchronously (e.g. unknown queue, closed channel).
          case NonFatal(e) => cb.onError(e)
        }
      }

    /**
      * Creates a channel on `connection`, starts a consumer on it and, when that
      * consumer is cancelled by the broker, starts over with a new channel.
      * Any other failure is propagated through the returned task.
      *
      * NOTE(review): the channel of a cancelled consumer is not closed before a
      * new one is created — confirm whether it should be.
      */
    def createRecoverableConsumer(subscriber: Subscriber[RabbitAckableMessage], connection: Connection, queue: String)(implicit logger: Logger): Task[Unit] =
      RabbitChannel(connection).flatMap[Unit] { channel =>
        createPushBasedConsumer(subscriber, channel, queue)
      }.onErrorRecoverWith[Unit] {
        case cancelled: RabbitCancellationException =>
          logger.warn("Recovering from channel closed: ", cancelled)
          createRecoverableConsumer(subscriber, connection, queue)
      }

    /**
      * Builds the observable. On each subscription a recoverable consumer is
      * started on the subscriber's scheduler; cancelling the subscription cancels
      * the underlying task.
      *
      * @param queue      name of the queue to consume from
      * @param connection open RabbitMQ connection used to create channels
      * @return an observable emitting every delivery together with its channel
      */
    def apply(queue: String, connection: Connection)(implicit logger: Logger): Observable[RabbitAckableMessage] =
      Observable.unsafeCreate { subscriber =>
        implicit val s: Scheduler = subscriber.scheduler

        logger.info("Subscribing to observable")
        createRecoverableConsumer(subscriber, connection, queue)
          // Surface unrecoverable failures downstream instead of losing them in
          // the otherwise unobserved future.
          .onErrorHandle(error => subscriber.onError(error))
          .runToFuture
      }
  }
Add Comment
Please, Sign In to add comment