Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import (
- "fmt"
- "sync"
- "time"
- )
- func controlledConcurrencyStopOnFirstError() {
- // Note that the process should have more tasks to execute than the amount
- // it is allowed to execute concurrently. Because there is no point in doing
- // the opposite. tasks > concurrency.
- const tasks = 100
- counter := 0
- // To achieve that a buffered channel is needed, the channel length should
- // be the amount of tasks we want to concurrently execute.
- const concurrency = 30
- tokens := make(chan struct{}, concurrency)
- defer close(tokens)
- for i := 0; i < concurrency; i++ {
- tokens <- struct{}{}
- }
- // In this type of loop it is usually expected to be able to have access to
- // error value, to do so every iteration should return a result value, the
- // result in this case have a task field and an err field, and whenever
- // an error occurs during an iteration a result value should be return with
- // an error in the err field.
- results := make(chan *result, tasks)
- // It is also necessary to have a channel that will be used to notify the
- // loop that it should stop.
- abort := make(chan struct{})
- // A sync.WaitGroup is used to allow the function to wait for the concurrent
- // tasks to finish before it return the control to the caller.
- var wg sync.WaitGroup
- for task := 0; task < tasks; task++ {
- // Increment the sync.WaitGroup before getting into the go routine.
- wg.Add(1)
- fmt.Println(task, "...")
- go func(task int) {
- // Decrement the sync.WaitGroup counter at the end of the task.
- defer wg.Done()
- // Use a select statement to abort any pending operation ASAP
- select {
- // Take a token from the tokens channel, doing so will indicate that
- // one of the concurrent tasks are being executed and will prevent
- // more than the allowed amount of tasks to execute concurrently.
- // It works because as soon as the channel runs out of tokens
- // further receive operations on it will lock until a token is sent
- // back to the channel.
- case token := <-tokens:
- defer func() { tokens <- token }()
- results <- doWork(counter, task)
- case <-abort:
- return
- }
- }(task)
- }
- // The closer go routine below is necessary to unlock the for loop that
- // receives from the results channel, not doing so will create a deadlock as
- // soon as the loop above finishes the tasks.
- go func() {
- wg.Wait()
- close(results)
- }()
- // Consumes the results channel and abort the whole operation on the first
- // error.
- for result := range results {
- if result.err != nil {
- fmt.Println(result.task, "Fail")
- fmt.Printf("sending abort sign: %v\n", result.err)
- close(abort)
- break
- } else {
- counter = counter + 1
- fmt.Println(result.task, "Ok")
- }
- }
- fmt.Printf("finished %d tasks\n", counter)
- }
- type result struct {
- task int
- err error
- }
- func doWork(counter int, task int) *result {
- if counter > 30 {
- return &result{
- task: task,
- err: fmt.Errorf("an error occurred at counter: %d, task: %d",
- counter, task)}
- }
- time.Sleep(100 * time.Millisecond)
- return &result{task: task}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement