Advertisement
Guest User

Untitled

a guest
Feb 3rd, 2015
161
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 1.33 KB | None | 0 0
  1. package clustertest.actor
  2.  
  3. import akka.actor.{Terminated, Actor, ActorRef, RootActorPath}
  4. import akka.cluster.Cluster
  5. import akka.cluster.ClusterEvent.MemberUp
  6.  
  7. import scala.concurrent.ExecutionContext.Implicits.global
  8. import scala.concurrent.duration._
  9.  
  /** Periodic self-tick: tells the receiving actor to greet every friend it knows about. */
  final case class TellFriends()

  /** Handshake request; the receiver answers the sender with [[RegisterFriend]]. */
  final case class FindFriend()

  /** Handshake reply; the receiver records the sender as a friend. */
  final case class RegisterFriend()

  /**
   * A chat line exchanged between friends.
   *
   * @param text the text the receiving actor prints
   */
  final case class Message(text: String)
  14.  
  /**
   * Cluster-aware actor that discovers peer actors on other cluster members and
   * sends each of them a greeting once per second.
   *
   * Discovery is a two-step handshake: when a member is seen as `Up`, this actor
   * sends `FindFriend()` to the actor at `/user/message` on that member; the peer
   * answers with `RegisterFriend()`, at which point the peer is watched and added
   * to `friends`. A `Terminated` notification removes the peer again.
   */
  class MessageActor extends Actor {
    // Local imports keep this class self-contained without touching the file header.
    import akka.cluster.{Member, MemberStatus}
    import akka.cluster.ClusterEvent.CurrentClusterState

    val cluster = Cluster(context.system)

    var friends = IndexedSeq.empty[ActorRef]

    // Keep the handle so the timer can be cancelled in postStop; otherwise it keeps
    // sending TellFriends to a dead actor after this actor stops.
    private val tick =
      context.system.scheduler.schedule(Duration.Zero, 1.seconds, self, TellFriends())

    override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])

    override def postStop(): Unit = {
      tick.cancel()
      cluster.unsubscribe(self)
    }

    /** Asks the actor at `/user/message` on the given member to register with us. */
    private def greet(member: Member): Unit =
      context.actorSelection(RootActorPath(member.address) / "user" / "message") ! FindFriend()

    def receive: Receive = {
      case Message(text) =>
        println(text)

      case TellFriends() =>
        // Avoid Option.get: fall back to a placeholder if the address carries no port.
        val port = cluster.selfAddress.port.fold("unknown")(_.toString)
        val greeting = Message(s"Hai from $port")
        friends.foreach(_ ! greeting)

      case FindFriend() =>
        sender() ! RegisterFriend()

      case RegisterFriend() =>
        val friend = sender()
        // Ignore ourselves, and ignore repeated registrations so a friend is
        // never greeted more than once per tick.
        if (friend != self && !friends.contains(friend)) {
          context.watch(friend)
          friends = friends :+ friend
        }

      case state: CurrentClusterState =>
        // The subscription delivers a snapshot first; members that were already Up
        // at that point get no MemberUp event, so contact them from the snapshot.
        state.members.filter(_.status == MemberStatus.Up).foreach(greet)

      case MemberUp(member) =>
        greet(member)

      case Terminated(friend) =>
        println(s"${friend.path} terminated!")
        friends = friends.filterNot(_ == friend)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement