Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- // очередь без приоритетотв
- // таски на запись не должны делаться параллельно с читающими
- // писатель в любой момент времени может быть только 1
- // таски на чтение могут быть параллельны друг с другом
- import (
- "fmt"
- "log"
- "math/rand"
- "sync"
- "time"
- )
- type Tm struct {
- Queue chan task // внешний асинхр канал для тасков
- Done chan bool // внешний синхр канал для шатдауна
- readerSyncGroup *sync.WaitGroup // группа для ожидания окончания всех читателей
- }
- type task struct {
- Type string
- Payload int
- }
- func NewTm() (*Tm, error) {
- var tm Tm
- tm.Queue = make(chan task, 100)
- tm.Done = make(chan bool)
- tm.readerSyncGroup = &sync.WaitGroup{}
- go tm.router()
- return &tm, nil
- }
- func (tm *Tm) router() {
- for {
- task, more := <-tm.Queue
- if more {
- fmt.Println("received job ", task)
- switch task.Type {
- case "write":
- log.Print("wait for all readers to finish")
- tm.readerSyncGroup.Wait() // ждем когда все ридеры закончат работу
- time.Sleep(time.Second*3) // нагрузка
- registerResult(task.Payload)
- case "read":
- go func() {
- tm.readerSyncGroup.Add(1)
- defer tm.readerSyncGroup.Done()
- time.Sleep(time.Second*3) // нагрузка
- log.Print("reader done", task)
- registerResult(task.Payload)
- }()
- }
- } else {
- fmt.Println("received all jobs, die")
- tm.Done <- true
- return
- }
- }
- }
- func (tm *Tm) Wait() {
- <- tm.Done
- tm.readerSyncGroup.Wait()
- }
- var orig []int
- var result []int
- var resultMux sync.RWMutex
- func main() {
- rand.Seed(time.Now().UTC().UnixNano())
- defer timeTrack(time.Now(), "total time")
- tm, err := NewTm()
- if err!=nil {
- log.Print("Cant create Tm")
- }
- totalTaskQty := 10
- for i:=0; i<totalTaskQty; i++ {
- r := rand.Intn(200)
- orig = append(orig, r)
- log.Print("sending task:", r)
- if r > 100 {
- tm.Queue <- task{
- Type:"write",
- Payload:r,
- }
- } else {
- tm.Queue <- task{
- Type:"read",
- Payload:r,
- }
- }
- time.Sleep(time.Millisecond * 200)
- }
- log.Print("========================================================")
- close(tm.Queue)
- // ждем заполнения результата
- tm.Wait()
- //time.Sleep(time.Second * 30)
- log.Print(orig)
- log.Print(result)
- }
- func registerResult(v int) {
- resultMux.Lock()
- defer resultMux.Unlock()
- result = append(result, v)
- }
- func timeTrack(start time.Time, name string) {
- elapsed := time.Since(start)
- log.Printf("%s took %s", name, elapsed)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement