Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- // awaitable queue
- class AwaitableQueue {
- constructor(){
- this.queue = []
- this.resolvers = []
- }
- pop(){
- if(this.queue.length > 0) return Promise.resolve(this.queue.shift());
- return new Promise((resolve, reject) => this.resolvers.push(resolve) )
- }
- push(payload){
- if(this.resolvers.length > 0) this.resolvers.shift()(payload)
- else this.queue.push(payload);
- }
- }
- // session keeps track of closures and callbacks
- class Session extends AwaitableQueue {
- constructor(){
- super()
- this.closures = []
- this.callbacks = []
- }
- pushClosure(fn){ return this.closures.push(fn) - 1 }
- pushCallback(fn){ return this.callbacks.push(fn) - 1 }
- }
- // creates functions that create call messages
- function dispatch(sess, func){
- return (...args) => new Promise((resolve, reject) => {
- let funcs = []
- let param = args.map(k => typeof k === 'function' ?
- (funcs.push(sess.pushClosure(k)), null) : (funcs.push(null), k))
- sess.push({ type: 'call', func: func, param: param, funcs: funcs, callback: sess.pushCallback(resolve) })
- })
- }
- // interprets messages, invokes functions and sends return messages
- function handle(sess, msg, handler){
- if(msg.type === 'call'){
- let fn = msg.func === -1 ? handler : sess.closures[msg.func]
- Promise.resolve(fn(...msg.param.map((k, i) => msg.funcs[i] === null ? k : dispatch(sess, msg.funcs[i]) )))
- .then(value => sess.push({ type: 'return', value: value, callback: msg.callback, func: msg.func }))
- }else if(msg.type === 'return'){
- sess.callbacks[msg.callback](msg.value)
- delete sess.callbacks[msg.callback]
- }
- }
- // mock server / fetch api
- // should be easy to replace with real functions
- var endpoints = {}
- async function fetch(path, opts){
- let data = await endpoints[path](JSON.parse(opts.body))
- console.log('POST ' + path + ':', opts.body, '-->', JSON.stringify(data))
- return { async json(){ return data } }
- }
- async function serve(path, fn){
- endpoints[path] = fn;
- }
- let sessions = []
- // serverside register handler for a particular path
- function register(path, handler){
- serve(path, async function(data){
- if(data.func === -1 && data.type === 'call')
- data.sess = sessions.push(new Session()) - 1; // allocate a new queue
- let sess = sessions[data.sess];
- if(!sess) throw new Error('session not found')
- handle(sess, data, handler)
- let resp = await sess.pop()
- if(resp.func === -1 && resp.type === 'return') delete sessions[data.sess]; // deallocate a queue
- return { ...resp, sess: data.sess };
- })
- }
- // clientside invoke function mounted at particular path
- async function call(path, ...args){
- let sess = new Session(), sid;
- dispatch(sess, -1)(...args)
- while(true){
- let data = { ...(await sess.pop()), sess: sid }
- let resp = await (await fetch(path, { body: JSON.stringify(data) })).json()
- handle(sess, resp, x => { throw new Error('no default handler for client agent') })
- sid = resp.sess;
- if(resp.func === -1 && resp.type === 'return') return resp.value;
- }
- }
- // register an example reduce endpoint
- register('/reduce', async function(list, fn){
- if(list.length === 0) return null;
- if(list.length === 1) return list[0];
- let init = await fn(list[0], list[1])
- for(let i = 2; i < list.length; i++){
- init = await fn(init, list[i])
- }
- return init
- })
- ;(async function(){
- let result = await call('/reduce', [1, 2, 3, 4], (a, b) => {
- return a + b
- })
- console.log('final result: ', result)
- })()
- // register('/bidirectional', async function(list, fn){
- // return await fn(list[0], list[1],
- // (a, b) => a + b)
- // })
- // call('/bidirectional', [1, 2], async function(a, b, op){
- // return await op(a, b)
- // }).then(k => console.log(k))
Add Comment
Please, Sign In to add comment