Advertisement
Guest User

Untitled

a guest
Aug 3rd, 2015
187
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.62 KB | None | 0 0
  1. import akka.actor._
  2. import akka.routing.{SmallestMailboxPool}
  3. import akka.testkit.{TestKit, ImplicitSender}
  4. import com.typesafe.config.ConfigFactory
  5. import jp.trifort.ifu.StopSystemAfterAll
  6. import org.scalatest.{MustMatchers, WordSpecLike}
  7. import scala.concurrent.{Await, Future}
  8. import akka.util.Timeout
  9. import scala.concurrent.duration._
  10.  
  11. class CountAActor extends Actor {
  12. var totalA = 0
  13. def receive: Receive = {
  14. case "How many?" => sender ! totalA
  15. case text: String => totalA += text.toUpperCase().count(_ == 'A')
  16. }
  17. }
  18.  
  19. object CountAActor {
  20. def props = Props(new CountAActor)
  21. }
  22.  
  23. trait RouterCreator {
  24. def createRouter = SmallestMailboxPool(100).props(CountAActor.props)
  25. }
  26.  
  27. class CountARouter extends Actor with RouterCreator {
  28. val countARouter = context.actorOf(createRouter)
  29. def receive: Receive = {
  30. case hm@"How many?" => {
  31. import akka.routing.Broadcast
  32. val reducer = context.actorOf(Reducer.props(sender(), 100))
  33. countARouter.tell(Broadcast(hm), reducer)
  34. }
  35. case msg => countARouter forward msg
  36. }
  37. }
  38.  
  39. object CountARouter {
  40. def props = Props(new CountARouter)
  41. }
  42.  
  43. class Reducer(sendTo: ActorRef, maxCount: Int) extends Actor {
  44. var total = 0; var count = 0
  45. def receive: Receive = {
  46. case sum: Int => {
  47. total += sum; count += 1
  48. if (count == maxCount) {
  49. sendTo ! total
  50. self ! PoisonPill
  51. }
  52. }
  53. }
  54. }
  55.  
  56. object Reducer {
  57. def props(sendTo: ActorRef, maxCount: Int) = Props(new Reducer(sendTo, maxCount))
  58. }
  59.  
  60.  
  61. class CrashActor extends Actor with ActorLogging {
  62. def receive: Receive = {
  63. case "Crash!!!" => throw new Exception("crashed!")
  64. }
  65.  
  66. override def preStart() {
  67. log.info("preStart")
  68. }
  69.  
  70. override def preRestart(reason: Throwable, message: Option[Any]) = {
  71. log.info("preRestart")
  72. super.preRestart(reason, message)
  73. }
  74.  
  75. override def postRestart(reason: Throwable) {
  76. log.info("postRestart")
  77. super.postRestart(reason)
  78. }
  79.  
  80. override def postStop() {
  81. log.info("postStop")
  82. }
  83. }
  84.  
  85. object CrashActor {
  86. def props = Props(new CrashActor)
  87. }
  88.  
  89. class Supervisor extends Actor {
  90. val crashActor = context.actorOf(CrashActor.props)
  91.  
  92. def receive: Receive = {
  93. case msg => crashActor forward msg
  94. }
  95. }
  96.  
  97. object Supervisor {
  98. def props = Props(new Supervisor)
  99. }
  100.  
  101. class SampleTest extends TestKit(ActorSystem("SampleSystem", ConfigFactory.empty()))
  102. with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {
  103.  
  104. "Future" must {
  105. import scala.concurrent.ExecutionContext.Implicits.global
  106. "map and flatMap" in {
  107. val futureMessage = Future {
  108. Thread.sleep(1000); 1
  109. }.flatMap(value => Future {
  110. Thread.sleep(1000); value + 1
  111. }).map(s => s"This is a value of future after $s seconds")
  112. Await.result(futureMessage, 5 seconds) must be("This is a value of future after 2 seconds")
  113. }
  114. "for comprehension" in {
  115. val futureMessage = for {
  116. s1 <- Future {
  117. Thread.sleep(1000); 1
  118. }
  119. s2 <- Future {
  120. Thread.sleep(1000); s1 + 1
  121. }
  122. } yield s"This is a value of future after $s2 seconds"
  123. Await.result(futureMessage, 5 seconds) must be("This is a value of future after 2 seconds")
  124. }
  125. }
  126.  
  127. "parallel collection" must {
  128. "behave same as standard one" in {
  129. val list = (0 to 9999).toList
  130. list.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _) must be
  131. list.par.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
  132. }
  133. }
  134.  
  135. "CountAActor" must {
  136. "count A and a" in {
  137. val countAActor = system.actorOf(CountAActor.props, "countAActor")
  138. countAActor ! "na" * 16
  139. countAActor ! "BATMAN!"
  140. countAActor ! "How many?"
  141. expectMsg(18)
  142. }
  143.  
  144. "count A and a in parallel" in {
  145. implicit val dispatcher = system.dispatcher
  146. implicit val timeout = Timeout(5 seconds)
  147. import akka.pattern.ask
  148. val countAActor1 = system.actorOf(CountAActor.props, "countAActor1")
  149. val countAActor2 = system.actorOf(CountAActor.props, "countAActor2")
  150. countAActor1 ! "na" * 16
  151. countAActor2 ! "BATMAN!"
  152. val futures = Seq(countAActor1, countAActor2).map(_ ? "How many?").map(_.mapTo[Int])
  153. val result = Future.sequence(futures).map(_.reduce(_ + _))
  154. Await.result(result, 5 seconds) must be(18)
  155. }
  156. }
  157.  
  158. "Router" must {
  159. "route messages" in {
  160. val router = system.actorOf(CountARouter.props, "CountARouter")
  161. Stream.fill(10000)("BATMAN!").foreach(router ! _)
  162. router ! "How many?"
  163. expectMsg(10000 * 2)
  164. }
  165. }
  166.  
  167. "Supervisor" must {
  168. "crush CrashActor" in {
  169. val supervisor = system.actorOf(Supervisor.props, "supervisor")
  170. supervisor ! "Crash!!!"
  171. Thread.sleep(1000)
  172. }
  173. }
  174. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement