Advertisement
Guest User

Untitled

a guest
Aug 23rd, 2017
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.09 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "fmt"
  5. "sync"
  6. )
  7.  
  // gen starts a goroutine that sends each value of nums, in order, on the
  // returned unbuffered channel and closes that channel once every value has
  // been sent. If done is closed first, the goroutine stops early and still
  // closes the channel, so a receiver ranging over it always terminates.
  func gen(done <-chan struct{}, nums ...int) <-chan int {
  	out := make(chan int)
  	go func() {
  		defer close(out)
  		for _, n := range nums {
  			select {
  			case out <- n:
  			case <-done:
  				// Downstream has given up; exit instead of blocking on the send.
  				return
  			}
  		}
  	}()
  	return out
  }
  22.  
  // sq starts a goroutine that receives integers from in, squares each one, and
  // sends the result on the returned unbuffered channel. The returned channel is
  // closed when in is closed and drained, or as soon as a pending send is
  // abandoned because done was closed.
  func sq(done <-chan struct{}, in <-chan int) <-chan int {
  	out := make(chan int)
  	go func() {
  		defer close(out)
  		for n := range in {
  			select {
  			case out <- n * n:
  			case <-done:
  				// Downstream has given up; exit instead of blocking on the send.
  				return
  			}
  		}
  	}()
  	return out
  }
  37.  
  // merge fans in the channels cs: it starts one forwarding goroutine per input
  // channel, each copying values onto the single returned unbuffered channel.
  // The returned channel is closed after every forwarder has finished, which
  // happens when its input is closed and drained or when a pending send is
  // abandoned because done was closed. The relative order of values coming from
  // different inputs is not defined.
  func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
  	var wg sync.WaitGroup
  	out := make(chan int)

  	// output forwards everything from c to out until c is closed or done fires.
  	output := func(c <-chan int) {
  		defer wg.Done()
  		for n := range c {
  			select {
  			case out <- n:
  			case <-done:
  				return
  			}
  		}
  	}

  	// Register all forwarders before starting any of them so that wg.Wait
  	// below cannot observe a zero counter while forwarders are still pending.
  	wg.Add(len(cs))
  	for _, c := range cs {
  		go output(c)
  	}

  	// Close out only after every forwarder has returned; closing earlier could
  	// make a forwarder send on a closed channel and panic.
  	go func() {
  		wg.Wait()
  		close(out)
  	}()
  	return out
  }
  64.  
  65. func main() {
  66. // Setup a done channel that's shared by the whole pipeline
  67. done := make(chan struct{})
  68. defer close(done)
  69.  
  70. in := gen(done, 2, 3)
  71.  
  72. // fanout
  73. c1 := sq(done, in)
  74. c2 := sq(done, in)
  75.  
  76. // fanin
  77. out := merge(done, c1, c2)
  78. fmt.Println(<-out)
  79. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement