Advertisement
mfgnik

Untitled

Apr 5th, 2020
868
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 0.60 KB | None | 0 0
  1. func (p *MyPubSub) Subscribe(subj string, cb MsgHandler) (Subscription, error) {
  2.     p.mu.Lock()
  3.     defer p.mu.Unlock()
  4.     subscription := MySubscription{pubSub: p, subj: subj, cb: cb}
  5.     p.subs[subj] = append(p.subs[subj], &subscription)
  6.     if _, ok := p.conds[subj]; !ok {
  7.         p.conds[subj] = sync.NewCond(&p.mu)
  8.         p.chans[subj] = make(chan interface{})
  9.         go func() {
  10.             p.mu.Lock()
  11.             defer p.mu.Unlock()
  12.             for {
  13.                 p.conds[subj].Wait()
  14.                 select {
  15.                 case msg := <- p.chans[subj]:
  16.                     for _, sub := range p.subs[subj] {
  17.                         sub.cb(msg)
  18.                     }
  19.                 }
  20.             }
  21.         }()
  22.     }
  23.     return &subscription, nil
  24. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement