Guest User

Untitled

a guest
Jan 3rd, 2018
94
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 3.86 KB | None | 0 0
  /**
   * FIFO queue whose `pop()` returns a promise.
   *
   * Items pushed while nobody is waiting are buffered in `queue`; `pop()` calls
   * made while the buffer is empty park their resolver in `resolvers` and are
   * settled, oldest first, by later `push()` calls.
   */
  class AwaitableQueue {
    constructor() {
      this.queue = [];
      this.resolvers = [];
    }

    /**
     * Take the next item.
     * @returns {Promise<*>} resolved with the oldest buffered item, or pending
     *   until a future `push()` supplies one.
     */
    pop() {
      if (this.queue.length === 0) {
        // Nothing buffered: remember how to settle this promise later.
        return new Promise((settle) => {
          this.resolvers.push(settle);
        });
      }
      const next = this.queue.shift();
      return Promise.resolve(next);
    }

    /**
     * Hand an item to the oldest waiting `pop()`, or buffer it if none waits.
     * @param {*} payload item to deliver
     */
    push(payload) {
      if (this.resolvers.length === 0) {
        this.queue.push(payload);
        return;
      }
      const wake = this.resolvers.shift();
      wake(payload);
    }
  }
  16.  
  /**
   * A message queue that additionally remembers closures and callbacks.
   *
   * Both tables are plain arrays; an entry is referred to by the index at
   * which it was stored.
   */
  class Session extends AwaitableQueue {
    constructor() {
      super();
      this.closures = [];
      this.callbacks = [];
    }

    /**
     * Store a closure.
     * @param {Function} fn
     * @returns {number} index of `fn` inside `closures`
     */
    pushClosure(fn) {
      const sizeAfter = this.closures.push(fn);
      return sizeAfter - 1;
    }

    /**
     * Store a callback.
     * @param {Function} fn
     * @returns {number} index of `fn` inside `callbacks`
     */
    pushCallback(fn) {
      const sizeAfter = this.callbacks.push(fn);
      return sizeAfter - 1;
    }
  }
  27.  
  28.  
  /**
   * Build a function that, when invoked, queues a `call` message on `sess`.
   *
   * Function arguments cannot travel inside a message, so each one is stored
   * with `sess.pushClosure` and replaced by `null` in `param`; its closure
   * index is recorded at the same position in `funcs`. Non-function arguments
   * are kept in `param` with `null` in `funcs`.
   *
   * @param {object} sess object providing `push`, `pushClosure` and `pushCallback`
   * @param {number} func identifier placed in the message's `func` field
   * @returns {function(...*): Promise<*>} the returned promise is resolved by
   *   whoever invokes the callback registered through `sess.pushCallback`
   */
  function dispatch(sess, func) {
    return (...args) =>
      new Promise((resolve) => {
        const funcs = [];
        const param = [];
        for (const arg of args) {
          if (typeof arg === 'function') {
            funcs.push(sess.pushClosure(arg));
            param.push(null);
          } else {
            funcs.push(null);
            param.push(arg);
          }
        }
        // The callback is registered last, right before the message is queued.
        sess.push({ type: 'call', func, param, funcs, callback: sess.pushCallback(resolve) });
      });
  }
  38.  
  /**
   * Interpret one incoming message for `sess`.
   *
   * - `call`: invoke `handler` (when `msg.func` is -1) or the stored closure
   *   `sess.closures[msg.func]`. Positions whose `msg.funcs` entry is not
   *   `null` are passed as proxies created by `dispatch`. Once the result
   *   settles, a `return` message is queued on `sess`.
   * - `return`: invoke the stored callback with `msg.value`, then remove it.
   * - any other type is ignored.
   *
   * NOTE(review): a rejected or throwing target produces no `return` message;
   * confirm whether errors should be forwarded to the peer.
   *
   * @param {object} sess session holding `closures`, `callbacks` and `push`
   * @param {object} msg message with a `type` of 'call' or 'return'
   * @param {Function} handler function used when `msg.func` is -1
   */
  function handle(sess, msg, handler) {
    switch (msg.type) {
      case 'call': {
        const target = msg.func === -1 ? handler : sess.closures[msg.func];
        const callArgs = msg.param.map((plain, idx) => {
          const closureId = msg.funcs[idx];
          if (closureId === null) return plain;
          return dispatch(sess, closureId);
        });
        const outcome = target(...callArgs);
        Promise.resolve(outcome).then((value) => {
          sess.push({ type: 'return', value, callback: msg.callback, func: msg.func });
        });
        break;
      }
      case 'return': {
        sess.callbacks[msg.callback](msg.value);
        delete sess.callbacks[msg.callback];
        break;
      }
      default:
        break;
    }
  }
  50.  
  51.  
  // Mock server / fetch API.
  // Should be easy to replace with real functions.

  // Endpoint table keyed by path. It has no prototype, so inherited names such
  // as "constructor" or "toString" are never mistaken for registered endpoints.
  const endpoints = Object.create(null);

  /**
   * In-process stand-in for a POST request.
   *
   * Parses `opts.body` as JSON, passes it to the function registered for
   * `path`, logs the exchange and returns an object whose `json()` resolves to
   * the endpoint's result.
   *
   * @param {string} path endpoint path previously given to `serve`
   * @param {{body: string}} opts request options; `body` must be a JSON string
   * @returns {Promise<{json: function(): Promise<*>}>}
   * @throws {Error} (as a rejection) when no endpoint is registered for `path`
   */
  async function fetch(path, opts) {
    if (typeof endpoints[path] !== 'function') {
      throw new Error(`no endpoint registered for ${path}`);
    }
    const data = await endpoints[path](JSON.parse(opts.body));
    console.log(`POST ${path}:`, opts.body, '-->', JSON.stringify(data));
    return {
      async json() {
        return data;
      },
    };
  }

  /**
   * Register `fn` as the endpoint for `path`, replacing any previous one.
   * @param {string} path
   * @param {Function} fn receives the parsed request body
   * @returns {Promise<void>}
   */
  async function serve(path, fn) {
    endpoints[path] = fn;
  }
  63.  
  64.  
  // Live server-side sessions keyed by the numeric id handed to the client.
  // A Map is used so client-supplied keys such as "constructor" or "push"
  // cannot resolve to inherited members, and so finished sessions are really
  // removed instead of leaving holes behind.
  const sessions = new Map();
  let nextSessionId = 0;

  /**
   * Server side: expose `handler` at `path`.
   *
   * Every request carries one message. A `call` message with `func === -1`
   * opens a new session; any other message must name an existing one through
   * `data.sess`. The message is passed to `handle`, then the next message
   * queued on the session is sent back, tagged with the session id. When that
   * reply is the `return` for `func === -1`, the session is discarded.
   *
   * @param {string} path endpoint path passed to `serve`
   * @param {Function} handler function invoked for `func === -1` calls
   * @throws {Error} (as a rejection of the endpoint) 'session not found' when
   *   the message refers to an unknown session
   */
  function register(path, handler) {
    serve(path, async (data) => {
      const opensSession = data.func === -1 && data.type === 'call';
      let sid = data.sess;
      if (opensSession) {
        sid = nextSessionId++;
        sessions.set(sid, new Session()); // allocate a new queue
      }
      const sess = sessions.get(sid);
      if (!sess) throw new Error('session not found');
      handle(sess, data, handler);
      const resp = await sess.pop();
      const closesSession = resp.func === -1 && resp.type === 'return';
      if (closesSession) sessions.delete(sid); // deallocate the queue
      return { ...resp, sess: sid };
    });
  }
  80.  
  /**
   * Client side: invoke the function mounted at `path` with `args`.
   *
   * A fresh session queues the initial `call` message (func -1). Each loop
   * iteration posts the next queued message, tagged with the session id
   * learned from the previous response, and feeds the response to `handle`.
   * The loop ends when the response is the `return` for func -1.
   *
   * @param {string} path endpoint path passed to `fetch`
   * @param {...*} args arguments for the remote function
   * @returns {Promise<*>} the `value` of the final `return` message
   */
  async function call(path, ...args) {
    const sess = new Session();
    let sid;
    // The client exposes no default function, so a func -1 call from the
    // server is an error.
    const rejectDefaultCall = (x) => {
      throw new Error('no default handler for client agent');
    };

    dispatch(sess, -1)(...args);

    for (;;) {
      const outgoing = await sess.pop();
      const body = JSON.stringify({ ...outgoing, sess: sid });
      const reply = await fetch(path, { body });
      const resp = await reply.json();
      handle(sess, resp, rejectDefaultCall);
      sid = resp.sess;
      const isFinal = resp.type === 'return' && resp.func === -1;
      if (isFinal) return resp.value;
    }
  }
  93.  
  94.  
  // Example endpoint: left-fold `list` with the caller-supplied `fn`.
  // An empty list yields null; a single-element list yields that element
  // without calling `fn`. Each step is awaited before the next one starts.
  register('/reduce', async (list, fn) => {
    if (list.length === 0) return null;
    let acc = list[0];
    for (let idx = 1; idx < list.length; idx += 1) {
      acc = await fn(acc, list[idx]);
    }
    return acc;
  });
  105.  
  // Demo: sum a list through the '/reduce' endpoint, with the addition itself
  // running on the calling side. Failures are reported instead of being left
  // as an unhandled promise rejection.
  ;(async function () {
    try {
      const result = await call('/reduce', [1, 2, 3, 4], (a, b) => a + b);
      console.log('final result: ', result);
    } catch (err) {
      console.error('reduce demo failed:', err);
    }
  })();
  112.  
  113.  
  114.  
  115. // register('/bidirectional', async function(list, fn){
  116. // return await fn(list[0], list[1],
  117. // (a, b) => a + b)
  118. // })
  119.  
  120. // call('/bidirectional', [1, 2], async function(a, b, op){
  121. // return await op(a, b)
  122. // }).then(k => console.log(k))
Add Comment
Please, Sign In to add comment