Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- // очередь задачь с приоретизацией по типу задачи
- // таски на запись не должны делаться параллельно с читающими
- // писатель в любой момент времени может быть только 1
- // таски на чтение могут быть параллельны друг с другом
- // читателей в любой момент времени может бытьт не более 10
- // если есть читатели - у них приоритет над писателями
- // но если писателей накапливается слишком много - надо писать, чтобы не забить буфер
- import (
- "log"
- "math/rand"
- "sync"
- "time"
- )
- type Tm struct {
- Writer chan int // внешний асинхр канал для записи
- writerDone chan bool // синхронный канал для сообщения об окончании записи
- Reader chan int // внешний асинхр канал для чтения
- readerSyncGroup *sync.WaitGroup // группа для ожидания окончания всех читателей
- w chan int // внутренний синхронный канал для записи
- r chan int // внутренный асинхронный канал для чтения
- readerQty int // количество читателей
- }
- func NewTm() (*Tm, error) {
- var tm Tm
- tm.readerQty = 10
- tm.Writer = make(chan int, 100)
- tm.writerDone = make(chan bool)
- tm.Reader = make(chan int, 100)
- tm.readerSyncGroup = &sync.WaitGroup{} // группа ридеров
- tm.w = make(chan int)
- tm.r = make(chan int, tm.readerQty)
- go tm.writeDaemon()
- go tm.readDaemon()
- go tm.router()
- return &tm, nil
- }
- func (tm *Tm) router() {
- for {
- if len(tm.Writer) > cap(tm.Writer)/2 { // если писатели забили уже половину буфера
- log.Print("we have too much writes in the queue - so force write")
- tm.write(<-tm.Writer) // пишем
- } else {
- select {
- case v:=<-tm.Reader: // если есть запрос на чтение
- tm.read(v) // читаем
- default: // если нет читателей - проверяем, есть ли писатель
- select {
- case v:=<-tm.Writer: // если есть запрос на запись
- tm.write(v) // пишем
- default:
- // нет задач, делать нечего
- }
- }
- }
- }
- }
- func (tm *Tm) read(v int) error {
- log.Print("READERS:", len(tm.Reader))
- tm.readerSyncGroup.Add(1)
- tm.r <- v // отправляем в асинхронный канал на чтение
- return nil
- }
- func (tm *Tm) write(v int) error {
- log.Print("WRITERS:", len(tm.Writer))
- log.Print("wait for all readers to end")
- tm.readerSyncGroup.Wait() // ждем когда все ридеры закончат работу
- log.Print("all readers done, we can write!")
- tm.w <- v // отправляем в синхронный канал на запись, ждем
- <-tm.writerDone // ждем сообщения об окончании записи
- return nil
- }
- func (tm *Tm) writeDaemon() {
- for v := range tm.w {
- log.Print("write ", v, "...")
- time.Sleep(time.Second) // нагрузка
- resultChan <- v
- log.Print("...write done ", v)
- func() {tm.writerDone <- true}() // сигнал об окончании записи
- }
- }
- func (tm *Tm) readDaemon() {
- for i := 0; i<tm.readerQty; i++ {
- go func() {
- for v := range tm.r {
- log.Print("read ", v, "...")
- time.Sleep(time.Duration(int(time.Second) * rand.Intn(10))) // нагрузка
- resultChan <- v
- log.Print("...read done ", v)
- tm.readerSyncGroup.Done() // освобождаем ридер
- }
- }()
- }
- }
- var orig []int
- var result []int
- var resultChan chan int
- func main() {
- rand.Seed(time.Now().UTC().UnixNano())
- defer timeTrack(time.Now(), "total time")
- tm, err := NewTm()
- if err!=nil {
- log.Print("Cant create Tm")
- }
- resultChan = make(chan int, 100)
- // пишем результат из канала
- go func() {
- for {
- r := <- resultChan
- result = append(result, r)
- }
- }()
- totalTaskQty := 10
- for i:=0; i<totalTaskQty; i++ {
- r := rand.Intn(200)
- orig = append(orig, r)
- log.Print("input:", r)
- if r > 100 {
- tm.Writer <- r
- } else {
- tm.Reader <- r
- }
- time.Sleep(time.Millisecond * 200)
- }
- log.Print("========================================================")
- close(tm.Reader)
- close(tm.Writer)
- // ждем заполнения результата
- for len(result) < totalTaskQty {
- time.Sleep(time.Second)
- }
- //time.Sleep(time.Second * 30)
- log.Print(orig)
- log.Print(result)
- }
- func timeTrack(start time.Time, name string) {
- elapsed := time.Since(start)
- log.Printf("%s took %s", name, elapsed)
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement