Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.Done
- import akka.stream.alpakka.amqp.scaladsl.CommittableReadResult
- import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
- import scala.concurrent.{ExecutionContext, Future}
/**
 * A payload paired with the [[CommittableReadResult]] it was read from, so the
 * payload can be transformed while the ability to ack/nack the underlying
 * delivery travels along with it.
 *
 * @param value   the payload carried by this message
 * @param crr     the read result that `ack` and `nack` delegate to
 * @param context arbitrary extra data carried alongside the payload; defaults to `()`
 * @tparam T      type of the payload (covariant)
 */
case class AckableMessage[+T](value: T, crr: CommittableReadResult, context: Any = ()) {

  /** Acknowledges the underlying read result by delegating to `crr.ack()`. */
  def ack(): Future[Done] =
    crr.ack()

  /**
   * Negatively acknowledges the underlying read result by delegating to `crr.nack`.
   *
   * @param requeue forwarded unchanged as the `requeue` argument of `crr.nack`
   */
  def nack(requeue: Boolean): Future[Done] =
    crr.nack(requeue = requeue)

  /**
   * Applies `f` to the payload, keeping the same read result and context.
   *
   * @param f transformation applied to `value`
   * @return a new message holding `f(value)`
   */
  def map[B](f: T => B): AckableMessage[B] =
    AckableMessage(f(value), crr, context)

  /**
   * Applies the asynchronous function `f` to the payload, keeping the same
   * read result and context.
   *
   * @param f  asynchronous transformation applied to `value`
   * @param ec execution context used to map over the future returned by `f`
   * @return a future of a new message holding the result of `f(value)`
   */
  def mapAsync[B](f: T => Future[B])(implicit ec: ExecutionContext): Future[AckableMessage[B]] =
    for (transformed <- f(value)) yield AckableMessage(transformed, crr, context)
}
/**
 * Helpers for adapting streams of [[CommittableReadResult]] to and from
 * streams of [[AckableMessage]].
 */
object Amqp {

  // Runs tasks directly on the calling thread. Used only for the trivial
  // `_ => message` continuation after an ack completes, so no user code runs on it.
  private val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new java.util.concurrent.Executor {
      override def execute(command: Runnable): Unit = command.run()
    })

  /**
   * Wraps every element of `underlying` in an [[AckableMessage]].
   *
   * @param underlying source of read results; its materialized value is kept
   * @param f          extracts the payload from each read result
   * @param context    extracts the context value stored alongside the payload
   * @return a source emitting `AckableMessage(f(t), t, context(t))` for each element `t`
   */
  def ackableSource[T, Mat](
      underlying: Source[CommittableReadResult, Mat]
  )(f: CommittableReadResult => T)(context: CommittableReadResult => Any): Source[AckableMessage[T], Mat] =
    underlying.map(t => AckableMessage(f(t), t, context(t)))

  /**
   * Builds a sink that acks each incoming message, invokes `context` with the
   * message's context value, and then forwards the payload to `underlying`.
   *
   * The ack future is awaited before the payload is emitted: a failed ack
   * fails the stream instead of being silently dropped. Note that the ack
   * stage sits upstream of `underlying`, so a message is acked before the
   * underlying sink receives its payload.
   *
   * @param underlying sink receiving the unwrapped payloads; its materialized value is kept
   * @param context    callback invoked with each message's context after its ack has completed
   * @return a sink accepting [[AckableMessage]]s
   */
  def ackableSink[T, Mat](underlying: Sink[T, Mat])(context: Any => Unit): Sink[AckableMessage[T], Mat] = {
    val ackThenUnwrap = Flow[AckableMessage[T]]
      // Parallelism 1: one ack in flight at a time, element order preserved.
      .mapAsync(1)(message => message.ack().map(_ => message)(sameThread))
      .map { message =>
        context(message.context)
        message.value
      }
    ackThenUnwrap.toMat(underlying)(Keep.right)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement