Advertisement
mitrakov

Amqp: Ackable Sources and Sinks

Aug 27th, 2019
773
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.19 KB | None | 0 0
  1. import akka.Done
  2. import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult
  3. import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
  4.  
  5. import scala.concurrent.{ExecutionContext, Future}
  6.  
  /**
    * A payload of type `T` bundled with the AMQP read result it was derived from,
    * so the delivery can still be acknowledged (or rejected) after the payload has
    * been transformed.
    *
    * @param value   the current payload
    * @param crr     the committable read result that `ack`/`nack` are delegated to
    * @param context arbitrary caller-supplied data carried along with the message;
    *                defaults to `()`
    * @tparam T the payload type
    */
  case class AckableMessage[+T](value: T, crr: CommittableReadResult, context: Any = ()) {

    /** Delegates to `crr.ack()` and returns its future. */
    def ack(): Future[Done] = crr.ack()

    /**
      * Delegates to `crr.nack`, forwarding the `requeue` flag.
      *
      * @param requeue passed through as the `requeue` argument of `crr.nack`
      */
    def nack(requeue: Boolean): Future[Done] = crr.nack(requeue = requeue)

    /**
      * Transforms the payload with `f`; `crr` and `context` are carried over unchanged.
      */
    def map[B](f: T => B): AckableMessage[B] =
      AckableMessage(f(value), crr, context)

    /**
      * Asynchronous counterpart of `map`: applies `f` to the payload and, once the
      * resulting future completes, wraps the result with the same `crr` and `context`.
      *
      * @param ec execution context used to map over the future returned by `f`
      */
    def mapAsync[B](f: T => Future[B])(implicit ec: ExecutionContext): Future[AckableMessage[B]] =
      for (transformed <- f(value)) yield AckableMessage(transformed, crr, context)
  }
  18.  
  /**
    * Adapters between streams of `CommittableReadResult` and streams of
    * [[AckableMessage]].
    */
  object Amqp {

    /**
      * Runs callbacks directly on the thread that completes the future. It is only
      * used for the trivial "ack finished => payload" mapping in `ackableSink`, where
      * hopping to another thread pool would buy nothing.
      */
    private val sameThread: ExecutionContext =
      ExecutionContext.fromExecutor(new java.util.concurrent.Executor {
        override def execute(command: Runnable): Unit = command.run()
      })

    /**
      * Wraps every element of `underlying` into an [[AckableMessage]].
      *
      * @param underlying source of committable read results
      * @param f          extracts the payload from a read result
      * @param context    extracts the context value stored alongside the payload
      * @return a source with the same materialized value as `underlying`
      */
    def ackableSource[T, Mat](
      underlying: Source[CommittableReadResult, Mat]
    )(f: CommittableReadResult => T)(context: CommittableReadResult => Any): Source[AckableMessage[T], Mat] = {
      underlying.map(readResult => AckableMessage(f(readResult), readResult, context(readResult)))
    }

    /**
      * Builds a sink of [[AckableMessage]]s on top of a sink of plain payloads.
      *
      * For each incoming message the ack is started, `context` is invoked with the
      * message's context value, and the payload is handed to `underlying` only after
      * the ack future has completed successfully. `mapAsync(1)` keeps one ack in
      * flight at a time and preserves element order.
      *
      * Previously the future returned by `ack()` was discarded, so a failed ack went
      * unnoticed. Now a failed ack fails the `mapAsync` stage instead of being
      * swallowed (how the stream reacts is governed by its supervision strategy).
      *
      * @param underlying sink that receives the unwrapped payloads
      * @param context    callback invoked with each message's context value
      * @return a sink with the same materialized value as `underlying`
      */
    def ackableSink[T, Mat](underlying: Sink[T, Mat])(context: Any => Unit): Sink[AckableMessage[T], Mat] = {
      val ackThenUnwrap = Flow[AckableMessage[T]].mapAsync(1) { message =>
        // Start the ack first and run the callback right away, as the original did.
        val acked = message.ack()
        context(message.context)
        // Emit the payload only once the ack has actually gone through.
        acked.map(_ => message.value)(sameThread)
      }
      ackThenUnwrap.toMat(underlying)(Keep.right)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement