Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "fmt"
- "log"
- "math/rand"
- "time"
- )
- type Threader struct {
- TIME_LIMIT time.Time
- numWorkers int
- jobs int
- jobsProduced int
- jobsWorking int
- doneProducing chan bool
- doneReceiving chan bool
- pwCounterChan chan int
- pwChan chan int
- wrChan chan int
- errChan chan error
- initTime time.Time
- }
- const TIME_LIMIT = time.Second
- func main() {
- mimt := Threader{}
- mimt.Init(4, 27)
- }
- func (tr *Threader) Init(numWorkers int, jobs int) {
- tr.pwCounterChan = make(chan int)
- tr.doneProducing = make(chan bool)
- tr.doneReceiving = make(chan bool)
- tr.pwChan = make(chan int)
- tr.wrChan = make(chan int)
- tr.errChan = make(chan error)
- tr.initTime = time.Now()
- tr.numWorkers = numWorkers
- tr.jobsWorking = 0
- log.Printf("Init %p", &tr.jobsWorking)
- tr.jobs = jobs
- // start producing jobs
- go tr.produce()
- // start waiting for jobs done
- go tr.receiver()
- // start workers for jobs
- for i := 0; i < tr.numWorkers; i++ {
- go tr.work()
- }
- // wait for done producing
- <-tr.doneProducing
- }
- func (tr *Threader) produce() {
- // current job
- i := 0
- // marker for timing
- tr.jobsProduced = 0
- // start tracking time passed producing
- t := time.Now()
- log.Printf("Jobs Working %p", &tr.jobsWorking)
- javi := 0
- exit := 0
- for {
- for {
- select {
- case n := <-tr.pwCounterChan:
- javi += n
- fmt.Println("received javi", n, "jval", javi)
- default:
- exit = 1
- }
- if exit == 1 {
- exit = 0
- break
- }
- }
- if tr.jobsProduced == tr.numWorkers {
- // if moreThanTimeLimit has passed
- if moreThanTL(t) {
- // reset
- tr.jobsProduced = javi
- t = time.Now()
- }
- } else {
- // normal round, send work
- log.Printf("Sending %d", i)
- // send job
- tr.pwChan <- i
- // add to counters
- i += 1
- tr.jobsProduced += 1
- }
- // we finished doing our jobs, quit producing
- if i == tr.jobs {
- //log.Printf("Rondas done %d", i)
- break
- }
- }
- end(tr.initTime, "PRODUCE")
- // make main wait for all to finish
- tr.doneProducing <- <-tr.doneReceiving
- }
- func (tr *Threader) work() {
- // the routine receives the data,
- // sleeps for a random time
- // and sends response to receiver (wrChan)
- var mi_rand int = rand.Intn(7)
- for {
- resp := <-tr.pwChan
- // do work
- tr.pwCounterChan <- 1
- if mi_rand%3 == 0 {
- //log.Printf("mi_rand %d", mi_rand)
- time.Sleep(10 * (time.Second))
- }
- tr.pwCounterChan <- -1
- // rend response
- tr.wrChan <- resp
- }
- }
- func (tr *Threader) receiver() {
- // waits for msgs from workers
- for {
- u := <-tr.wrChan
- log.Printf("Received %d", u)
- // all workers finished, quit
- if u == (tr.jobs - 1) {
- break
- }
- }
- end(tr.initTime, "RECEIVE")
- tr.doneReceiving <- true
- }
- ////
- // helpers
- ////
- func moreThanTL(t time.Time) bool {
- elapsed := time.Since(t)
- if TIME_LIMIT < elapsed {
- return true
- }
- return false
- }
- func end(t time.Time, s string) {
- elapsed := time.Since(t)
- log.Printf("%s took %s", s, elapsed)
- }
- func random(min, max int) int {
- rand.Seed(time.Now().Unix())
- return rand.Intn(max-min) + min
- }
Add Comment
Please, Sign In to add comment