Advertisement
Guest User

Untitled

a guest
Mar 30th, 2017
59
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.32 KB | None | 0 0
  1. package common
  2.  
  3. import (
  4. "time"
  5. )
  6.  
  7. type Task func()
  8.  
  9. type dispatcher struct {
  10. WorkerPool chan chan Task
  11. TaskQueue chan Task
  12. }
  13.  
  14. type workerPoolType chan chan Task
  15.  
  16. type worker struct {
  17. ID int
  18. WorkerPool workerPoolType
  19. Task chan (Task)
  20. QuitChan chan bool
  21. }
  22.  
  23. func NewDispatcher() *dispatcher {
  24. var D dispatcher
  25. D.WorkerPool = make(workerPoolType, Config.Workers)
  26. D.TaskQueue = make(chan Task, 1)
  27. for i := 0; i < Config.Workers; i++ {
  28. worker := NewWorker(i+1, D.WorkerPool)
  29. worker.Start()
  30. }
  31. return &D
  32. }
  33.  
  34. func (D *dispatcher) AddTicker(task Task, interval time.Duration) {
  35. go func() {
  36. for {
  37. select {
  38. case D.TaskQueue <- task:
  39. }
  40. select {
  41. case <-time.After(interval):
  42. }
  43. }
  44. }()
  45. }
  46.  
  47. func (D *dispatcher) Start() {
  48. for {
  49. select {
  50. case task := <-D.TaskQueue:
  51. go func() {
  52. worker := <-D.WorkerPool
  53. worker <- task
  54. }()
  55. }
  56. }
  57. }
  58.  
  59. func (w *worker) Start() {
  60. go w.tasksProcessor()
  61. }
  62.  
  63. func (w *worker) tasksProcessor() {
  64. for {
  65. w.WorkerPool <- w.Task
  66. select {
  67. case work := <-w.Task:
  68. work()
  69. case <-w.QuitChan:
  70. return
  71. }
  72. }
  73.  
  74. }
  75.  
  76. func (w worker) Stop() {
  77. go func() {
  78. w.QuitChan <- true
  79. }()
  80. }
  81.  
  82. func NewWorker(id int, workerQueue chan chan Task) *worker {
  83. return &worker{
  84. ID: id,
  85. WorkerPool: workerQueue,
  86. Task: make(chan Task),
  87. QuitChan: make(chan bool),
  88. }
  89. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement