Advertisement
Guest User

Untitled

a guest
Jun 25th, 2024
123
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.09 KB | Sports | 0 0
  1. package main
  2.  
  3. import (
  4. "context"
  5. "sync"
  6. "time"
  7.  
  8. "github.com/pkg/errors"
  9. )
  10.  
  11. // PubSub contains and manage the map of topics -> subscribers
  12. type PubSub[T any] struct {
  13. sync.Mutex
  14. m map[string][]*Sub[T]
  15. }
  16.  
  17. func NewPubSub[T any]() *PubSub[T] {
  18. ps := PubSub[T]{}
  19. ps.m = make(map[string][]*Sub[T])
  20. return &ps
  21. }
  22.  
  23. func (p *PubSub[T]) getSubscribers(topic string) []*Sub[T] {
  24. p.Lock()
  25. defer p.Unlock()
  26. return p.m[topic]
  27. }
  28.  
  29. func (p *PubSub[T]) addSubscriber(s *Sub[T]) {
  30. p.Lock()
  31. for _, topic := range s.topics {
  32. p.m[topic] = append(p.m[topic], s)
  33. }
  34. p.Unlock()
  35. }
  36.  
  37. func (p *PubSub[T]) removeSubscriber(s *Sub[T]) {
  38. p.Lock()
  39. for _, topic := range s.topics {
  40. for i, subscriber := range p.m[topic] {
  41. if subscriber == s {
  42. p.m[topic] = append(p.m[topic][:i], p.m[topic][i+1:]...)
  43. break
  44. }
  45. }
  46. }
  47. p.Unlock()
  48. }
  49.  
  50. // Subscribe is an alias for NewSub
  51. func (p *PubSub[T]) Subscribe(topics []string) *Sub[T] {
  52. ctx, cancel := context.WithCancel(context.Background())
  53. s := &Sub[T]{topics: topics, ch: make(chan Payload[T], 10), ctx: ctx, cancel: cancel, p: p}
  54. p.addSubscriber(s)
  55. return s
  56. }
  57.  
  58. // Pub shortcut for publish which ignore the error
  59. func (p *PubSub[T]) Pub(topic string, msg T) {
  60. for _, s := range p.getSubscribers(topic) {
  61. s.publish(Payload[T]{topic, msg})
  62. }
  63. }
  64.  
  65. type Payload[T any] struct {
  66. Topic string
  67. Msg T
  68. }
  69.  
  70. // ErrTimeout error returned when timeout occurs
  71. var ErrTimeout = errors.New("timeout")
  72.  
  73. // ErrCancelled error returned when context is cancelled
  74. var ErrCancelled = errors.New("cancelled")
  75.  
  76. // Sub subscriber will receive messages published on a Topic in his ch
  77. type Sub[T any] struct {
  78. topics []string // Topics subscribed to
  79. ch chan Payload[T] // Receives messages in this channel
  80. ctx context.Context
  81. cancel context.CancelFunc
  82. p *PubSub[T]
  83. }
  84.  
  85. // ReceiveTimeout2 returns a message received on the channel or timeout
  86. func (s *Sub[T]) ReceiveTimeout2(timeout time.Duration, c1 <-chan struct{}) (topic string, msg T, err error) {
  87. select {
  88. case p := <-s.ch:
  89. return p.Topic, p.Msg, nil
  90. case <-time.After(timeout):
  91. return topic, msg, ErrTimeout
  92. case <-c1:
  93. return topic, msg, ErrCancelled
  94. case <-s.ctx.Done():
  95. return topic, msg, ErrCancelled
  96. }
  97. }
  98.  
  99. // ReceiveTimeout returns a message received on the channel or timeout
  100. func (s *Sub[T]) ReceiveTimeout(timeout time.Duration) (topic string, msg T, err error) {
  101. c1 := make(chan struct{})
  102. return s.ReceiveTimeout2(timeout, c1)
  103. }
  104.  
  105. // Receive returns a message
  106. func (s *Sub[T]) Receive() (topic string, msg T, err error) {
  107. var res T
  108. select {
  109. case p := <-s.ch:
  110. return p.Topic, p.Msg, nil
  111. case <-s.ctx.Done():
  112. return topic, res, ErrCancelled
  113. }
  114. }
  115.  
  116. // ReceiveCh returns a message
  117. func (s *Sub[T]) ReceiveCh() <-chan Payload[T] {
  118. return s.ch
  119. }
  120.  
  121. // Close will remove the subscriber from the Topic subscribers
  122. func (s *Sub[T]) Close() {
  123. s.cancel()
  124. s.p.removeSubscriber(s)
  125. }
  126.  
  127. // publish a message to the subscriber channel
  128. func (s *Sub[T]) publish(p Payload[T]) {
  129. select {
  130. case s.ch <- p:
  131. default:
  132. }
  133. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement