Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package scheduling_backend.read_model.reports.flows
- import akka.actor.{Actor, ActorRef, ActorSystem, Props}
- import akka.stream.OverflowStrategy
- import akka.stream.scaladsl._
- import cqrs.command.DomainEvent
- import scheduling_backend.read_model.reports.Probe
- import scheduling_backend.write_model.domain.RotationEvents.LegAddedToRotation
- import scala.concurrent.Future
- import scala.util.{Failure, Success}
- /**
- * @author mciolek
- */
- object AsyncMapFlow {
- def apply[T <: DomainEvent](fun: T => Future[Probe])(implicit actorSystem: ActorSystem) = Flow() { implicit b ⇒
- val sinkActor = actorSystem.actorOf(Props[SomeActor]())
- val sink = Sink.actorRef(sinkActor, StreamCompleted)
- val source = Source.actorRef[Probe](100, OverflowStrategy.backpressure)
- (sink.shape.inlet, source.shape.outlet)
- }
- case object StreamCompleted
- class SomeActor(fun: DomainEvent => Future[Probe], target: ActorRef) extends Actor {
- override def receive = {
- case event: LegAddedToRotation => fun.apply(event).onComplete {
- case Success(probe) => target ! probe
- case Failure(e) => _
- }
- case StreamCompleted => println("Do sth...")
- }
- }
- }
- //Usage
- legAddedToRotationBroadcast ~> AsyncMapFlow[LegAddedToRotation](leg => Future.successful[Probe](null)) ~> probeSink
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement