Advertisement
Guest User

Untitled

a guest
Sep 30th, 2016
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.21 KB | None | 0 0
  1. package net.petitviolet.ex.persistence.task
  2.  
  3. import akka.NotUsed
  4. import akka.actor._
  5. import akka.pattern.ask
  6. import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnNext }
  7. import akka.stream.actor.{ ActorPublisher, ActorSubscriber, OneByOneRequestStrategy, RequestStrategy }
  8. import akka.stream.{ ActorMaterializer, ClosedShape }
  9. import akka.util.Timeout
  10. import org.reactivestreams.Publisher
  11.  
  12. import scala.concurrent.Future
  13. import scala.io.StdIn
  14.  
  /**
   * Practice program that wires actors into an Akka Streams graph.
   *
   * Pipeline: `PublishActor` (source) ~> `FlowActor` via ask (flow) ~> fold into one
   * String (accumulater) ~> `SubscribeActor` (sink).
   *
   * Strings and Ints sent to `actorRef` become [[Reply]] elements; sending [[Finish]]
   * completes the stream, at which point the fold emits its accumulated String to the sink.
   */
  private object AkkaStreamPracWithActor extends App {
    import akka.stream.scaladsl._

    implicit val system: ActorSystem = ActorSystem("akka-stream-prac")
    implicit val executor: scala.concurrent.ExecutionContextExecutor = system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    /** Element type that flows through the stream. */
    case class Reply(value: String) extends AnyVal

    /** Message telling [[PublishActor]] to complete the stream. */
    case object Finish

    /**
     * Publishes a [[Reply]] for every String or Int it receives and completes on [[Finish]].
     *
     * `onNext` may only be called while the stream is active and `totalDemand > 0`;
     * otherwise it throws `IllegalStateException`. Replies are therefore buffered and
     * emitted only against outstanding demand, which also makes it safe to send messages
     * before the graph has finished materializing.
     */
    class PublishActor extends ActorPublisher[Reply] {
      import akka.stream.actor.ActorPublisherMessage.{ Cancel, Request }

      // Actor-local mutable state: only ever touched from within `receive`.
      private var pending = Vector.empty[Reply]
      private var finishing = false

      // publish [[Reply]] or OnComplete
      override def receive: Actor.Receive = {
        // Input arriving after Finish is left unhandled instead of being emitted
        // after completion (which `onNext` would reject with an exception).
        case s: String if !finishing =>
          enqueue(Reply(s"Nice: $s"))
        case i: Int if !finishing =>
          enqueue(Reply(s"Great: ${i * 100}"))
        case Finish =>
          finishing = true
          deliver()
        case Request(_) =>
          // Downstream signalled new demand; flush whatever is buffered.
          deliver()
        case Cancel =>
          // Downstream is gone; nothing left to publish to.
          context.stop(self)
      }

      /** Buffers `reply` and emits as much as current demand allows. */
      private def enqueue(reply: Reply): Unit = {
        pending :+= reply
        deliver()
      }

      /** Emits buffered replies up to `totalDemand`, then completes once drained after Finish. */
      private def deliver(): Unit = {
        if (isActive && totalDemand > 0 && pending.nonEmpty) {
          val count = math.min(totalDemand, pending.size.toLong).toInt
          val (ready, rest) = pending.splitAt(count)
          pending = rest
          ready.foreach(onNext)
        }
        // `isActive` guards against signalling completion twice.
        if (finishing && pending.isEmpty && isActive) onCompleteThenStop()
      }
    }

    /** Answers every [[Reply]] with a decorated [[Reply]]; used from the stream through ask. */
    class FlowActor extends Actor {
      // subscribe and publish
      override def receive: Actor.Receive = {
        case Reply(msg) => sender() ! Reply(s"(Mapped: $msg)")
        case any        => println(s"??? => $any")
      }
    }

    /** Terminal subscriber that prints every element and the end-of-stream signal. */
    class SubscribeActor extends ActorSubscriber {
      import akka.stream.actor.ActorSubscriberMessage.OnError

      override protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

      // just subscribe
      override def receive: Actor.Receive = {
        case OnNext(any)    => println(s"subscribed: $any")
        case OnComplete     => println(s"finish process!")
        // Report upstream failures (e.g. an ask timeout in `flow`) instead of dropping them.
        case OnError(cause) => println(s"stream failed: $cause")
      }
    }

    // publisher actor
    val actorRef = system.actorOf(Props[PublishActor])

    // source with actor
    val source: Source[Reply, NotUsed] = {
      val publisher: Publisher[Reply] = ActorPublisher(actorRef)
      Source.fromPublisher(publisher)
    }

    // flow: asks FlowActor for each element, with at most 3 asks in flight
    val flow: Flow[Reply, Reply, NotUsed] = {
      import scala.concurrent.duration._
      implicit val timeout: Timeout = 1.second
      val flowActor = system.actorOf(Props[FlowActor])
      def flowWithActor(reply: Reply): Future[Reply] = (flowActor ? reply).mapTo[Reply]

      Flow[Reply].mapAsync[Reply](3)(flowWithActor)
    }

    // simple implementation without actor
    val _flow: Flow[Reply, Reply, NotUsed] = Flow[Reply].map { r => r.copy(value = s"(Mapped: ${r.value})") }

    // another flow without actor: folds all replies into one String, emitted on completion
    val accumulater: Flow[Reply, String, NotUsed] =
      Flow[Reply].fold("init") { (acc, rep) => s"$acc :: ${rep.value}" }

    // sink with actor
    val sink: Sink[String, NotUsed] = {
      val printActor = system.actorOf(Props[SubscribeActor])
      Sink.fromSubscriber[String](ActorSubscriber[String](printActor))
    }

    // simple graph (same topology as `graph`, built with the linear DSL; not run)
    val _graph: RunnableGraph[NotUsed] =
      RunnableGraph.fromGraph(source via flow via accumulater to sink)

    // written by DSL
    val graph: RunnableGraph[NotUsed] = RunnableGraph.fromGraph {
      GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        source ~> flow ~> accumulater ~> sink
        ClosedShape
      }
    }

    graph.run()

    // No sleep is needed before sending: PublishActor buffers until demand arrives.
    actorRef ! "hello!"

    actorRef ! 100

    actorRef ! "good"

    actorRef ! Finish

    println("push Enter to shutdown process.")

    StdIn.readLine()

    system.terminate()
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement