Advertisement
Guest User

Untitled

a guest
Jan 28th, 2015
224
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.74 KB | None | 0 0
  1. // 一个简单的并发可控、任务可随意拼接的任务队列实现。
  2. // 仅作概念演示用,细节不要纠结。
  3. //
  4. // 基本结构:
  5. // Task:当前任务共享的上下文,任务通过上下文交换数据,一个任务可分为很多的工作(Work)
  6. // Dispatcher:任务队列管理器,负责创建 Task 并使用合适的 Worker 来处理数据
  7. // Worker:任务的抽象接口
  8. // XXXWorker:各个具体的任务处理逻辑
  9. // WorkerBench:一个 Worker 池,确保当前正在运行的 Worker 数量不超过限制
  10. package main
  11.  
  12. import (
  13. "fmt"
  14. "math/rand"
  15. "strconv"
  16. "time"
  17. )
  18.  
  19. type WorkerId string
  20. type TaskData string
  21. type WorkerFactory func() Worker
  22.  
  23. type WorkerConfig struct {
  24. Name WorkerId
  25. Factory WorkerFactory
  26. Count int // 需要启动的 worker 数量
  27. }
  28.  
  29. // 所有的任务都会读取 Task 的内容,所以这个结构会很大。
  30. // 当它变得过于复杂的时候需要重构,不过这就不是现在讨论的问题了。
  31. type Task struct {
  32. // 各种可能被用到的字段
  33. Data TaskData
  34. Foo string
  35. Bar string
  36. Player string
  37. }
  38.  
  39. // 任务调度器
  40. type Dispatcher struct {
  41. done chan bool
  42. workerBenches map[WorkerId]*WorkerBench
  43. }
  44.  
  45. // 用来创建 Worker,并限制同时工作的 Worker 总数。
  46. type WorkerBench struct {
  47. throttle chan bool
  48. factory WorkerFactory
  49. }
  50.  
  51. // Worker 的接口
  52. type Worker interface {
  53. Work(*Task)
  54. }
  55.  
  56. // 各种 worker
  57. type FooWorker struct{}
  58. type BarWorker struct{}
  59. type PlayerWorker struct{}
  60.  
  61. func main() {
  62. fmt.Println("starting...")
  63. dispatcher := NewDispatcher()
  64.  
  65. // 这里用来演示通过网络异步收到 work 的情况
  66. go func() {
  67. testWorks := [][]WorkerId{
  68. []WorkerId{"foo", "bar", "player"},
  69. []WorkerId{"foo", "bar", "player"},
  70. []WorkerId{"foo", "bar", "player"},
  71. []WorkerId{"foo", "bar", "player"},
  72. []WorkerId{"foo", "bar", "player"},
  73. []WorkerId{"foo", "bar", "player"},
  74. []WorkerId{"foo", "player"}, // 跳过 bar
  75. []WorkerId{"foo", "player"}, // 跳过 bar
  76. []WorkerId{"foo", "player"}, // 跳过 bar
  77. []WorkerId{"foo", "player"}, // 跳过 bar
  78. []WorkerId{"foo", "player"}, // 跳过 bar
  79. []WorkerId{"foo", "player"}, // 跳过 bar
  80. []WorkerId{"bar", "foo"}, // 逆序
  81. []WorkerId{"bar", "foo"}, // 逆序
  82. []WorkerId{"bar", "foo"}, // 逆序
  83. []WorkerId{"bar", "foo"}, // 逆序
  84. []WorkerId{"bar", "foo"}, // 逆序
  85. }
  86.  
  87. // 执行任务,每个任务可以带一个自定义数据,现在先简单用 string
  88. for i, works := range testWorks {
  89. dispatcher.Exec(works, TaskData("work"+strconv.Itoa(i)))
  90. }
  91.  
  92. time.Sleep(time.Second)
  93. dispatcher.Stop()
  94. }()
  95.  
  96. dispatcher.Start()
  97. }
  98.  
  99. func NewDispatcher() *Dispatcher {
  100. return &Dispatcher{
  101. done: make(chan bool),
  102. }
  103. }
  104.  
  105. var workerConfig = []*WorkerConfig{
  106. &WorkerConfig{"foo", NewFooWorker, 2},
  107. &WorkerConfig{"bar", NewBarWorker, 1},
  108. &WorkerConfig{"player", NewPlayerWorker, 3},
  109. }
  110.  
  111. func (d *Dispatcher) Start() {
  112. workerBenches := make(map[WorkerId]*WorkerBench)
  113.  
  114. // 初始化 WorkerBench
  115. for _, config := range workerConfig {
  116. workerBenches[config.Name] = NewWorkerBench(config.Factory, config.Count)
  117. }
  118.  
  119. d.workerBenches = workerBenches
  120.  
  121. <-d.done
  122. }
  123.  
  124. func (d *Dispatcher) Stop() {
  125. d.done <- true
  126. }
  127.  
  128. // 对指定的数据 data 执行一系列工作。
  129. func (d *Dispatcher) Exec(works []WorkerId, data TaskData) {
  130. go d.exec(works, data)
  131. }
  132.  
  133. func (d *Dispatcher) exec(works []WorkerId, data TaskData) {
  134. // 首先初始化一个上下文
  135. task := &Task{
  136. Data: data,
  137. }
  138.  
  139. // 开始执行所有的任务
  140. for _, work := range works {
  141. bench := d.workerBenches[work]
  142. bench.Work(task)
  143. }
  144. }
  145.  
  146. // 初始化一个 WorkerBench,默认标记所有 Worker 都为空闲。
  147. func NewWorkerBench(factory WorkerFactory, count int) *WorkerBench {
  148. throttle := make(chan bool, count)
  149.  
  150. for i := 0; i < count; i++ {
  151. throttle <- true
  152. }
  153.  
  154. return &WorkerBench{
  155. throttle: throttle,
  156. factory: factory,
  157. }
  158. }
  159.  
  160. // 开始执行一项任务。
  161. func (b *WorkerBench) Work(task *Task) {
  162. <-b.throttle
  163. worker := b.factory()
  164. worker.Work(task)
  165. b.throttle <- true
  166. }
  167.  
  168. func NewFooWorker() Worker {
  169. return &FooWorker{}
  170. }
  171.  
  172. func NewBarWorker() Worker {
  173. return &BarWorker{}
  174. }
  175.  
  176. func NewPlayerWorker() Worker {
  177. return &PlayerWorker{}
  178. }
  179.  
  180. func (foo *FooWorker) Work(task *Task) {
  181. fmt.Println("Worker foo: current work name is", task.Data)
  182. task.Foo = "foo-done"
  183. time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
  184. }
  185.  
  186. func (bar *BarWorker) Work(task *Task) {
  187. fmt.Println("Worker bar: current work name is", task.Data)
  188. task.Bar = "bar-done"
  189. time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
  190. }
  191.  
  192. func (player *PlayerWorker) Work(task *Task) {
  193. fmt.Println("Worker player: current work name is", task.Data)
  194. task.Player = "player-done"
  195. time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
  196. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement