Advertisement
Guest User

Untitled

a guest
Jan 29th, 2015
272
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.09 KB | None | 0 0
  1. // 一个简单的并发可控、任务可随意拼接的任务队列实现。
  2. // 仅作概念演示用,细节不要纠结。
  3. //
  4. // 基本结构:
  5. // Context:所有任务共享的上下文,任务通过上下文交换数据
  6. // Dispatcher:任务队列管理器,负责创建 Context 并把它放入合适的工作队列
  7. // Worker:任务的抽象接口
  8. // XXXWorker:各个具体的任务处理逻辑
  9. package main
  10.  
  11. import (
  12. "fmt"
  13. "time"
  14. )
  15.  
  16. type JobId string
  17. type JobData string
  18. type WorkerFactory func() Worker
  19.  
  20. type WorkerConfig struct {
  21. Name JobId
  22. Factory WorkerFactory
  23. Count int // 需要启动的 worker 数量
  24. }
  25.  
  26. // 所有的任务都会读取 Context 的内容,所以这个结构会很大。
  27. // 当它变得过于复杂的时候需要重构,不过这就不是现在讨论的问题了。
  28. type Context struct {
  29. Jobs []JobId
  30.  
  31. // 各种可能被用到的字段
  32. Data JobData
  33. Foo string
  34. Bar string
  35. Player string
  36. }
  37.  
  38. // 任务调度器
  39. type Dispatcher struct {
  40. done chan bool
  41. jobChannels map[JobId]*JobChannels
  42. }
  43.  
  44. type JobChannels struct {
  45. input chan *Context
  46. output chan *Context
  47. }
  48.  
  49. // Worker 的接口
  50. type Worker interface {
  51. Work(input <-chan *Context, output chan<- *Context)
  52. }
  53.  
  54. // 各种 worker
  55. type FooWorker struct{}
  56. type BarWorker struct{}
  57. type PlayerWorker struct{}
  58.  
  59. func main() {
  60. fmt.Println("starting...")
  61. dispatcher := NewDispatcher()
  62.  
  63. // 这里用来演示通过网络异步收到 job 的情况
  64. go func() {
  65. job1 := []JobId{"foo", "bar", "player"}
  66. job2 := []JobId{"foo", "player"} // 跳过 bar
  67. job3 := []JobId{"bar", "foo"} // 逆序
  68.  
  69. // 执行任务,每个任务可以带一个自定义数据,现在先简单用 string,未来应该根据设计
  70. dispatcher.Dispatch(job1, "job1")
  71. dispatcher.Dispatch(job2, "job2")
  72. dispatcher.Dispatch(job3, "job3")
  73.  
  74. time.Sleep(time.Second)
  75. dispatcher.Stop()
  76. }()
  77.  
  78. dispatcher.Start()
  79. }
  80.  
  81. func NewDispatcher() *Dispatcher {
  82. return &Dispatcher{
  83. done: make(chan bool),
  84. }
  85. }
  86.  
  87. var workerConfig = []*WorkerConfig{
  88. &WorkerConfig{"foo", NewFooWorker, 1},
  89. &WorkerConfig{"bar", NewBarWorker, 2},
  90. &WorkerConfig{"player", NewPlayerWorker, 3},
  91. }
  92.  
  93. func (d *Dispatcher) Start() {
  94. d.jobChannels = make(map[JobId]*JobChannels)
  95.  
  96. // 启动足够数量的 worker
  97. for _, config := range workerConfig {
  98. channels := &JobChannels{
  99. input: make(chan *Context),
  100. output: make(chan *Context),
  101. }
  102. d.jobChannels[config.Name] = channels
  103.  
  104. for i := 0; i < config.Count; i++ {
  105. worker := config.Factory()
  106. go worker.Work(channels.input, channels.output)
  107. }
  108. }
  109.  
  110. // 做输入输出的调度工作
  111. for _, channels := range d.jobChannels {
  112. go d.monitor(channels.output)
  113. }
  114.  
  115. <-d.done
  116. }
  117.  
  118. func (d *Dispatcher) monitor(output <-chan *Context) {
  119. for ctx := range output {
  120. go d.dispatch(ctx)
  121. }
  122. }
  123.  
  124. func (d *Dispatcher) dispatch(ctx *Context) {
  125. // 所有任务都完成了
  126. if len(ctx.Jobs) == 0 {
  127. fmt.Println("job is done! Name:", ctx.Data, "Data:", *ctx)
  128. return
  129. }
  130.  
  131. // 把 ctx 放入合适的任务队列,开始执行任务
  132. job := ctx.Jobs[0]
  133. ctx.Jobs = ctx.Jobs[1:]
  134. channels := d.jobChannels[job]
  135. channels.input <- ctx
  136. }
  137.  
  138. func (d *Dispatcher) Stop() {
  139. d.done <- true
  140. }
  141.  
  142. func (d *Dispatcher) Dispatch(jobs []JobId, data JobData) {
  143. // 首先初始化一个上下文
  144. ctx := &Context{
  145. Jobs: jobs,
  146. Data: data,
  147. }
  148.  
  149. // 开始派发任务
  150. d.dispatch(ctx)
  151. }
  152.  
  153. func NewFooWorker() Worker {
  154. return &FooWorker{}
  155. }
  156.  
  157. func NewBarWorker() Worker {
  158. return &BarWorker{}
  159. }
  160.  
  161. func NewPlayerWorker() Worker {
  162. return &PlayerWorker{}
  163. }
  164.  
  165. func (foo *FooWorker) Work(input <-chan *Context, output chan<- *Context) {
  166. for ctx := range input {
  167. fmt.Println("Worker foo: current job name is", ctx.Data)
  168. ctx.Foo = "foo-done"
  169. output <- ctx
  170. }
  171. }
  172.  
  173. func (bar *BarWorker) Work(input <-chan *Context, output chan<- *Context) {
  174. for ctx := range input {
  175. fmt.Println("Worker bar: current job name is", ctx.Data)
  176. ctx.Bar = "bar-done"
  177. output <- ctx
  178. }
  179. }
  180.  
  181. func (player *PlayerWorker) Work(input <-chan *Context, output chan<- *Context) {
  182. for ctx := range input {
  183. fmt.Println("Worker player: current job name is", ctx.Data)
  184. ctx.Player = "player-done"
  185. output <- ctx
  186. }
  187. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement