Advertisement
Guest User

Untitled

a guest
Feb 27th, 2017
97
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.98 KB | None | 0 0
  1. import (
  2. "fmt"
  3. "sync"
  4. "time"
  5. )
  6.  
  7. func controlledConcurrencyStopOnFirstError() {
  8. // Note that the process should have more tasks to execute than the amount
  9. // it is allowed to execute concurrently. Because there is no point in doing
  10. // the opposite. tasks > concurrency.
  11. const tasks = 100
  12.  
  13. counter := 0
  14.  
  15. // To achieve that a buffered channel is needed, the channel length should
  16. // be the amount of tasks we want to concurrently execute.
  17. const concurrency = 30
  18. tokens := make(chan struct{}, concurrency)
  19. defer close(tokens)
  20. for i := 0; i < concurrency; i++ {
  21. tokens <- struct{}{}
  22. }
  23.  
  24. // In this type of loop it is usually expected to be able to have access to
  25. // error value, to do so every iteration should return a result value, the
  26. // result in this case have a task field and an err field, and whenever
  27. // an error occurs during an iteration a result value should be return with
  28. // an error in the err field.
  29. results := make(chan *result, tasks)
  30.  
  31. // It is also necessary to have a channel that will be used to notify the
  32. // loop that it should stop.
  33. abort := make(chan struct{})
  34.  
  35. // A sync.WaitGroup is used to allow the function to wait for the concurrent
  36. // tasks to finish before it return the control to the caller.
  37. var wg sync.WaitGroup
  38.  
  39. for task := 0; task < tasks; task++ {
  40. // Increment the sync.WaitGroup before getting into the go routine.
  41. wg.Add(1)
  42.  
  43. fmt.Println(task, "...")
  44. go func(task int) {
  45. // Decrement the sync.WaitGroup counter at the end of the task.
  46. defer wg.Done()
  47. // Use a select statement to abort any pending operation ASAP
  48. select {
  49. // Take a token from the tokens channel, doing so will indicate that
  50. // one of the concurrent tasks are being executed and will prevent
  51. // more than the allowed amount of tasks to execute concurrently.
  52. // It works because as soon as the channel runs out of tokens
  53. // further receive operations on it will lock until a token is sent
  54. // back to the channel.
  55. case token := <-tokens:
  56. defer func() { tokens <- token }()
  57. results <- doWork(counter, task)
  58. case <-abort:
  59. return
  60. }
  61. }(task)
  62. }
  63. // The closer go routine below is necessary to unlock the for loop that
  64. // receives from the results channel, not doing so will create a deadlock as
  65. // soon as the loop above finishes the tasks.
  66. go func() {
  67. wg.Wait()
  68. close(results)
  69. }()
  70.  
  71. // Consumes the results channel and abort the whole operation on the first
  72. // error.
  73. for result := range results {
  74. if result.err != nil {
  75. fmt.Println(result.task, "Fail")
  76. fmt.Printf("sending abort sign: %v\n", result.err)
  77. close(abort)
  78. break
  79. } else {
  80. counter = counter + 1
  81. fmt.Println(result.task, "Ok")
  82. }
  83. }
  84.  
  85. fmt.Printf("finished %d tasks\n", counter)
  86. }
  87.  
  88. type result struct {
  89. task int
  90. err error
  91. }
  92.  
  93. func doWork(counter int, task int) *result {
  94. if counter > 30 {
  95. return &result{
  96. task: task,
  97. err: fmt.Errorf("an error occurred at counter: %d, task: %d",
  98. counter, task)}
  99. }
  100.  
  101. time.Sleep(100 * time.Millisecond)
  102. return &result{task: task}
  103. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement