Advertisement
Guest User

TW Lab 11

a guest
Jun 11th, 2014
204
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.60 KB | None | 0 0
  1. package PrCo
  2. import akka.actor.Actor
  3. import akka.actor.Stash
  4. import akka.actor.ActorRef
  5. import akka.actor.Props
  6. import akka.event.LoggingReceive
  7. import akka.actor.ActorSystem
  8. import com.typesafe.config.ConfigFactory
  9. import akka.actor.actorRef2Scala
  10. import scala.util.Random
  11.  
  12. // Exercise: implement a solution to the producers/consumers problem using Akka actors
  13. // The Buffer actor and the main class are already implemented
  14. // You can use as many behaviors in the Producer and Consumer actors as you find useful
  15. // object PC contains messages for all actors - add new ones if you need them
  16.  
  /** Message protocol and shared verification state for the producers/consumers exercise. */
  object PC {
    /** Random source used to vary the pause between successive requests. */
    val r = new Random()

    // Just for verification: incremented after a successful production,
    // decremented after a successful consumption.
    // It is written by several actors; @volatile only guarantees visibility of the
    // latest value, it does not make a read-modify-write update atomic.
    @volatile var globalCounter = 0

    // Parameterless messages are singletons, so they are case objects
    // (a case class without a parameter list is rejected by Scala 2.11+).

    /** Kick-off message sent once to every producer and consumer. */
    case object Init

    /** Request to store `x` in the buffer. */
    final case class Put(x: Long)

    /** Request to take one element out of the buffer. */
    case object Get

    /** Buffer's confirmation that a `Put` was accepted. */
    case object ProduceDone

    /** Buffer's reply to a `Get`, carrying the element `x` that was removed. */
    final case class ConsumeDone(x: Long)
  }
  27.  
  /**
   * Producer actor: keeps asking the buffer to store its current attempt counter.
   *
   * After every `ProduceDone` confirmation it waits 2000-3999 ms and sends the next `Put`.
   * The wait is done through the scheduler, so no dispatcher thread is blocked meanwhile.
   *
   * @param name label used in the console output
   * @param buf  buffer actor that receives the `Put` requests
   */
  class Producer(name: String, buf: ActorRef) extends Actor {
    import PC._
    import context.dispatcher // execution context required by the scheduler
    import scala.concurrent.duration._

    // Number of productions confirmed so far; also the value sent in the next Put.
    var proba = 0

    def receive = {
      case Init =>
        println(s"Zaczyna producent: $name")
        buf ! Put(proba)

      case ProduceDone =>
        proba = proba + 1
        // globalCounter is shared between actors running on different threads:
        // update it and take the value to print under one lock.
        val total = PC.synchronized {
          globalCounter = globalCounter + 1
          globalCounter
        }
        println(s"$name wyslal; za proba $proba; globalCounter = $total")
        putLater(proba)
    }

    /** Sends `Put(value)` to the buffer after a random 2000-3999 ms pause, without blocking. */
    private def putLater(value: Long): Unit = {
      // The task runs on a scheduler thread, so capture immutable values only
      // and pass the sender explicitly so the buffer's reply comes back here.
      val replyTo = self
      val pause = (2000 + r.nextInt(2000)).millis
      context.system.scheduler.scheduleOnce(pause) {
        buf.tell(Put(value), replyTo)
      }
    }
  }
  45.  
  /**
   * Consumer actor: keeps asking the buffer for an element.
   *
   * After every `ConsumeDone` reply it waits 2000-3999 ms and sends the next `Get`.
   * The wait is done through the scheduler, so no dispatcher thread is blocked meanwhile.
   *
   * @param name label used in the console output
   * @param buf  buffer actor that receives the `Get` requests
   */
  class Consumer(name: String, buf: ActorRef) extends Actor {
    import PC._
    import context.dispatcher // execution context required by the scheduler
    import scala.concurrent.duration._

    // Number of consumptions confirmed so far.
    var proba = 0

    def receive = {
      case Init =>
        println(s"Zaczyna konsument: $name")
        buf ! Get

      case ConsumeDone(_) => // the consumed value itself is not used
        proba = proba + 1
        // globalCounter is shared between actors running on different threads:
        // update it and take the value to print under one lock.
        val total = PC.synchronized {
          globalCounter = globalCounter - 1
          globalCounter
        }
        println(s"$name odebral; za proba $proba; globalCounter = $total")
        getLater()
    }

    /** Sends `Get` to the buffer after a random 2000-3999 ms pause, without blocking. */
    private def getLater(): Unit = {
      // The task runs on a scheduler thread; pass the sender explicitly so the
      // buffer's reply comes back to this actor.
      val replyTo = self
      val pause = (2000 + r.nextInt(2000)).millis
      context.system.scheduler.scheduleOnce(pause) {
        buf.tell(Get, replyTo)
      }
    }
  }
  63.  
  64. // Buffer uses "Stash" to temporarily stash messages which cannot be handled immediately:
  65. // - consuming from an empty buffer
  66. // - producing to a full buffer
  67. // Documentation: http://doc.akka.io/api/akka/2.0/akka/actor/Stash.html
  /**
   * Bounded buffer actor holding at most `n` values.
   *
   * A `Put` is accepted while there is room and answered with `ProduceDone`;
   * a `Get` is served while the buffer is non-empty and answered with `ConsumeDone`.
   * The most recently stored value is handed out first. Anything that cannot be
   * handled right now is stashed and replayed after the next successful operation.
   *
   * @param n capacity of the buffer
   */
  class Buffer(n: Int) extends Actor with Stash {
    import PC._

    private val slots = new Array[Long](n)
    private var used = 0 // number of occupied slots; the next free index

    private def hasRoom: Boolean = used < n
    private def hasItems: Boolean = used > 0

    override def receive = {
      case Put(item) if hasRoom =>
        slots(used) = item
        used += 1
        sender ! ProduceDone
        println(s"Buffer received $item, count=$used")
        unstashAll() // a waiting Get may be servable now

      case Get if hasItems =>
        used -= 1
        val item = slots(used)
        sender ! ConsumeDone(item)
        println(s"Buffer sent $item, count=$used")
        unstashAll() // a waiting Put may fit now

      case _ =>
        stash()
    }
  }
  94.  
  /**
   * Entry-point actor: on construction it starts a dedicated actor system with one
   * `Buffer` of capacity 10, ten producers and ten consumers, and sends each worker `Init`.
   * Any message it receives itself is printed.
   */
  class ProdConsMain extends Actor {
    import PC._

    // An actor using Stash needs a deque-based mailbox, configured here for the buffer's dispatcher.
    val customConfig = "bounded-buffer-dispatcher.mailbox-type=\"akka.dispatch.UnboundedDequeBasedMailbox\""
    val config = ConfigFactory.parseString(customConfig)
    val system = ActorSystem("bounded-buffer", ConfigFactory.load(config))

    // The shared buffer actor.
    val bb = system.actorOf(
      Props(new Buffer(10)).withDispatcher("bounded-buffer-dispatcher"),
      name = "bounded-buffer")

    // Producers first, each kicked off with Init right after creation.
    (1 to 10).foreach { idx =>
      val producer = system.actorOf(Props(new Producer(s"producer-$idx", bb)), name = s"producer-$idx")
      producer ! Init
    }

    // Then the consumers, started the same way.
    (1 to 10).foreach { idx =>
      val consumer = system.actorOf(Props(new Consumer(s"consumer-$idx", bb)), name = s"consumer-$idx")
      consumer ! Init
    }

    def receive = LoggingReceive {
      case msg => println(msg)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement