Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- /*
- Copyright (C) 2011 Sam Fredrickson
- All rights reserved.
- The broadcast package implements a way to send values to multiple receivers.
- Inspired by a post on rogpeppe.wordpress.com.
- */
- package broadcast
- import (
- "container/vector"
- "time"
- )
- // Broadcaster sends values to each of its receivers. Within this file, other
- // goroutines reach it only through the channels below; the receivers map is
- // read and modified solely inside broadcasterLoop. New fills this struct with
- // a positional literal, so the field order must not change.
- type Broadcaster struct {
- // Receivers currently subscribed, keyed by the id broadcasterLoop assigned.
- receivers map[int]*Receiver
- // Values handed to Broadcast, waiting to be fanned out to every receiver.
- incoming chan interface{}
- // Requests (sent by Listen) to add a receiver.
- joining chan joinRequest
- // Map keys of receivers to remove; Stop sends on this channel.
- leaving chan int
- }
- // Receiver waits for values from a single broadcaster. Received values are
- // buffered so that sleeping readers don't impede the broadcaster. Each receiver
- // has its own buffer, so it's advisable to keep the readers up to speed to
- // avoid excessive memory usage. Listen fills this struct with a positional
- // literal, so the field order must not change.
- type Receiver struct {
- // Key of this receiver in the broadcaster's receivers map; assigned by
- // broadcasterLoop when the join request is handled.
- id int
- // Whether the receiver should continue waiting for broadcasts. Written by
- // broadcasterLoop and disposeReceiver; read by receiverLoop, Receive and Stop.
- // NOTE(review): no lock or other synchronization around this flag is visible
- // in this file even though those functions run on different goroutines.
- running bool
- // The broadcaster this receiver subscribed to.
- bcaster *Broadcaster
- // Channel on which broadcasterLoop delivers broadcast values.
- incoming chan interface{}
- // Channel on which Receive asks receiverLoop for the next buffered value.
- requests chan receiveRequest
- // Broadcasts received but not yet handed out; receiverLoop pushes new values
- // at the end and serves requests from index 0.
- buffer vector.Vector
- }
- // joinRequest is what Listen sends on Broadcaster.joining to ask
- // broadcasterLoop to register a receiver. Listen builds it with a positional
- // literal, so the field order must not change.
- type joinRequest struct {
- // The receiver asking to be added to the broadcaster's map.
- receiver *Receiver
- // broadcasterLoop sends true here if the receiver was added, false if not.
- result chan bool
- }
- // receiveRequest is what Receive sends on Receiver.requests to ask
- // receiverLoop for the next buffered broadcast. Exactly one of the two
- // channels is signalled per request. Receive builds it with a positional
- // literal, so the field order must not change.
- type receiveRequest struct {
- // receiverLoop sends the oldest buffered broadcast here when one exists.
- reply chan interface{}
- // receiverLoop sends true here when its buffer is empty.
- err chan bool
- }
- // New returns a Broadcaster with an empty receiver map and unbuffered
- // channels, and starts the goroutine that runs broadcasterLoop for it.
- func New() *Broadcaster {
- 	b := &Broadcaster{
- 		receivers: make(map[int]*Receiver),
- 		incoming:  make(chan interface{}),
- 		joining:   make(chan joinRequest),
- 		leaving:   make(chan int),
- 	}
- 	// The loop goroutine is the only one that touches b.receivers.
- 	go broadcasterLoop(b)
- 	return b
- }
- // broadcasterLoop is the broadcaster's event loop. It serves the incoming,
- // joining and leaving channels one event at a time and never returns. Every
- // read and write of b.receivers happens here, on the goroutine New started,
- // so the map itself needs no locking.
- //
- // NOTE(review): a leave only clears the receiver's running flag and drops it
- // from the map; receiverLoop re-checks that flag only at the top of its loop,
- // so it appears to stay blocked until another event wakes it. Confirm.
- func broadcasterLoop(b *Broadcaster) {
- 	// Key handed to the next joiner; advanced after every join request,
- 	// whether or not it was accepted.
- 	nextID := 0
- 	for {
- 		select {
- 		case msg := <-b.incoming:
- 			// Fan the value out. Each send blocks until that receiver's
- 			// loop accepts it.
- 			for _, rcv := range b.receivers {
- 				rcv.incoming <- msg
- 			}
- 		case join := <-b.joining:
- 			rcv := join.receiver
- 			rcv.id = nextID
- 			// Refuse the join if the key is somehow already occupied.
- 			_, taken := b.receivers[nextID]
- 			if taken {
- 				rcv.running = false
- 			} else {
- 				b.receivers[nextID] = rcv
- 			}
- 			join.result <- !taken
- 			nextID++
- 		case key := <-b.leaving:
- 			// Unknown keys are ignored, so repeated leaves are harmless.
- 			if rcv, found := b.receivers[key]; found {
- 				rcv.running = false
- 				b.receivers[key] = nil, false
- 			}
- 		}
- 	}
- }
- // receiverLoop is a receiver's event loop, run on its own goroutine by
- // Listen. While r.running is true it buffers values arriving on r.incoming
- // and answers each request from Receive with either the oldest buffered
- // value or an error signal when the buffer is empty. Once the loop ends it
- // disposes of the receiver.
- func receiverLoop(r *Receiver) {
- 	for r.running {
- 		select {
- 		case msg := <-r.incoming:
- 			// New broadcast: append it to the buffer.
- 			r.buffer.Push(msg)
- 		case ask := <-r.requests:
- 			if r.buffer.Len() > 0 {
- 				// Hand out the oldest broadcast and drop it from the buffer.
- 				oldest := r.buffer.At(0)
- 				r.buffer.Delete(0)
- 				ask.reply <- oldest
- 			} else {
- 				// Nothing buffered: tell Receive so it can retry later.
- 				ask.err <- true
- 			}
- 		}
- 	}
- 	// Close the channels and release the buffer now that the loop is done.
- 	disposeReceiver(r)
- }
- // disposeReceiver marks r as no longer running, closes its incoming and
- // requests channels unless closed() already reports them closed, and empties
- // its buffer.
- func disposeReceiver(r *Receiver) {
- 	r.running = false
- 	if done := closed(r.incoming); !done {
- 		close(r.incoming)
- 	}
- 	if done := closed(r.requests); !done {
- 		close(r.requests)
- 	}
- 	// Shrink the buffer to zero length and capacity.
- 	r.buffer.Resize(0, 0)
- }
- // Broadcast hands v to the broadcaster loop, which forwards it to every
- // receiver currently subscribed. The incoming channel is created unbuffered
- // by New, so this call blocks until the loop accepts the value.
- func (b *Broadcaster) Broadcast(v interface{}) {
- b.incoming <- v
- }
- // Subscribe to the broadcaster, using the returned receiver to receive messages.
- // If an error occurs, the result will be nil.
- func (b *Broadcaster) Listen() *Receiver {
- inc := make(chan interface{})
- req := make(chan receiveRequest)
- var buf vector.Vector
- r := &Receiver{0, true, b, inc, req, buf}
- joinReq := joinRequest{r, make(chan bool)}
- // Try to join the broadcaster.
- b.joining <- joinReq
- if <-joinReq.result {
- go receiverLoop(r)
- } else {
- disposeReceiver(r)
- r = nil
- }
- return r
- }
- // Receive returns the next message broadcast to this receiver. It returns
- // nil straight away if the receiver is not running. Otherwise it keeps
- // asking the receiver loop for a value: each time the loop reports an empty
- // buffer it sleeps for 5e6 (presumably nanoseconds, i.e. 5ms) and asks again.
- // It also returns nil if the receiver stops running, or closed() reports the
- // requests channel closed, before a value is obtained.
- func (r *Receiver) Receive() (v interface{}) {
- 	if !r.running {
- 		return nil
- 	}
- 	// One request, with its own reply channels, is reused for every attempt.
- 	ask := receiveRequest{
- 		reply: make(chan interface{}),
- 		err:   make(chan bool),
- 	}
- 	defer func() {
- 		close(ask.err)
- 		close(ask.reply)
- 	}()
- 	for r.running && !closed(r.requests) {
- 		// Tell the receiver loop we want a value, then wait for its verdict.
- 		r.requests <- ask
- 		select {
- 		case got := <-ask.reply:
- 			return got
- 		case <-ask.err:
- 			// Buffer was empty: back off briefly before retrying.
- 			time.Sleep(5e6)
- 		}
- 	}
- 	return nil
- }
- // Stop asks the broadcaster to remove this receiver so that it gets no
- // further broadcasts. It does nothing if the receiver is already marked as
- // not running.
- func (r *Receiver) Stop() {
- 	if r.running {
- 		// The broadcaster loop clears running and deletes the map entry.
- 		r.bcaster.leaving <- r.id
- 	}
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement