Advertisement
Guest User

Untitled

a guest
Feb 20th, 2017
83
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.33 KB | None | 0 0
  1. package pool
  2.  
  3. import (
  4. "errors"
  5. "fmt"
  6. "log"
  7. "sort"
  8. "time"
  9. )
  10.  
  11. type action func(*Pool) error
  12.  
  // Message is the unit of work handed to the pool. id is used to route the
  // message to a worker; body is the payload carried along with it.
  type Message struct {
  	id, body string
  }
  17.  
  18. type worker struct {
  19. msgIn chan *Message
  20. lastMsgID string
  21. stopSignal chan struct{}
  22. }
  23.  
  24. type workerGroup []*worker
  25.  
  26. func (ws workerGroup) Len() int { return len(ws) }
  27. func (ws workerGroup) Swap(i, j int) { ws[i], ws[j] = ws[j], ws[i] }
  28. func (ws workerGroup) Less(i, j int) bool { return len(ws[i].msgIn) < len(ws[j].msgIn) }
  29.  
  30. type Pool struct {
  31. workers workerGroup
  32. actionsIn chan action
  33. stopSignal chan struct{}
  34. }
  35.  
  36. func NewPool(numWorkers int) *Pool {
  37. pool := &Pool{
  38. actionsIn: make(chan action),
  39. stopSignal: make(chan struct{}),
  40. }
  41.  
  42. for i := 0; i < numWorkers; i++ {
  43. worker := &worker{
  44. msgIn: make(chan *Message, 1024),
  45. }
  46. pool.workers = append(pool.workers, worker)
  47. }
  48.  
  49. go pool.start()
  50. return pool
  51. }
  52.  
  53. func (p *Pool) start() {
  54. for _, w := range p.workers {
  55. go w.start()
  56. }
  57.  
  58. for {
  59. select {
  60. case act := <-p.actionsIn:
  61. err := act(p)
  62. if err != nil {
  63. log.Println(err)
  64. }
  65. case <-p.stopSignal:
  66. return
  67. }
  68. }
  69. }
  70.  
  71. func (p *Pool) getWorker(id string) *worker {
  72.  
  73. for _, w := range p.workers {
  74. if w.lastMsgID == id {
  75. return w
  76. }
  77. }
  78.  
  79. sort.Sort(p.workers)
  80. return p.workers[0]
  81.  
  82. }
  83.  
  84. func (w *worker) stop() {
  85. w.stopSignal <- struct{}{}
  86. <-w.stopSignal
  87. }
  88.  
  89. func (w *worker) start() {
  90. for {
  91. select {
  92. case msg := <-w.msgIn:
  93. w.lastMsgID = msg.id
  94.  
  95. // do the work
  96. if msg.id == "7" {
  97. time.Sleep(200 * time.Millisecond)
  98. fmt.Print(msg.id)
  99. } else {
  100. fmt.Print(".")
  101. }
  102.  
  103. case <-w.stopSignal:
  104. w.stopSignal <- struct{}{}
  105. return
  106. }
  107. }
  108. }
  109.  
  110. func (p *Pool) ProcessMessage(m *Message) {
  111. p.actionsIn <- processMessage(m)
  112. }
  113.  
  114. func (p *Pool) Stop() {
  115. p.actionsIn <- stop()
  116. }
  117.  
  118. // just an example
  119. func addWorker() action {
  120. return func(p *Pool) error {
  121. return errors.New("not implemented")
  122. }
  123. }
  124.  
  125. func processMessage(msg *Message) action {
  126. return func(p *Pool) error {
  127. w := p.getWorker(msg.id)
  128. select {
  129. case w.msgIn <- msg:
  130. default:
  131. return fmt.Errorf("internal buffer is full dropping message: %s", msg.id)
  132. }
  133. return nil
  134. }
  135. }
  136.  
  137. func stop() action {
  138. return func(p *Pool) error {
  139. for _, w := range p.workers {
  140. w.stop()
  141. }
  142. p.stopSignal <- struct{}{}
  143. return nil
  144. }
  145. }
  146.  
  147. func NewMessage(id, body string) *Message {
  148. return &Message{id, body}
  149. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement