Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import "time"
- import "container/heap"
- import "math/rand"
- import "fmt"
- import "runtime/debug"
// Task is a unit of work that a PriorityQueue orders by priority.
type Task struct {
	num      int // task number
	priority int // tasks with a larger priority are popped first
	index    int // position in the heap slice; kept current by Swap/Push, set to -1 by Pop
}

// A PriorityQueue implements heap.Interface and holds Tasks.
// It behaves as a max-heap on Task.priority: heap.Pop returns the task with
// the highest priority. Use heap.Push and heap.Pop rather than calling the
// Push and Pop methods directly, otherwise the heap ordering is not maintained.
type PriorityQueue []*Task

// Len reports the number of tasks currently in the queue.
func (pq PriorityQueue) Len() int { return len(pq) }

// Less reports whether the task at i must sort before the task at j.
// We want Pop to give us the highest, not lowest, priority, so the
// comparison uses greater-than.
func (pq PriorityQueue) Less(i, j int) bool {
	return pq[i].priority > pq[j].priority
}

// Swap exchanges the tasks at positions i and j and updates their index
// fields to match the new positions.
func (pq PriorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].index = i
	pq[j].index = j
}

// Push appends x to the end of the queue and records its position.
// x must be a *Task; any other type panics on the type assertion.
func (pq *PriorityQueue) Push(x interface{}) {
	item := x.(*Task)
	item.index = len(*pq)
	*pq = append(*pq, item)
}

// Pop removes the last task from the queue and returns it.
// The queue must not be empty.
func (pq *PriorityQueue) Pop() interface{} {
	old := *pq
	n := len(old)
	item := old[n-1]
	// Clear the vacated slot: the backing array outlives the shortened slice,
	// and a stale pointer there would keep the popped task reachable.
	old[n-1] = nil
	item.index = -1 // for safety
	*pq = old[:n-1]
	return item
}
- func main() {
- chanp1 := make(chan *Task, 1000)
- chanc1 := make(chan *Task)
- chanc2 := make(chan int)
- go prod(chanp1)
- for i := 0; i < 1000; i++ {
- go cons(chanc1, chanc2)
- }
- go buffer(chanp1, chanc2, chanc1)
- time.Sleep(1 * time.Hour)
- }
- func buffer(fromprod chan *Task, fromcons chan int, tocons chan *Task) {
- pq := &PriorityQueue{}
- heap.Init(pq)
- bufsize := 10000
- stats := new(debug.GCStats)
- oldt := stats.NumGC
- for {
- debug.ReadGCStats(stats)
- if stats.NumGC != oldt {
- oldt=stats.NumGC
- fmt.Println("Garbage collector: ",stats.NumGC, stats.Pause[0])
- }
- switch pq.Len() {
- case 0:
- // Empty heap. Wait only for a producer.
- mytask := <-fromprod
- heap.Push(pq, mytask)
- case bufsize:
- // Full heap. Wait only for a consumer.
- <-fromcons //Value is not important
- mytask := heap.Pop(pq).(*Task)
- tocons <- mytask
- default:
- // Wait for any of them
- select {
- case mytask := <-fromprod:
- heap.Push(pq, mytask)
- case <-fromcons:
- mytask := heap.Pop(pq).(*Task)
- tocons <- mytask
- }
- }
- }
- }
- func prod(tobuf chan *Task) {
- for i := 0; ; i++ {
- mytask := new(Task)
- mytask.num = i
- mytask.priority = rand.Int()
- tobuf <- mytask
- // fmt.Println("Prod: task ",mytask.num, " pri ",mytask.priority)
- }
- }
- func cons(frombuf chan *Task, tobuf chan int) {
- for {
- tobuf <- 0
- mytask := <-frombuf
- // fmt.Println(" Cons: task ",mytask.num, " pri ",mytask.priority)
- mytask.num++
- time.Sleep(time.Duration(100+rand.Int()%100) * time.Millisecond)
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement