Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package net.petitviolet.ex.persistence.task
- import akka.NotUsed
- import akka.actor._
- import akka.pattern.ask
- import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnNext }
- import akka.stream.actor.{ ActorPublisher, ActorSubscriber, OneByOneRequestStrategy, RequestStrategy }
- import akka.stream.{ ActorMaterializer, ClosedShape }
- import akka.util.Timeout
- import org.reactivestreams.Publisher
- import scala.concurrent.Future
- import scala.io.StdIn
- private object AkkaStreamPracWithActor extends App {
- import akka.stream.scaladsl._
- implicit val system = ActorSystem("akka-stream-prac")
- implicit val executor = system.dispatcher
- implicit val materializer = ActorMaterializer()
- case class Reply(value: String) extends AnyVal
- case object Finish
- class PublishActor extends ActorPublisher[Reply] {
- // publish [[Reply]] or OnComplete
- override def receive: Actor.Receive = {
- case s: String =>
- onNext(Reply(s"Nice: $s"))
- case i: Int =>
- onNext(Reply(s"Great: ${i * 100}"))
- case Finish =>
- onComplete()
- }
- }
- class FlowActor extends Actor {
- // subscribe and publish
- override def receive: Actor.Receive = {
- case Reply(msg) => sender() ! Reply(s"(Mapped: $msg)")
- case any => println(s"??? => $any")
- }
- }
- class SubscribeActor extends ActorSubscriber {
- override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy
- // just subscribe
- override def receive: Actor.Receive = {
- case OnNext(any) => println(s"subscribed: $any")
- case OnComplete => println(s"finish process!")
- }
- }
- // publisher actor
- val actorRef = system.actorOf(Props[PublishActor])
- // source with actor
- val source: Source[Reply, NotUsed] = {
- val publisher: Publisher[Reply] = ActorPublisher(actorRef)
- Source.fromPublisher(publisher)
- }
- // flow
- val flow: Flow[Reply, Reply, NotUsed] = {
- import scala.concurrent.duration._
- implicit val timeout: Timeout = 1.second
- val flowActor = system.actorOf(Props[FlowActor])
- def flowWithActor(reply: Reply): Future[Reply] = (flowActor ? reply).mapTo[Reply]
- Flow[Reply].mapAsync[Reply](3)(flowWithActor)
- }
- // simple implementation without actor
- val _flow: Flow[Reply, Reply, NotUsed] = Flow[Reply].map { r => r.copy(value = s"(Mapped: ${r.value})") }
- // another flow without actor
- val accumulater: Flow[Reply, String, NotUsed] =
- Flow[Reply].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }
- // sink with actor
- val sink: Sink[String, NotUsed] = {
- val printActor = system.actorOf(Props[SubscribeActor])
- Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
- }
- // simple graph
- val _graph: RunnableGraph[NotUsed] =
- RunnableGraph.fromGraph(source via flow via accumulater to sink)
- // written by DSL
- val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph {
- GraphDSL.create() { implicit builder =>
- import GraphDSL.Implicits._
- source ~> flow ~> accumulater ~> sink
- ClosedShape
- }
- }
- graph.run
- // wait preparing graph
- Thread.sleep(100L)
- actorRef ! "hello!"
- actorRef ! 100
- actorRef ! "good"
- actorRef ! Finish
- println("push Enter to shutdown process.")
- StdIn.readLine()
- system.terminate()
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement