Advertisement
Guest User

Untitled

a guest
Mar 20th, 2019
96
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.27 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "context"
  5. "fmt"
  6. "log"
  7. "strings"
  8. "sync"
  9.  
  10. "golang.org/x/sync/errgroup"
  11. )
  12.  
  13. func main() {
  14. if err := run(); err != nil {
  15. log.Fatalf("%+v", err)
  16. }
  17. }
  18.  
  19. func run() error {
  20. defer fmt.Println("fin.")
  21.  
  22. ctx := context.Background()
  23. g, ctx := errgroup.WithContext(ctx)
  24.  
  25. aForX := make(chan int)
  26. aForY := make(chan int)
  27. bForY := make(chan string)
  28. bForZ := make(chan string)
  29. cForX := make(chan string)
  30. cForZ := make(chan string)
  31.  
  32. produceA := NewProducer(func(state State) {
  33. fmt.Println("start producer A")
  34. // producerA
  35. ch := make(chan int)
  36. g.Go(func() error {
  37. defer close(ch)
  38. if !state.Activated {
  39. fmt.Println("disabled", "A")
  40. return nil
  41. }
  42. fmt.Println("start", "A")
  43. for i := 0; i < 5; i++ {
  44. ch <- i
  45. }
  46. fmt.Println("end", "A")
  47. return nil
  48. })
  49. // modelatorI
  50. g.Go(func() error {
  51. fmt.Println("start", "i")
  52. n := 0
  53. defer close(aForX)
  54. defer close(aForY)
  55. for x := range ch {
  56. aForX <- x
  57. n += x
  58. }
  59. aForY <- n
  60. fmt.Println("end", "i")
  61. return nil
  62. })
  63. })
  64.  
  65. produceB := NewProducer(func(state State) {
  66. fmt.Println("start producer B")
  67. // producerB
  68. ch := make(chan string)
  69. g.Go(func() error {
  70. defer close(ch)
  71. if !state.Activated {
  72. fmt.Println("disabled", "B")
  73. return nil
  74. }
  75.  
  76. fmt.Println("start", "B")
  77.  
  78. xs := []string{"a", "b", "c", "d", "f"}
  79. for _, x := range xs {
  80. ch <- x
  81. }
  82. fmt.Println("end", "B")
  83. return nil
  84. })
  85. // modelatorJ
  86. g.Go(func() error {
  87. fmt.Println("start", "j")
  88. var merged []string
  89. defer close(bForY)
  90. defer close(bForZ)
  91. for x := range ch {
  92. bForZ <- x
  93. merged = append(merged, x)
  94. }
  95. bForY <- strings.Join(merged, ", ")
  96. fmt.Println("end", "j")
  97. return nil
  98. })
  99. })
  100.  
  101. produceC := NewProducer(func(state State) {
  102. fmt.Println("start producer C")
  103. // producerC
  104. ch := make(chan string)
  105. g.Go(func() error {
  106. defer close(ch)
  107. if !state.Activated {
  108. fmt.Println("disabled", "C")
  109. return nil
  110. }
  111.  
  112. fmt.Println("start", "C")
  113. xs := []string{"x", "y"}
  114. for _, x := range xs {
  115. ch <- x
  116. }
  117. fmt.Println("end", "C")
  118. return nil
  119. })
  120. // modelatorJ
  121. g.Go(func() error {
  122. fmt.Println("start", "k")
  123. defer close(cForX)
  124. defer close(cForZ)
  125. for x := range ch {
  126. cForX <- x
  127. cForZ <- x
  128. }
  129. fmt.Println("end", "k")
  130. return nil
  131. })
  132. })
  133.  
  134. consumeX := NewConsumer(func(state State) {
  135. g.Go(func() error {
  136. fmt.Println("start consumer X")
  137. var as []int
  138. var cs []string
  139. for x := range aForX {
  140. as = append(as, x)
  141. }
  142. for x := range cForX {
  143. cs = append(cs, x)
  144. }
  145. fmt.Println("end consumer X", state.Activated, as, cs)
  146. return nil
  147. })
  148. })
  149.  
  150. consumeY := NewConsumer(func(state State) {
  151. g.Go(func() error {
  152. fmt.Println("start consumer Y")
  153. fmt.Println("end consumer Y", state.Activated, <-aForY, <-bForY)
  154. return nil
  155. })
  156. })
  157.  
  158. consumeZ := NewConsumer(func(state State) {
  159. g.Go(func() error {
  160. fmt.Println("start consumer Z")
  161. var bs []string
  162. var cs []string
  163. for x := range bForZ {
  164. bs = append(bs, x)
  165. }
  166. for x := range cForZ {
  167. cs = append(cs, x)
  168. }
  169. fmt.Println("end consumer Z", state.Activated, bs, cs)
  170. return nil
  171. })
  172. })
  173.  
  174. // consumerX
  175. consumeX(WithActivated(), produceA, produceC)
  176.  
  177. // consumerY
  178. consumeY(WithDeactivated(), produceA, produceB)
  179.  
  180. // consumerZ
  181. consumeZ(WithDeactivated(), produceB, produceC)
  182.  
  183. return g.Wait()
  184. }
  185.  
  186. type producer struct {
  187. once sync.Once
  188. Fn func(state State)
  189. }
  190.  
  191. func (p *producer) Produce(state State) {
  192. p.once.Do(func() { p.Fn(state) })
  193. }
  194.  
  195. // NewProducer :
  196. func NewProducer(fn func(state State)) func(state State) {
  197. p := &producer{Fn: fn}
  198. return p.Produce
  199. }
  200.  
  201. type State struct {
  202. Activated bool
  203. }
  204.  
  205. // consumer :
  206. type consumer struct {
  207. State *State
  208. Fn func(state State)
  209. }
  210.  
  211. func (c *consumer) Consume(opt func(state *State), depends ...func(state State)) {
  212. opt(c.State)
  213. for i := range depends {
  214. depends[i](*c.State)
  215. }
  216. c.Fn(*c.State)
  217. }
  218.  
  219. // NewConsumer :
  220. func NewConsumer(fn func(state State)) func(opt func(state *State), depends ...func(state State)) {
  221. c := &consumer{Fn: fn, State: &State{Activated: true}}
  222. return c.Consume
  223. }
  224.  
  225. func WithDeactivated() func(state *State) {
  226. return func(state *State) {
  227. state.Activated = false
  228. }
  229. }
  230. func WithActivated() func(state *State) {
  231. return func(state *State) {
  232. state.Activated = true
  233. }
  234. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement