Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class MyActor extends Actor {
- override def receive = ???
- }
- class PubSubMediator extends Actor {
- override def receive = Actor.emptyBehavior
- }
- val system = ActorSystem("pub-sub-mediator-spec-system")
- system.actorOf(Props(new PubSubMediator), "pub-sub-mediator")
- object PubSubMediator {
- final val Name = "pub-sub-mediator"
- def props: Props = Props(new PubSubMediator)
- }
- class PubSubMediator extends Actor {
- override def receive = Actor.emptyBehavior
- }
- class PubSubMediatorSpec extends WordSpec with Matchers with BeforeAndAfterAll {
- implicit val system = ActorSystem("pub-sub-mediator-spec-system")
- "A PubSubMediator" should {
- "be suited for getting started" in {
- EventFilter.debug(occurrences = 1, pattern = s"started.*${classOf[PubSubMediator].getName}").intercept {
- system.actorOf(PubSubMediator.props)
- }
- }
- }
- override protected def afterAll() = {
- Await.ready(system.terminate(), Duration.Inf)
- super.afterAll()
- }
- }
- mediator ! GetSubscribers("topic")
- override def receive = {
- case Subscribe(topic) =>
- // ИМЕННО ТУТ обрабатывается подписка
- sender() ! Subscribed
- }
- object PubSubMediator {
- case class Publish(topic: String, message: Any)
- case class Published(publish: Publish)
- case class Subscribe(topic: String, subscriber: ActorRef)
- case class Subscribed(subscribe: Subscribe)
- case class AlreadySubscribed(subscribe: Subscribe)
- case class Unsubscribe(topic: String, subscriber: ActorRef)
- case class Unsubscribed(unsubscribe: Unsubscribe)
- case class NotSubscribed(unsubscribe: Unsubscribe)
- case class GetSubscribers(topic: String)
- final val Name = "pub-sub-mediator"
- def props: Props = Props(new PubSubMediator)
- }
- class PubSubMediator extends Actor {
- import PubSubMediator._
- private var subscribers = Map.empty[String, Set[ActorRef]].withDefaultValue(Set.empty)
- override def receive = {
- case publish @ Publish(topic, message) =>
- subscribers(topic).foreach(_ ! message)
- sender() ! Published(publish)
- case subscribe @ Subscribe(topic, subscriber) if subscribers(topic).contains(subscriber) =>
- sender() ! AlreadySubscribed(subscribe)
- case subscribe @ Subscribe(topic, subscriber) =>
- subscribers += topic -> (subscribers(topic) + subscriber)
- sender() ! Subscribed(subscribe)
- case unsubscribe @ Unsubscribe(topic, subscriber) if !subscribers(topic).contains(subscriber) =>
- sender() ! NotSubscribed(unsubscribe)
- case unsubscribe @ Unsubscribe(topic, subscriber) =>
- subscribers += topic -> (subscribers(topic) - subscriber)
- sender() ! Unsubscribed(unsubscribe)
- case GetSubscribers(topic) =>
- sender() ! subscribers(topic)
- }
- }
- val subscribe01 = Subscribe(topic01, subscriber01.ref)
- mediator ! subscribe01
- sender.expectMsg(Subscribed(subscribe01))
- val subscribe02 = Subscribe(topic01, subscriber02.ref)
- mediator ! subscribe02
- sender.expectMsg(Subscribed(subscribe02))
- val subscribe03 = Subscribe(topic02, subscriber03.ref)
- mediator ! subscribe03
- sender.expectMsg(Subscribed(subscribe03))
- context.watch(subscriber)
- class PubSubMediator extends Actor {
- import PubSubMediator._
- ...
- override def receive = {
- ...
- case subscribe @ Subscribe(topic, subscriber) =>
- subscribers += topic -> (subscribers(topic) + subscriber)
- context.watch(subscriber)
- sender() ! Subscribed(subscribe)
- ...
- case Terminated(subscriber) =>
- subscribers = subscribers.map { case (topic, ss) => topic -> (ss - subscriber) }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement