Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.actor._
- import akka.routing.{SmallestMailboxPool}
- import akka.testkit.{TestKit, ImplicitSender}
- import com.typesafe.config.ConfigFactory
- import jp.trifort.ifu.StopSystemAfterAll
- import org.scalatest.{MustMatchers, WordSpecLike}
- import scala.concurrent.{Await, Future}
- import akka.util.Timeout
- import scala.concurrent.duration._
/**
 * Actor that keeps a running tally of the letter 'A' across the strings it receives.
 * Each string is upper-cased before counting, so 'a' and 'A' both contribute.
 *
 * Protocol:
 *  - `"How many?"` replies to the sender with the current tally as an `Int`.
 *  - any other `String` adds its number of 'A' characters (after upper-casing) to the tally.
 */
class CountAActor extends Actor {
  // Running tally of 'A' characters seen so far.
  var totalA = 0

  // Number of 'A' characters in `text` once it has been upper-cased.
  private def occurrencesOfA(text: String): Int =
    text.toUpperCase().count(ch => ch == 'A')

  def receive: Receive = {
    // Listed first: the generic String case below would otherwise match the query too.
    case "How many?" =>
      sender() ! totalA
    case text: String =>
      totalA = totalA + occurrencesOfA(text)
  }
}

/** Companion holding the `Props` factory for [[CountAActor]]. */
object CountAActor {
  def props: Props = Props(new CountAActor)
}
/**
 * Mixin that knows how to build the routee pool used by [[CountARouter]].
 *
 * The pool size is exposed as `poolSize` so that the code that creates the pool and the code
 * that waits for one reply per routee always agree on the same number.
 */
trait RouterCreator {
  /** Number of [[CountAActor]] routees in the pool. */
  def poolSize: Int = 100

  /** `Props` for a smallest-mailbox pool of `poolSize` [[CountAActor]] routees. */
  def createRouter: Props = SmallestMailboxPool(poolSize).props(CountAActor.props)
}

/**
 * Front actor for a pool of [[CountAActor]] routees.
 *
 *  - `"How many?"` is broadcast to every routee; the partial counts are summed by a
 *    per-request [[Reducer]], which then answers the original sender.
 *  - every other message is forwarded to the pool unchanged.
 */
class CountARouter extends Actor with RouterCreator {
  import akka.routing.Broadcast

  val countARouter: ActorRef = context.actorOf(createRouter)

  def receive: Receive = {
    case hm @ "How many?" =>
      // One Reducer per query. It is passed as the sender of the broadcast, so each routee's
      // reply goes to it; it expects exactly `poolSize` replies before answering `sender()`.
      // NOTE(review): if `createRouter` is overridden with a different routee count,
      // `poolSize` must be overridden to match.
      val reducer = context.actorOf(Reducer.props(sender(), poolSize))
      countARouter.tell(Broadcast(hm), reducer)
    case msg =>
      countARouter forward msg
  }
}

/** Companion holding the `Props` factory for [[CountARouter]]. */
object CountARouter {
  def props: Props = Props(new CountARouter)
}
/**
 * Short-lived aggregator: adds up `Int` partial results and, once `maxCount` of them have
 * arrived, sends the grand total to `sendTo` and stops itself.
 *
 * If `maxCount` is zero or negative there is nothing to wait for, so the actor replies with
 * `0` as soon as it starts instead of waiting forever.
 *
 * @param sendTo   actor that receives the final sum
 * @param maxCount number of `Int` partial results to wait for
 */
class Reducer(sendTo: ActorRef, maxCount: Int) extends Actor {
  var total = 0
  var count = 0

  // Without this, a non-positive maxCount would never satisfy the completion check in
  // `receive` (count is only compared after being incremented), leaking the actor.
  override def preStart(): Unit =
    if (maxCount <= 0) complete()

  def receive: Receive = {
    case sum: Int =>
      total += sum
      count += 1
      if (count >= maxCount) complete()
  }

  // Publishes the result and stops this actor; no further partial results are processed.
  private def complete(): Unit = {
    sendTo ! total
    context.stop(self)
  }
}

/** Companion holding the `Props` factory for [[Reducer]]. */
object Reducer {
  def props(sendTo: ActorRef, maxCount: Int): Props = Props(new Reducer(sendTo, maxCount))
}
/**
 * Actor that throws when it receives `"Crash!!!"` and logs each lifecycle hook as it runs,
 * so the sequence of hooks triggered by a failure can be read from the log.
 */
class CrashActor extends Actor with ActorLogging {

  def receive: Receive = {
    case "Crash!!!" => throw new Exception("crashed!")
  }

  override def preStart(): Unit =
    log.info("preStart")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("preRestart")
    // Delegate to the inherited hook so the default restart handling still runs.
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info("postRestart")
    // Delegate to the inherited hook so the default restart handling still runs.
    super.postRestart(reason)
  }

  override def postStop(): Unit =
    log.info("postStop")
}

/** Companion holding the `Props` factory for [[CrashActor]]. */
object CrashActor {
  def props: Props = Props(new CrashActor)
}
/**
 * Parent of a single [[CrashActor]] child. Every incoming message is forwarded to the child,
 * keeping the original sender. No `supervisorStrategy` is overridden here.
 */
class Supervisor extends Actor {
  // Created as a child of this actor, so this actor is its supervisor.
  val crashActor: ActorRef = context.actorOf(CrashActor.props)

  def receive: Receive = {
    case message => crashActor.forward(message)
  }
}

/** Companion holding the `Props` factory for [[Supervisor]]. */
object Supervisor {
  def props: Props = Props(new Supervisor)
}
/**
 * Spec covering Future composition, parallel collections, and the sample actors
 * (`CountAActor`, `CountARouter`, `Supervisor`) using Akka TestKit.
 */
class SampleTest extends TestKit(ActorSystem("SampleSystem", ConfigFactory.empty()))
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {

  "Future" must {
    import scala.concurrent.ExecutionContext.Implicits.global

    "map and flatMap" in {
      val futureMessage = Future {
        Thread.sleep(1000); 1
      }.flatMap(value => Future {
        Thread.sleep(1000); value + 1
      }).map(s => s"This is a value of future after $s seconds")
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }

    "for comprehension" in {
      val futureMessage = for {
        s1 <- Future {
          Thread.sleep(1000); 1
        }
        s2 <- Future {
          Thread.sleep(1000); s1 + 1
        }
      } yield s"This is a value of future after $s2 seconds"
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }
  }

  "parallel collection" must {
    "behave same as standard one" in {
      val list = (0 to 9999).toList
      val sequential = list.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      val parallel = list.par.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      // The expected value must be passed to `be(...)` on the same line: a bare `must be`
      // followed by a newline is a complete statement on its own and asserts nothing.
      parallel must be(sequential)
    }
  }

  "CountAActor" must {
    "count A and a" in {
      val countAActor = system.actorOf(CountAActor.props, "countAActor")
      countAActor ! "na" * 16 // 16 x 'a'
      countAActor ! "BATMAN!" // 2 x 'A'
      countAActor ! "How many?"
      expectMsg(18)
    }

    "count A and a in parallel" in {
      implicit val dispatcher: scala.concurrent.ExecutionContext = system.dispatcher
      implicit val timeout: Timeout = Timeout(5.seconds)
      import akka.pattern.ask

      val countAActor1 = system.actorOf(CountAActor.props, "countAActor1")
      val countAActor2 = system.actorOf(CountAActor.props, "countAActor2")
      countAActor1 ! "na" * 16
      countAActor2 ! "BATMAN!"
      // Ask both actors, then combine the two partial counts.
      val futures = Seq(countAActor1, countAActor2).map(_ ? "How many?").map(_.mapTo[Int])
      val result = Future.sequence(futures).map(_.sum)
      Await.result(result, 5.seconds) must be(18)
    }
  }

  "Router" must {
    "route messages" in {
      val router = system.actorOf(CountARouter.props, "CountARouter")
      (1 to 10000).foreach(_ => router ! "BATMAN!")
      router ! "How many?"
      // Each "BATMAN!" contains two 'A's.
      expectMsg(10000 * 2)
    }
  }

  "Supervisor" must {
    "crush CrashActor" in {
      val supervisor = system.actorOf(Supervisor.props, "supervisor")
      supervisor ! "Crash!!!"
      // No assertion here: the pause only gives the child's lifecycle hooks time to log.
      Thread.sleep(1000)
    }
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement