Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package PrCo
- import akka.actor.Actor
- import akka.actor.Stash
- import akka.actor.ActorRef
- import akka.actor.Props
- import akka.event.LoggingReceive
- import akka.actor.ActorSystem
- import com.typesafe.config.ConfigFactory
- import akka.actor.actorRef2Scala
- import scala.util.Random
- // Exercise: implement solution to the producers/consumers problem using Akka actors
- // The Buffer actor and the main class are already implemented
- // You can use as many behaviors in Producer and Consumer actors as you find useful
- // object PC contains messages for all actors - add new if you need them
/** Message protocol and shared verification state for the producer/consumer exercise. */
object PC {
  /** Random source used by producers and consumers to pick their pause length. */
  val r: Random = new Random()

  /**
   * Verification counter: incremented after a successful production and
   * decremented after a successful consumption.
   *
   * Producer and Consumer actors update it without synchronization, so the
   * read-modify-write is not atomic; `@volatile` only guarantees that the
   * latest write is visible across threads. Treat it as a rough diagnostic.
   */
  @volatile var globalCounter: Int = 0

  // Parameterless messages are case objects: a `case class` without a parameter
  // list is deprecated since Scala 2.10 and rejected by later compilers. The
  // actors already use these names as plain values (`p ! Init`, `case Get =>`),
  // so call sites are unaffected.

  /** Kick-off message for a producer or a consumer. */
  case object Init

  /** Request to store `x` in the buffer. */
  final case class Put(x: Long)

  /** Request to take one value out of the buffer. */
  case object Get

  /** Buffer's acknowledgement of a stored value. */
  case object ProduceDone

  /** Buffer's reply to `Get`, carrying the value `x` that was removed. */
  final case class ConsumeDone(x: Long)
}
/**
 * Producer actor: repeatedly asks the buffer to store its current attempt number.
 *
 * After `Init` it sends the first `Put`. Each `ProduceDone` reply bumps the
 * attempt counter and the shared `globalCounter`, then the next `Put` is sent
 * after a random pause of 2000-3999 ms.
 *
 * @param name label used in the console output
 * @param buf  the buffer actor that receives `Put` messages
 */
class Producer(name: String, buf: ActorRef) extends Actor {
  import PC._
  import scala.concurrent.duration._

  /** Self-addressed tick that ends the pause between two productions. */
  private case object ProduceNext

  /** Number of acknowledged productions; also the value sent in the next `Put`. */
  var proba = 0

  def receive: Receive = {
    case Init =>
      println(s"Zaczyna producent: $name")
      buf ! Put(proba)

    case ProduceDone =>
      proba = proba + 1
      globalCounter = globalCounter + 1
      println(s"$name wyslal; za proba $proba; globalCounter = $globalCounter")
      // Pause via the scheduler instead of Thread.sleep so the dispatcher thread
      // is released while this producer waits.
      val pause = (2000 + r.nextInt(2000)).millis
      context.system.scheduler.scheduleOnce(pause, self, ProduceNext)(context.dispatcher)

    case ProduceNext =>
      // Sent from inside receive, so the buffer sees this actor as the sender.
      buf ! Put(proba)
  }
}
/**
 * Consumer actor: repeatedly asks the buffer for a value.
 *
 * After `Init` it sends the first `Get`. Each `ConsumeDone` reply bumps the
 * attempt counter, decrements the shared `globalCounter`, and the next `Get`
 * is sent after a random pause of 2000-3999 ms. The received value itself is
 * not used.
 *
 * @param name label used in the console output
 * @param buf  the buffer actor that receives `Get` messages
 */
class Consumer(name: String, buf: ActorRef) extends Actor {
  import PC._
  import scala.concurrent.duration._

  /** Self-addressed tick that ends the pause between two consumptions. */
  private case object ConsumeNext

  /** Number of values consumed so far. */
  var proba = 0

  def receive: Receive = {
    case Init =>
      println(s"Zaczyna konsument: $name")
      buf ! Get

    case ConsumeDone(_) =>
      proba = proba + 1
      globalCounter = globalCounter - 1
      println(s"$name odebral; za proba $proba; globalCounter = $globalCounter")
      // Pause via the scheduler instead of Thread.sleep so the dispatcher thread
      // is released while this consumer waits.
      val pause = (2000 + r.nextInt(2000)).millis
      context.system.scheduler.scheduleOnce(pause, self, ConsumeNext)(context.dispatcher)

    case ConsumeNext =>
      // Sent from inside receive, so the buffer sees this actor as the sender.
      buf ! Get
  }
}
- // Buffer uses "Stash" to temporarily stash messages which cannot be handled immediately:
- // - consuming from an empty buffer
- // - producing to a full buffer
- // Documentation: http://doc.akka.io/api/akka/2.0/akka/actor/Stash.html
/**
 * Bounded buffer actor holding at most `n` values.
 *
 * A `Put` is accepted while there is free space and answered with
 * `ProduceDone`; a `Get` is served while the buffer is non-empty and answered
 * with `ConsumeDone`. Values come out in last-in, first-out order, because
 * `Get` returns the most recently stored slot. A request that cannot be served
 * yet is stashed and replayed after the next successful operation.
 *
 * @param n capacity of the buffer; must be positive
 */
class Buffer(n: Int) extends Actor with Stash {
  import PC._

  // With a capacity of zero no Put could ever be accepted, so every request
  // would be stashed forever; fail fast instead.
  require(n > 0, s"Buffer capacity must be positive, got $n")

  private val buf = new Array[Long](n)
  private var count = 0

  override def receive: Receive = {
    case Put(x) if count < n =>
      buf(count) = x
      count += 1
      sender ! ProduceDone
      println(s"Buffer received $x, count=$count")
      // The state changed, so give every postponed request another chance.
      unstashAll()

    case Get if count > 0 =>
      count -= 1
      val x = buf(count)
      sender ! ConsumeDone(x)
      println(s"Buffer sent $x, count=$count")
      unstashAll()

    // Only requests that may succeed later are postponed. Any other message is
    // left unhandled instead of being stashed and replayed indefinitely.
    case _: Put | Get =>
      stash()
  }
}
/**
 * Bootstrap actor for the exercise.
 *
 * On construction it starts a dedicated actor system named "bounded-buffer",
 * creates one `Buffer` of capacity 10 on the "bounded-buffer-dispatcher", and
 * then starts ten producers followed by ten consumers, sending `Init` to each.
 * Any message this actor receives is printed.
 */
class ProdConsMain extends Actor {
  import PC._

  // An actor that mixes in Stash needs a deque-based mailbox, which is set here
  // for the dispatcher the buffer runs on.
  val customConfig = "bounded-buffer-dispatcher.mailbox-type=\"akka.dispatch.UnboundedDequeBasedMailbox\""
  val config = ConfigFactory.parseString(customConfig)
  val system = ActorSystem("bounded-buffer", ConfigFactory.load(config))
  val bb = system.actorOf(
    Props(new Buffer(10)).withDispatcher("bounded-buffer-dispatcher"),
    name = "bounded-buffer")

  // Producers are started first; each one gets Init right after creation.
  (1 to 10).foreach { i =>
    val producerName = s"producer-$i"
    val producer = system.actorOf(Props(new Producer(producerName, bb)), name = producerName)
    producer ! Init
  }

  // Consumers follow, started the same way.
  (1 to 10).foreach { i =>
    val consumerName = s"consumer-$i"
    val consumer = system.actorOf(Props(new Consumer(consumerName, bb)), name = consumerName)
    consumer ! Init
  }

  def receive = LoggingReceive {
    case other => println(other)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement