Advertisement
Pauan

sjs:channel

Sep 22nd, 2014
488
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. @ = require([
  2.   { id: "sjs:assert", name: "assert" },
  3.   { id: "sjs:sequence" }
  4. ])
  5.  
  6.  
  7. exports.pending_limit = 1024
  8.  
  9. exports.closed = {}
  10.  
  11. exports.take = function (channel) {
  12.   return channel.take()
  13. }
  14.  
  15. exports.put = function (channel, value) {
  16.   return channel.put(value)
  17. }
  18.  
  19. exports.close = function (channel) {
  20.   return channel.close()
  21. }
  22.  
  23. exports.isClosed = function (channel) {
  24.   return channel.isClosed()
  25. }
  26.  
  27.  
  28. function wake(array, value) {
  29.   var f = array.shift()
  30.   return f(value)
  31. }
  32.  
  33. function sleep(chan_this, value) {
  34.   if (chan_this.length < exports.pending_limit) {
  35.     // TODO why doesn't this hold work ?
  36.     // hold(0)
  37.     waitfor (var result) {
  38.       function done(x) {
  39.         // TODO is this a good spot for this ?
  40.         // TODO is there a better way than using hold(0) ?
  41.         hold(0)
  42.         resume(x)
  43.         return value
  44.       }
  45.       chan_this.push(done)
  46.     } retract {
  47.       var index = chan_this.indexOf(done)
  48.       console.log("RETRACTED", index)
  49.       @assert.isNot(index, -1) // TODO test if this is ever possible
  50.       if (index !== -1) {
  51.         chan_this.splice(index, 1)
  52.       }
  53.     }
  54.     return result
  55.   } else {
  56.     throw new Error("cannot have more than #{exports.pending_limit} pending operations on a channel")
  57.   }
  58. }
  59.  
  60.  
  61. // The behavior is the same as Clojure's core.async channels (http://clojure.github.io/core.async/)
  62. exports.Channel = function () {
  63.   var closed   = false
  64.     , receiver = []
  65.     , sender   = []
  66.  
  67.   var o = @Stream(function (emit) {
  68.     while (true) {
  69.       // TODO I would like to use this.take, but I can't
  70.       var v = o.take()
  71.       if (v === exports.closed) {
  72.         break
  73.       } else {
  74.         emit(v)
  75.       }
  76.     }
  77.   })
  78.  
  79.   // Gross that it does all this stuff in here rather than on the prototype
  80.   o.take = function () {
  81.     if (sender.length) {
  82.       return wake(sender, null)
  83.     } else if (closed) {
  84.       return exports.closed
  85.     } else {
  86.       return sleep(receiver, null)
  87.     }
  88.   }
  89.  
  90.   // TODO this used to return something even though it didn't use return! find out why
  91.   o.put = function (value) {
  92.     if (closed) {
  93.       return false
  94.     } else if (receiver.length) {
  95.       wake(receiver, value)
  96.       return true
  97.     } else {
  98.       sleep(sender, value)
  99.       return true
  100.     }
  101.   }
  102.  
  103.   o.close = function () {
  104.     closed = true
  105.  
  106.     while (receiver.length) {
  107.       wake(receiver, exports.closed)
  108.     }
  109.   }
  110.  
  111.   o.isClosed = function () {
  112.     return closed
  113.   }
  114.  
  115.   return o
  116. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement