Advertisement
siritinga

Go producer/consumer using more than 1 OS process

May 26th, 2013
267
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 2.49 KB | None | 0 0
  1. package main
  2.  
  3. import "time"
  4. import "container/heap"
  5. import "math/rand"
  6. import "fmt"
  7. import "runtime/debug"
  8.  
  9. type Task struct {
  10.     num      int
  11.     priority int
  12.     index    int
  13. }
  14.  
  15. // A PriorityQueue implements heap.Interface and holds Tasks.
  16. type PriorityQueue []*Task
  17.  
  18. func (pq PriorityQueue) Len() int { return len(pq) }
  19.  
  20. func (pq PriorityQueue) Less(i, j int) bool {
  21.     // We want Pop to give us the highest, not lowest, priority so we use greater than here.
  22.     return pq[i].priority > pq[j].priority
  23. }
  24.  
  25. func (pq PriorityQueue) Swap(i, j int) {
  26.     pq[i], pq[j] = pq[j], pq[i]
  27.     pq[i].index = i
  28.     pq[j].index = j
  29. }
  30.  
  31. func (pq *PriorityQueue) Push(x interface{}) {
  32.     n := len(*pq)
  33.     item := x.(*Task)
  34.     item.index = n
  35.     *pq = append(*pq, item)
  36. }
  37.  
  38. func (pq *PriorityQueue) Pop() interface{} {
  39.     old := *pq
  40.     n := len(old)
  41.     item := old[n-1]
  42.     item.index = -1 // for safety
  43.     *pq = old[0 : n-1]
  44.     return item
  45. }
  46.  
  47. func main() {
  48.     chanp1 := make(chan *Task, 1000)
  49.     chanc1 := make(chan *Task)
  50.     chanc2 := make(chan int)
  51.  
  52.     go prod(chanp1)
  53.     for i := 0; i < 1000; i++ {
  54.         go cons(chanc1, chanc2)
  55.     }
  56.  
  57.     go buffer(chanp1, chanc2, chanc1)
  58.  
  59.     time.Sleep(1 * time.Hour)
  60. }
  61.  
  62. func buffer(fromprod chan *Task, fromcons chan int, tocons chan *Task) {
  63.     pq := &PriorityQueue{}
  64.     heap.Init(pq)
  65.     bufsize := 10000
  66.     stats := new(debug.GCStats)
  67.     oldt := stats.NumGC
  68.  
  69.     for {
  70.         debug.ReadGCStats(stats)
  71.         if stats.NumGC != oldt {
  72.             oldt=stats.NumGC
  73.             fmt.Println("Garbage collector: ",stats.NumGC, stats.Pause[0])
  74.         }
  75.        
  76.         switch pq.Len() {
  77.         case 0:
  78.             // Empty heap. Wait only for a producer.
  79.             mytask := <-fromprod
  80.             heap.Push(pq, mytask)
  81.  
  82.         case bufsize:
  83.             // Full heap. Wait only for a consumer.
  84.             <-fromcons //Value is not important
  85.             mytask := heap.Pop(pq).(*Task)
  86.             tocons <- mytask
  87.         default:
  88.             // Wait for any of them
  89.             select {
  90.             case mytask := <-fromprod:
  91.                 heap.Push(pq, mytask)
  92.             case <-fromcons:
  93.                 mytask := heap.Pop(pq).(*Task)
  94.                 tocons <- mytask
  95.             }
  96.         }
  97.     }
  98. }
  99.  
  100. func prod(tobuf chan *Task) {
  101.     for i := 0; ; i++ {
  102.         mytask := new(Task)
  103.         mytask.num = i
  104.         mytask.priority = rand.Int()
  105.         tobuf <- mytask
  106.         //      fmt.Println("Prod: task ",mytask.num, " pri ",mytask.priority)
  107.     }
  108. }
  109.  
  110. func cons(frombuf chan *Task, tobuf chan int) {
  111.     for {
  112.         tobuf <- 0
  113.         mytask := <-frombuf
  114.         //      fmt.Println("                               Cons: task ",mytask.num, " pri ",mytask.priority)
  115.         mytask.num++
  116.         time.Sleep(time.Duration(100+rand.Int()%100) * time.Millisecond)
  117.     }
  118. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement