Advertisement
Ivanezko

Untitled

Dec 11th, 2019
629
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 4.95 KB | None | 0 0
  1. package main
  2.  
  3. // очередь задачь с приоретизацией по типу задачи
  4. // таски на запись не должны делаться параллельно с читающими
  5. // писатель в любой момент времени может быть только 1
  6. // таски на чтение могут быть параллельны друг с другом
  7. // читателей в любой момент времени может бытьт не более 10
  8. // если есть читатели - у них приоритет над писателями
  9. // но если писателей накапливается слишком много - надо писать, чтобы не забить буфер
  10.  
  11. import (
  12.     "log"
  13.     "math/rand"
  14.     "sync"
  15.     "time"
  16. )
  17.  
  18. type Tm struct {
  19.     Writer chan int // внешний асинхр канал для записи
  20.     writerDone chan bool // синхронный канал для сообщения об окончании записи
  21.     Reader chan int // внешний асинхр канал для чтения
  22.     readerSyncGroup *sync.WaitGroup //  группа для ожидания окончания всех читателей
  23.  
  24.     w chan int // внутренний синхронный канал для записи
  25.     r chan int // внутренный асинхронный канал для чтения
  26.     readerQty int // количество читателей
  27. }
  28.  
  29. func NewTm() (*Tm, error) {
  30.     var tm Tm
  31.     tm.readerQty = 10
  32.     tm.Writer = make(chan int, 100)
  33.     tm.writerDone = make(chan bool)
  34.     tm.Reader = make(chan int, 100)
  35.     tm.readerSyncGroup = &sync.WaitGroup{} // группа ридеров
  36.  
  37.     tm.w = make(chan int)
  38.     tm.r = make(chan int, tm.readerQty)
  39.  
  40.     go tm.writeDaemon()
  41.     go tm.readDaemon()
  42.     go tm.router()
  43.  
  44.     return &tm, nil
  45. }
  46.  
  47.  
  48. func (tm *Tm) router() {
  49.     for {
  50.         if len(tm.Writer)  > cap(tm.Writer)/2 { // если писатели забили уже половину буфера
  51.             log.Print("we have too much writes in the queue - so force write")
  52.             tm.write(<-tm.Writer) // пишем
  53.         } else {
  54.             select {
  55.             case v:=<-tm.Reader: // если есть запрос на чтение
  56.                 tm.read(v) // читаем
  57.             default: // если нет читателей - проверяем, есть ли писатель
  58.                 select {
  59.                 case v:=<-tm.Writer: // если есть запрос на запись
  60.                     tm.write(v) // пишем
  61.                 default:
  62.                     // нет задач, делать нечего
  63.                 }
  64.             }
  65.         }
  66.     }
  67. }
  68.  
  69.  
  70. func (tm *Tm) read(v int) error {
  71.     log.Print("READERS:", len(tm.Reader))
  72.     tm.readerSyncGroup.Add(1)
  73.     tm.r <- v // отправляем в асинхронный канал на чтение
  74.     return nil
  75. }
  76.  
  77. func (tm *Tm) write(v int) error {
  78.     log.Print("WRITERS:", len(tm.Writer))
  79.     log.Print("wait for all readers to end")
  80.     tm.readerSyncGroup.Wait() // ждем когда все ридеры закончат работу
  81.     log.Print("all readers done, we can write!")
  82.     tm.w <- v // отправляем в синхронный канал на запись, ждем
  83.     <-tm.writerDone // ждем сообщения об окончании записи
  84.     return nil
  85. }
  86.  
  87. func (tm *Tm) writeDaemon() {
  88.     for v := range tm.w {
  89.         log.Print("write ", v, "...")
  90.         time.Sleep(time.Second) // нагрузка
  91.         resultChan <- v
  92.         log.Print("...write done ", v)
  93.         func() {tm.writerDone <- true}() // сигнал об окончании записи
  94.     }
  95. }
  96.  
  97. func (tm *Tm) readDaemon() {
  98.     for i := 0; i<tm.readerQty; i++ {
  99.         go func() {
  100.             for v := range tm.r {
  101.                 log.Print("read ", v, "...")
  102.                 time.Sleep(time.Duration(int(time.Second) * rand.Intn(10))) // нагрузка
  103.                 resultChan <- v
  104.                 log.Print("...read done ", v)
  105.                 tm.readerSyncGroup.Done() // освобождаем ридер
  106.             }
  107.         }()
  108.     }
  109. }
  110.  
  111. var orig []int
  112. var result []int
  113. var resultChan chan int
  114.  
  115. func main() {
  116.     rand.Seed(time.Now().UTC().UnixNano())
  117.     defer timeTrack(time.Now(), "total time")
  118.  
  119.     tm, err := NewTm()
  120.     if err!=nil {
  121.         log.Print("Cant create Tm")
  122.     }
  123.  
  124.  
  125.     resultChan = make(chan int, 100)
  126.  
  127.     // пишем результат из канала
  128.     go func() {
  129.         for {
  130.             r := <- resultChan
  131.             result = append(result, r)
  132.         }
  133.     }()
  134.  
  135.     totalTaskQty := 10
  136.  
  137.     for i:=0; i<totalTaskQty; i++ {
  138.         r := rand.Intn(200)
  139.         orig = append(orig, r)
  140.         log.Print("input:", r)
  141.         if r > 100 {
  142.             tm.Writer <- r
  143.         } else {
  144.             tm.Reader <- r
  145.         }
  146.         time.Sleep(time.Millisecond * 200)
  147.     }
  148.     log.Print("========================================================")
  149.     close(tm.Reader)
  150.     close(tm.Writer)
  151.  
  152.     // ждем заполнения результата
  153.     for len(result) < totalTaskQty {
  154.         time.Sleep(time.Second)
  155.     }
  156.  
  157.     //time.Sleep(time.Second * 30)
  158.  
  159.     log.Print(orig)
  160.     log.Print(result)
  161.  
  162. }
  163.  
  164.  
  165.  
  166.  
  167.  
  168. func timeTrack(start time.Time, name string) {
  169.     elapsed := time.Since(start)
  170.     log.Printf("%s took %s", name, elapsed)
  171. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement