Advertisement
Guest User

Untitled

a guest
Jul 22nd, 2017
58
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Go 5.24 KB | None | 0 0
  1. /*
  2.     Copyright (C) 2011 Sam Fredrickson
  3.     All rights reserved.
  4.  
    Package broadcast implements a way to send values to multiple receivers.
    Inspired by a post on rogpeppe.wordpress.com.
  7. */
  8. package broadcast
  9.  
  10. import (
  11.     "container/vector"
  12.     "time"
  13. )
  14.  
// Broadcaster sends values to each of its receivers. Its map and channels are
// serviced by the goroutine that New starts on broadcasterLoop.
type Broadcaster struct {
    // Map of receivers waiting on this broadcaster, keyed by receiver id.
    receivers map[int]*Receiver
    // Channel of incoming broadcasts to be sent.
    incoming chan interface{}
    // Channel of requests to add a receiver.
    joining chan joinRequest
    // Channel of receiver ids (keys of the receivers map) to remove.
    leaving chan int
}
  26.  
// Receiver waits for values from a single broadcaster. Received values are
// buffered so that sleeping readers don't impede the broadcaster. Each receiver
// has its own buffer, so it's advisable to keep the readers up to speed
// to avoid excessive memory usage.
type Receiver struct {
    // The key under which the broadcaster's receivers map holds this receiver.
    // Assigned by broadcasterLoop when the receiver joins.
    id int
    // Whether or not the receiver should continue waiting for broadcasts.
    // NOTE(review): written by broadcasterLoop and disposeReceiver and read by
    // receiverLoop, Receive and Stop; no synchronization is visible in this
    // file, so those accesses look like a data race -- confirm.
    running bool
    // The broadcaster for this receiver.
    bcaster *Broadcaster
    // Channel for incoming broadcasts.
    incoming chan interface{}
    // Channel for incoming receive requests.
    requests chan receiveRequest
    // Buffer of received broadcasts, oldest first.
    buffer vector.Vector
}
  45.  
// joinRequest is what Listen sends on a broadcaster's joining channel to ask
// for a receiver to be registered.
type joinRequest struct {
    // The requesting receiver
    receiver *Receiver
    // The result of the attempt to join the broadcaster: true if the receiver
    // was added, false if it was refused.
    result chan bool
}
  52.  
// receiveRequest is what Receive sends on a receiver's requests channel to ask
// for the next buffered broadcast. receiverLoop answers on exactly one of the
// two channels.
type receiveRequest struct {
    // The channel on which to reply a broadcast.
    reply chan interface{}
    // The channel on which to reply an error; receiverLoop sends true on it
    // when the buffer is empty.
    err chan bool
}
  59.  
  60. // Create a new broadcaster.
  61. func New() *Broadcaster {
  62.     rec := make(map[int]*Receiver)
  63.     inc := make(chan interface{})
  64.     ent := make(chan joinRequest)
  65.     lev := make(chan int)
  66.     brd := &Broadcaster{rec, inc, ent, lev}
  67.     go broadcasterLoop(brd)
  68.     return brd
  69. }
  70.  
  71. // The main broadcaster loop. It monitors the channels and responds
  72. // appropriately. All modifications occur in the goroutine running this, so the
  73. // structure should be concurrent-safe.
  74. func broadcasterLoop(b *Broadcaster) {
  75.     last_id := 0
  76.  
  77.     for {
  78.         select {
  79.         // If a message is incoming, broadcast it to all receivers.
  80.         case v := <-b.incoming:
  81.             for i := range b.receivers {
  82.                 b.receivers[i].incoming <- v
  83.             }
  84.         // If a new receiver is joining, add them and increment id.
  85.         case req := <-b.joining:
  86.             req.receiver.id = last_id
  87.             // If a receiver with this id already exists, don't add it.
  88.             if _, ok := b.receivers[last_id]; ok {
  89.                 req.receiver.running = false
  90.                 req.result <- false
  91.             } else {
  92.                 b.receivers[last_id] = req.receiver
  93.                 req.result <- true
  94.             }
  95.             last_id++
  96.         // If a receiver is leaving, remove from map
  97.         case i := <-b.leaving:
  98.             if r, ok := b.receivers[i]; ok {
  99.                 r.running = false
  100.                 b.receivers[i] = nil, false
  101.             }
  102.         }
  103.     }
  104. }
  105.  
  106. // The main receiver loop. Like the broadcaster one.
  107. func receiverLoop(r *Receiver) {
  108.     for r.running {
  109.         select {
  110.         // If a broadcast is incoming, push it to the buffer.
  111.         case v := <-r.incoming:
  112.             r.buffer.Push(v)
  113.         // If a request for next broadcast available,
  114.         case req := <-r.requests:
  115.             // If nothing to broadcast, inform Receive() that of the error.
  116.             if r.buffer.Len() == 0 {
  117.                 req.err <- true
  118.             } else {
  119.                 // Reply with first available broadcast, and remove from buffer.
  120.                 v := r.buffer.At(0)
  121.                 r.buffer.Delete(0)
  122.                 req.reply <- v
  123.             }
  124.         }
  125.     }
  126.  
  127.     // Might as well ensure the receiver is disposed, it doesn't hurt.
  128.     disposeReceiver(r)
  129. }
  130.  
// disposeReceiver marks r as stopped and releases what it holds: it closes the
// incoming and requests channels unless closed() already reports them closed,
// then empties the buffer. It is called by receiverLoop when its loop ends and
// by Listen when the broadcaster refuses the join.
func disposeReceiver(r *Receiver) {
    r.running = false
    if !closed(r.incoming) {
        close(r.incoming)
    }
    if !closed(r.requests) {
        // NOTE(review): Receive sends on r.requests; a send that races with
        // this close would panic -- confirm callers cannot overlap with it.
        close(r.requests)
    }
    // Resize the buffer to zero length, discarding anything still queued.
    r.buffer.Resize(0, 0)
}
  141.  
// Broadcast hands v to the broadcaster's loop, which forwards it to every
// receiver registered at that moment. The value is sent on the broadcaster's
// incoming channel, so the call blocks until the loop accepts it.
func (b *Broadcaster) Broadcast(v interface{}) {
    b.incoming <- v
}
  146.  
  147. // Subscribe to the broadcaster, using the returned receiver to receive messages.
  148. // If an error occurs, the result will be nil.
  149. func (b *Broadcaster) Listen() *Receiver {
  150.     inc := make(chan interface{})
  151.     req := make(chan receiveRequest)
  152.     var buf vector.Vector
  153.     r := &Receiver{0, true, b, inc, req, buf}
  154.     joinReq := joinRequest{r, make(chan bool)}
  155.  
  156.     // Try to join the broadcaster.
  157.     b.joining <- joinReq
  158.  
  159.     if <-joinReq.result {
  160.         go receiverLoop(r)
  161.     } else {
  162.         disposeReceiver(r)
  163.         r = nil
  164.     }
  165.  
  166.     return r
  167. }
  168.  
  169. // Receive and return the next message broadcast to this receiver.
  170. // Blocks until a value is available to return.
  171. func (r *Receiver) Receive() (v interface{}) {
  172.     if !r.running {
  173.         return nil
  174.     }
  175.  
  176.     // Make a request for next broadcast on this channel.
  177.     reply := make(chan interface{})
  178.     err := make(chan bool)
  179.     defer close(reply)
  180.     defer close(err)
  181.     req := receiveRequest{reply, err}
  182.  
  183.     for r.running {
  184.         if closed(r.requests) {
  185.             return
  186.         }
  187.  
  188.         // Inform the receiver we want a value.
  189.         r.requests <- req
  190.  
  191.         select {
  192.         // Yay! Done.
  193.         case v = <-req.reply:
  194.             return
  195.         // Oops, wait a bit then try again.
  196.         case <-err:
  197.             time.Sleep(5e6)
  198.         }
  199.     }
  200.  
  201.     return
  202. }
  203.  
  204. // Stop the receiver from receiving any further broadcasts.
  205. func (r *Receiver) Stop() {
  206.     if !r.running {
  207.         return
  208.     }
  209.  
  210.     r.bcaster.leaving <- r.id
  211. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement