Advertisement
Guest User

Untitled

a guest
Aug 28th, 2015
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.32 KB | None | 0 0
  1. package scheduling_backend.read_model.reports.flows
  2.  
  3. import akka.actor.{Actor, ActorRef, ActorSystem, Props}
  4. import akka.stream.OverflowStrategy
  5. import akka.stream.scaladsl._
  6. import cqrs.command.DomainEvent
  7. import scheduling_backend.read_model.reports.Probe
  8. import scheduling_backend.write_model.domain.RotationEvents.LegAddedToRotation
  9.  
  10. import scala.concurrent.Future
  11. import scala.util.{Failure, Success}
  12.  
  13. /**
  14.  * @author mciolek
  15.  */
  16. object AsyncMapFlow {
  17.   def apply[T <: DomainEvent](fun: T => Future[Probe])(implicit actorSystem: ActorSystem) = Flow() { implicit b ⇒
  18.  
  19.     val sinkActor = actorSystem.actorOf(Props[SomeActor]())
  20.  
  21.     val sink = Sink.actorRef(sinkActor, StreamCompleted)
  22.     val source = Source.actorRef[Probe](100, OverflowStrategy.backpressure)
  23.    
  24.     (sink.shape.inlet, source.shape.outlet)
  25.   }
  26.  
  27.   case object StreamCompleted
  28.  
  29.   class SomeActor(fun: DomainEvent => Future[Probe], target: ActorRef) extends Actor {
  30.     override def receive = {
  31.       case event: LegAddedToRotation => fun.apply(event).onComplete {
  32.         case Success(probe) => target ! probe
  33.         case Failure(e) => _
  34.       }
  35.       case StreamCompleted => println("Do sth...")
  36.     }
  37.   }
  38.  
  39. }
  40.  
  41. //Usage
  42. legAddedToRotationBroadcast ~> AsyncMapFlow[LegAddedToRotation](leg => Future.successful[Probe](null)) ~> probeSink
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement