Advertisement
Guest User

Untitled

a guest
Nov 22nd, 2019
156
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Java 3.64 KB | None | 0 0
  1. class MyActor extends Actor {
  2.   override def receive = ???
  3. }
  4.  
  5. class PubSubMediator extends Actor {
  6.   override def receive = Actor.emptyBehavior
  7. }
  8.  
  9. val system = ActorSystem("pub-sub-mediator-spec-system")
  10.  
  11. system.actorOf(Props(new PubSubMediator), "pub-sub-mediator")
  12.  
  13. object PubSubMediator {
  14.  
  15.   final val Name = "pub-sub-mediator"
  16.  
  17.   def props: Props = Props(new PubSubMediator)
  18. }
  19.  
  20. class PubSubMediator extends Actor {
  21.   override def receive = Actor.emptyBehavior
  22. }
  23.  
  24. class PubSubMediatorSpec extends WordSpec with Matchers with BeforeAndAfterAll {
  25.  
  26.   implicit val system = ActorSystem("pub-sub-mediator-spec-system")
  27.  
  28.   "A PubSubMediator" should {
  29.     "be suited for getting started" in {
  30.       EventFilter.debug(occurrences = 1, pattern = s"started.*${classOf[PubSubMediator].getName}").intercept {
  31.         system.actorOf(PubSubMediator.props)
  32.       }
  33.     }
  34.   }
  35.  
  36.   override protected def afterAll() = {
  37.     Await.ready(system.terminate(), Duration.Inf)
  38.     super.afterAll()
  39.   }
  40. }
  41.  
  42. mediator ! GetSubscribers("topic")
  43.  
  44. override def receive = {
  45.   case Subscribe(topic) =>
  46.     // ИМЕННО ТУТ обрабатывается подписка
  47.     sender() ! Subscribed
  48. }
  49.  
  50. object PubSubMediator {
  51.  
  52.   case class Publish(topic: String, message: Any)
  53.   case class Published(publish: Publish)
  54.  
  55.   case class Subscribe(topic: String, subscriber: ActorRef)
  56.   case class Subscribed(subscribe: Subscribe)
  57.   case class AlreadySubscribed(subscribe: Subscribe)
  58.  
  59.   case class Unsubscribe(topic: String, subscriber: ActorRef)
  60.   case class Unsubscribed(unsubscribe: Unsubscribe)
  61.   case class NotSubscribed(unsubscribe: Unsubscribe)
  62.  
  63.   case class GetSubscribers(topic: String)
  64.  
  65.   final val Name = "pub-sub-mediator"
  66.  
  67.   def props: Props = Props(new PubSubMediator)
  68. }
  69.  
  70. class PubSubMediator extends Actor {
  71.   import PubSubMediator._
  72.  
  73.   private var subscribers = Map.empty[String, Set[ActorRef]].withDefaultValue(Set.empty)
  74.  
  75.   override def receive = {
  76.     case publish @ Publish(topic, message) =>
  77.       subscribers(topic).foreach(_ ! message)
  78.       sender() ! Published(publish)
  79.  
  80.     case subscribe @ Subscribe(topic, subscriber) if subscribers(topic).contains(subscriber) =>
  81.       sender() ! AlreadySubscribed(subscribe)
  82.  
  83.     case subscribe @ Subscribe(topic, subscriber) =>
  84.       subscribers += topic -> (subscribers(topic) + subscriber)
  85.       sender() ! Subscribed(subscribe)
  86.  
  87.     case unsubscribe @ Unsubscribe(topic, subscriber) if !subscribers(topic).contains(subscriber) =>
  88.       sender() ! NotSubscribed(unsubscribe)
  89.  
  90.     case unsubscribe @ Unsubscribe(topic, subscriber) =>
  91.       subscribers += topic -> (subscribers(topic) - subscriber)
  92.       sender() ! Unsubscribed(unsubscribe)
  93.  
  94.     case GetSubscribers(topic) =>
  95.       sender() ! subscribers(topic)
  96.   }
  97. }
  98.  
  99. val subscribe01 = Subscribe(topic01, subscriber01.ref)
  100. mediator ! subscribe01
  101. sender.expectMsg(Subscribed(subscribe01))
  102.  
  103. val subscribe02 = Subscribe(topic01, subscriber02.ref)
  104. mediator ! subscribe02
  105. sender.expectMsg(Subscribed(subscribe02))
  106.  
  107. val subscribe03 = Subscribe(topic02, subscriber03.ref)
  108. mediator ! subscribe03
  109. sender.expectMsg(Subscribed(subscribe03))
  110.  
  111. context.watch(subscriber)
  112.  
  113. class PubSubMediator extends Actor {
  114.   import PubSubMediator._
  115.  
  116.   ...
  117.  
  118.   override def receive = {
  119.     ...
  120.  
  121.     case subscribe @ Subscribe(topic, subscriber) =>
  122.       subscribers += topic -> (subscribers(topic) + subscriber)
  123.       context.watch(subscriber)
  124.       sender() ! Subscribed(subscribe)
  125.  
  126.     ...
  127.  
  128.     case Terminated(subscriber) =>
  129.       subscribers = subscribers.map { case (topic, ss) => topic -> (ss - subscriber) }
  130.   }
  131. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement