Advertisement
Guest User

Untitled

a guest
Mar 18th, 2019
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.44 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4.     // "crypto/md5"
  5.     "fmt"
  6.     "sync"
  7.     "sync/atomic"
  8.     "time"
  9.     // "hash/crc32"
  10.     // "strconv"
  11.     // "sync/atomic"
  12.     // "time"
  13. )
  14.  
  15. type job func(in, out chan interface{})
  16.  
  17. // сюда писать код
  18.  
  19. func ExecutePipeline(jobs ...job) {
  20.     shared := make(chan interface{}, 2)
  21.     wg := &sync.WaitGroup{}
  22.     for _, work := range jobs {
  23.         inChan := shared
  24.         outChan := make(chan interface{}, 2)
  25.         wg.Add(1)
  26.         go func() {
  27.             defer wg.Done()
  28.             work(inChan, outChan)
  29.             close(outChan)
  30.             fmt.Println("Done")
  31.         }()
  32.         shared = outChan
  33.     }
  34.     // wg.Wait()
  35.     // jobWrapper := func(work job, in, out chan interface{}, waiter *sync.WaitGroup) {
  36.     //  defer waiter.Done()
  37.     //  work(in, out)
  38.     // }
  39.     // wg := &sync.WaitGroup{}
  40.     // for _, work := range jobs {
  41.     //  wg.Add(1)
  42.     //  // go jobWrapper(param, inGoro, outGoro, wg)
  43.     //  // go func() {
  44.     //  //  defer wg.Done()
  45.     //  //  work(inGoro, outGoro)
  46.     //  // }()
  47.     //  go work(inGoro, outGoro)
  48.     //  inGoro = outGoro
  49.     //  outGoro = make(chan interface{}, 2)
  50.     // }
  51.     // wg.Wait()
  52. }
  53. func MultiHash(in, out chan interface{}) {
  54.     fmt.Println("in MultiHash")
  55. }
  56.  
  57. func CombineResults(in, out chan interface{}) {
  58.     fmt.Println("in CombineResults")
  59. }
  60.  
  61. func SingleHash(in, out chan interface{}) {
  62.     //var read string
  63.     //read = (<-in).(string)
  64.     // _read := read.(string)
  65.     fmt.Println("in SingleHash read :")
  66.     //var ou string = read + "_Out_"
  67.     //out <- ou
  68. }
  69.  
  70. // type job func(in, out chan interface{})
  71.  
  72. func main() {
  73.  
  74.     var ok = true
  75.     var recieved uint32
  76.     freeFlowJobs := []job{
  77.         job(func(in, out chan interface{}) {
  78.             out <- 1
  79.             time.Sleep(10 * time.Millisecond)
  80.             currRecieved := atomic.LoadUint32(&recieved)
  81.             // в чем тут суть
  82.             // если вы накапливаете значения,
  83.             // то пока вся функция не отрабоатет - дальше они не пойдут
  84.             // тут я проверяю, что счетчик увеличился в следующей функции
  85.             // это значит что туда дошло значение прежде чем текущая
  86.             // функция отработала
  87.             if currRecieved == 0 {
  88.                 ok = false
  89.             }
  90.         }),
  91.         job(func(in, out chan interface{}) {
  92.             for _ = range in {
  93.                 atomic.AddUint32(&recieved, 1)
  94.             }
  95.         }),
  96.     }
  97.     ExecutePipeline(freeFlowJobs...)
  98.     time.Sleep(2)
  99.     if !ok || recieved == 0 {
  100.         fmt.Println("no value free flow - dont collect them")
  101.     }
  102. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement