Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package common
- import (
- "time"
- )
- type Task func()
- type dispatcher struct {
- WorkerPool chan chan Task
- TaskQueue chan Task
- }
- type workerPoolType chan chan Task
- type worker struct {
- ID int
- WorkerPool workerPoolType
- Task chan (Task)
- QuitChan chan bool
- }
- func NewDispatcher() *dispatcher {
- var D dispatcher
- D.WorkerPool = make(workerPoolType, Config.Workers)
- D.TaskQueue = make(chan Task, 1)
- for i := 0; i < Config.Workers; i++ {
- worker := NewWorker(i+1, D.WorkerPool)
- worker.Start()
- }
- return &D
- }
- func (D *dispatcher) AddTicker(task Task, interval time.Duration) {
- go func() {
- for {
- select {
- case D.TaskQueue <- task:
- }
- select {
- case <-time.After(interval):
- }
- }
- }()
- }
- func (D *dispatcher) Start() {
- for {
- select {
- case task := <-D.TaskQueue:
- go func() {
- worker := <-D.WorkerPool
- worker <- task
- }()
- }
- }
- }
- func (w *worker) Start() {
- go w.tasksProcessor()
- }
- func (w *worker) tasksProcessor() {
- for {
- w.WorkerPool <- w.Task
- select {
- case work := <-w.Task:
- work()
- case <-w.QuitChan:
- return
- }
- }
- }
- func (w worker) Stop() {
- go func() {
- w.QuitChan <- true
- }()
- }
- func NewWorker(id int, workerQueue chan chan Task) *worker {
- return &worker{
- ID: id,
- WorkerPool: workerQueue,
- Task: make(chan Task),
- QuitChan: make(chan bool),
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement