Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- @ = require([
- { id: "sjs:assert", name: "assert" },
- { id: "sjs:sequence" }
- ])
- exports.pending_limit = 1024
- exports.closed = {}
- exports.take = function (channel) {
- return channel.take()
- }
- exports.put = function (channel, value) {
- return channel.put(value)
- }
- exports.close = function (channel) {
- return channel.close()
- }
- exports.isClosed = function (channel) {
- return channel.isClosed()
- }
- function wake(array, value) {
- var f = array.shift()
- return f(value)
- }
- function sleep(chan_this, value) {
- if (chan_this.length < exports.pending_limit) {
- // TODO why doesn't this hold work ?
- // hold(0)
- waitfor (var result) {
- function done(x) {
- // TODO is this a good spot for this ?
- // TODO is there a better way than using hold(0) ?
- hold(0)
- resume(x)
- return value
- }
- chan_this.push(done)
- } retract {
- var index = chan_this.indexOf(done)
- console.log("RETRACTED", index)
- @assert.isNot(index, -1) // TODO test if this is ever possible
- if (index !== -1) {
- chan_this.splice(index, 1)
- }
- }
- return result
- } else {
- throw new Error("cannot have more than #{exports.pending_limit} pending operations on a channel")
- }
- }
- // The behavior is the same as Clojure's core.async channels (http://clojure.github.io/core.async/)
- exports.Channel = function () {
- var closed = false
- , receiver = []
- , sender = []
- var o = @Stream(function (emit) {
- while (true) {
- // TODO I would like to use this.take, but I can't
- var v = o.take()
- if (v === exports.closed) {
- break
- } else {
- emit(v)
- }
- }
- })
- // Gross that it does all this stuff in here rather than on the prototype
- o.take = function () {
- if (sender.length) {
- return wake(sender, null)
- } else if (closed) {
- return exports.closed
- } else {
- return sleep(receiver, null)
- }
- }
- // TODO this used to return something even though it didn't use return! find out why
- o.put = function (value) {
- if (closed) {
- return false
- } else if (receiver.length) {
- wake(receiver, value)
- return true
- } else {
- sleep(sender, value)
- return true
- }
- }
- o.close = function () {
- closed = true
- while (receiver.length) {
- wake(receiver, exports.closed)
- }
- }
- o.isClosed = function () {
- return closed
- }
- return o
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement