Advertisement
Ivanezko

Untitled

Dec 11th, 2019
549
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.80 KB | None | 0 0
  1. package main
  2.  
  3. // очередь без приоритетотв
  4. // таски на запись не должны делаться параллельно с читающими
  5. // писатель в любой момент времени может быть только 1
  6. // таски на чтение могут быть параллельны друг с другом
  7.  
  8. import (
  9.     "fmt"
  10.     "log"
  11.     "math/rand"
  12.     "sync"
  13.     "time"
  14. )
  15.  
  16. type Tm struct {
  17.     Queue chan task // внешний асинхр канал для тасков
  18.     Done chan bool // внешний синхр канал для шатдауна
  19.     readerSyncGroup *sync.WaitGroup //  группа для ожидания окончания всех читателей
  20. }
  21.  
  22. type task struct {
  23.     Type string
  24.     Payload int
  25. }
  26.  
  27. func NewTm() (*Tm, error) {
  28.     var tm Tm
  29.     tm.Queue = make(chan task, 100)
  30.     tm.Done = make(chan bool)
  31.     tm.readerSyncGroup = &sync.WaitGroup{}
  32.  
  33.     go tm.router()
  34.  
  35.     return &tm, nil
  36. }
  37.  
  38.  
  39. func (tm *Tm) router() {
  40.     for {
  41.         task, more := <-tm.Queue
  42.         if more {
  43.             fmt.Println("received job ", task)
  44.             switch task.Type {
  45.             case "write":
  46.                 log.Print("wait for all readers to finish")
  47.                 tm.readerSyncGroup.Wait() // ждем когда все ридеры закончат работу
  48.                 time.Sleep(time.Second*3) // нагрузка
  49.                 registerResult(task.Payload)
  50.             case "read":
  51.                     go func() {
  52.                         tm.readerSyncGroup.Add(1)
  53.                         defer tm.readerSyncGroup.Done()
  54.                         time.Sleep(time.Second*3) // нагрузка
  55.                         log.Print("reader done", task)
  56.                         registerResult(task.Payload)
  57.                     }()
  58.             }
  59.         } else {
  60.             fmt.Println("received all jobs, die")
  61.             tm.Done <- true
  62.             return
  63.         }
  64.     }
  65. }
  66.  
  67. func (tm *Tm) Wait() {
  68.     <- tm.Done
  69.     tm.readerSyncGroup.Wait()
  70. }
  71.  
  72. var orig []int
  73. var result []int
  74. var resultMux  sync.RWMutex
  75.  
  76. func main() {
  77.     rand.Seed(time.Now().UTC().UnixNano())
  78.     defer timeTrack(time.Now(), "total time")
  79.  
  80.     tm, err := NewTm()
  81.     if err!=nil {
  82.         log.Print("Cant create Tm")
  83.     }
  84.  
  85.     totalTaskQty := 10
  86.  
  87.     for i:=0; i<totalTaskQty; i++ {
  88.         r := rand.Intn(200)
  89.         orig = append(orig, r)
  90.         log.Print("sending task:", r)
  91.  
  92.         if r > 100 {
  93.             tm.Queue <- task{
  94.                 Type:"write",
  95.                 Payload:r,
  96.             }
  97.         } else {
  98.             tm.Queue <- task{
  99.                 Type:"read",
  100.                 Payload:r,
  101.             }
  102.         }
  103.         time.Sleep(time.Millisecond * 200)
  104.     }
  105.     log.Print("========================================================")
  106.     close(tm.Queue)
  107.  
  108.     // ждем заполнения результата
  109.     tm.Wait()
  110.  
  111.  
  112.     //time.Sleep(time.Second * 30)
  113.  
  114.     log.Print(orig)
  115.     log.Print(result)
  116.  
  117. }
  118.  
  119.  
  120. func registerResult(v int) {
  121.     resultMux.Lock()
  122.     defer resultMux.Unlock()
  123.     result = append(result, v)
  124. }
  125.  
  126.  
  127.  
  128. func timeTrack(start time.Time, name string) {
  129.     elapsed := time.Since(start)
  130.     log.Printf("%s took %s", name, elapsed)
  131. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement