Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // 一个简单的并发可控、任务可随意拼接的任务队列实现。
- // 仅作概念演示用,细节不要纠结。
- //
- // 基本结构:
- // Context:所有任务共享的上下文,任务通过上下文交换数据
- // Dispatcher:任务队列管理器,负责创建 Context 并把它放入合适的工作队列
- // Worker:任务的抽象接口
- // XXXWorker:各个具体的任务处理逻辑
- package main
- import (
- "fmt"
- "time"
- )
- type JobId string
- type JobData string
- type WorkerFactory func() Worker
- type WorkerConfig struct {
- Name JobId
- Factory WorkerFactory
- Count int // 需要启动的 worker 数量
- }
- // 所有的任务都会读取 Context 的内容,所以这个结构会很大。
- // 当它变得过于复杂的时候需要重构,不过这就不是现在讨论的问题了。
- type Context struct {
- Jobs []JobId
- // 各种可能被用到的字段
- Data JobData
- Foo string
- Bar string
- Player string
- }
- // 任务调度器
- type Dispatcher struct {
- done chan bool
- jobChannels map[JobId]*JobChannels
- }
- type JobChannels struct {
- input chan *Context
- output chan *Context
- }
- // Worker 的接口
- type Worker interface {
- Work(input <-chan *Context, output chan<- *Context)
- }
- // 各种 worker
- type FooWorker struct{}
- type BarWorker struct{}
- type PlayerWorker struct{}
- func main() {
- fmt.Println("starting...")
- dispatcher := NewDispatcher()
- // 这里用来演示通过网络异步收到 job 的情况
- go func() {
- job1 := []JobId{"foo", "bar", "player"}
- job2 := []JobId{"foo", "player"} // 跳过 bar
- job3 := []JobId{"bar", "foo"} // 逆序
- // 执行任务,每个任务可以带一个自定义数据,现在先简单用 string,未来应该根据设计
- dispatcher.Dispatch(job1, "job1")
- dispatcher.Dispatch(job2, "job2")
- dispatcher.Dispatch(job3, "job3")
- time.Sleep(time.Second)
- dispatcher.Stop()
- }()
- dispatcher.Start()
- }
- func NewDispatcher() *Dispatcher {
- return &Dispatcher{
- done: make(chan bool),
- }
- }
- var workerConfig = []*WorkerConfig{
- &WorkerConfig{"foo", NewFooWorker, 1},
- &WorkerConfig{"bar", NewBarWorker, 2},
- &WorkerConfig{"player", NewPlayerWorker, 3},
- }
- func (d *Dispatcher) Start() {
- d.jobChannels = make(map[JobId]*JobChannels)
- // 启动足够数量的 worker
- for _, config := range workerConfig {
- channels := &JobChannels{
- input: make(chan *Context),
- output: make(chan *Context),
- }
- d.jobChannels[config.Name] = channels
- for i := 0; i < config.Count; i++ {
- worker := config.Factory()
- go worker.Work(channels.input, channels.output)
- }
- }
- // 做输入输出的调度工作
- for _, channels := range d.jobChannels {
- go d.monitor(channels.output)
- }
- <-d.done
- }
- func (d *Dispatcher) monitor(output <-chan *Context) {
- for ctx := range output {
- go d.dispatch(ctx)
- }
- }
- func (d *Dispatcher) dispatch(ctx *Context) {
- // 所有任务都完成了
- if len(ctx.Jobs) == 0 {
- fmt.Println("job is done! Name:", ctx.Data, "Data:", *ctx)
- return
- }
- // 把 ctx 放入合适的任务队列,开始执行任务
- job := ctx.Jobs[0]
- ctx.Jobs = ctx.Jobs[1:]
- channels := d.jobChannels[job]
- channels.input <- ctx
- }
- func (d *Dispatcher) Stop() {
- d.done <- true
- }
- func (d *Dispatcher) Dispatch(jobs []JobId, data JobData) {
- // 首先初始化一个上下文
- ctx := &Context{
- Jobs: jobs,
- Data: data,
- }
- // 开始派发任务
- d.dispatch(ctx)
- }
- func NewFooWorker() Worker {
- return &FooWorker{}
- }
- func NewBarWorker() Worker {
- return &BarWorker{}
- }
- func NewPlayerWorker() Worker {
- return &PlayerWorker{}
- }
- func (foo *FooWorker) Work(input <-chan *Context, output chan<- *Context) {
- for ctx := range input {
- fmt.Println("Worker foo: current job name is", ctx.Data)
- ctx.Foo = "foo-done"
- output <- ctx
- }
- }
- func (bar *BarWorker) Work(input <-chan *Context, output chan<- *Context) {
- for ctx := range input {
- fmt.Println("Worker bar: current job name is", ctx.Data)
- ctx.Bar = "bar-done"
- output <- ctx
- }
- }
- func (player *PlayerWorker) Work(input <-chan *Context, output chan<- *Context) {
- for ctx := range input {
- fmt.Println("Worker player: current job name is", ctx.Data)
- ctx.Player = "player-done"
- output <- ctx
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement