Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/**
 * A minimal actor: a mailbox plus a replaceable message handler, run on a
 * caller-supplied `Executor`.
 *
 * Create one with `Actor(initialBehavior)` and send to it with `address ! msg`.
 * Each handled message yields an [[Actor.Instr]] telling the actor what to do next.
 */
object Actor {
  import java.util.concurrent.{ConcurrentLinkedQueue, Executor}
  import java.util.concurrent.atomic.AtomicBoolean

  /** Instruction returned by a behavior after it has handled one message. */
  sealed trait Instr

  /** Keep the current behavior. */
  case object Continue extends Instr

  /** Replace the current behavior with `b` for subsequent messages. */
  case class Become(b: Any => Instr) extends Instr

  /** Makes the actor print a diagnostic for the message that produced it. */
  case object Dead extends Instr

  /** Handle through which messages are sent to an actor. */
  trait Address {
    /** Enqueues `msg` in the actor's mailbox and schedules processing if needed. */
    def !(msg: Any): Unit
  }

  /**
   * Creates an actor whose first behavior is `initial`.
   *
   * @param initial handler applied to each message once the bootstrap
   *                `Become(initial)` message has been processed
   * @param e       executor on which mailbox processing is scheduled
   * @return the address to send messages to
   */
  def apply(initial: Any => Instr)(implicit e: Executor): Address = {
    val a: Address = new Address with Runnable {
      private val mbox = new ConcurrentLinkedQueue[Any]
      // True while a run of this actor is scheduled or executing.
      private val on = new AtomicBoolean(false)
      // Bootstrap behavior: an Instr sent as a message is returned as-is, which
      // is how the `Become(initial)` sent below installs the real behavior.
      private var behavior: Any => Instr = {
        case i: Instr => i
        case _ => println("Unknown message"); Continue
      }

      /** Submits this actor to the executor if there is mail and no run is pending. */
      final def trySchedule(): Unit =
        if (!mbox.isEmpty && on.compareAndSet(false, true)) {
          try e.execute(this)
          catch {
            // Deliberately catches every Throwable: the flag must be released
            // whatever went wrong, and the failure is rethrown unchanged.
            case t: Throwable =>
              // The reset is done outside `assert` because `assert` is elidable
              // and an elided call would skip its argument entirely.
              val wasOn = on.getAndSet(false)
              assert(wasOn)
              throw t
          }
        }

      /** Applies the current behavior to `msg` and acts on the resulting instruction. */
      final def reactTo(msg: Any): Unit = behavior(msg) match {
        case Become(newBeh) => behavior = newBeh
        // NOTE(review): the behavior is left unchanged here, so later messages
        // are still handled by it - confirm this is intended.
        case Dead => println("Dead, cannot deal with: " + msg)
        case Continue => // Just remain as you were
      }

      /** Processes exactly one message, then releases the flag and reschedules if mail remains. */
      final override def run(): Unit =
        try reactTo(mbox.poll())
        finally {
          // Release first, then re-check the mailbox, so that messages which
          // arrived while this run held the flag still get scheduled.
          val wasOn = on.getAndSet(false)
          assert(wasOn)
          trySchedule()
        }

      final override def !(msg: Any): Unit = {
        // Enqueue outside `assert`: with assertions elided the message would
        // otherwise never reach the mailbox.
        val accepted = mbox.offer(msg)
        assert(accepted)
        trySchedule()
      }
    }
    a ! Become(initial)
    a
  }
}
Add Comment
Please, Sign In to add comment