Advertisement
Guest User

Untitled

a guest
Jan 8th, 2016
57
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.33 KB | None | 0 0
  1. import queues
  2. import locks
  3. import macros
  4.  
  type
    # Discriminator for `SchedulerCommand`. `scDone` is the first member and
    # therefore the kind of a default-initialized command.
    SchedulerCommandType = enum
      scDone, scYield, scWaitForCoroutine, scWaitForCoroutines

    # Value produced by a coroutine's iterator each time it is resumed;
    # `schedule` dispatches on `kind`.
    SchedulerCommand = object
      case kind: SchedulerCommandType
      of scDone, scYield:
        discard
      of scWaitForCoroutine:
        coroutine: CoroutineBase
      of scWaitForCoroutines:
        coroutines: seq[CoroutineBase]

    # Shared countdown: `schedule` decrements `count` when a counted coroutine
    # reports `scDone` and re-queues `waitingCoroutine` once it reaches zero.
    Counter = ref object
      count: int
      waitingCoroutine: CoroutineBase

    # Type-erased coroutine handle: the iterator that drives the body plus the
    # counter (if any) that it has to notify on completion.
    CoroutineBase = ref object of RootObj
      iter: iterator(): SchedulerCommand
      counter: Counter

    # Typed coroutine; carries a `result` slot unless `T` is `void`.
    Coroutine[T] = ref object of CoroutineBase
      when T is not void:
        result: T
  28.  
  type
    # A `Queue[T]` bundled with the lock and condition variable that `push`
    # and `pop` use to guard it.
    ThreadQueue[T] = object
      queue: Queue[T]
      mutex: Lock
      cond: Cond
  33.  
  proc initThreadQueue[T](): ThreadQueue[T] =
    ## Returns a `ThreadQueue[T]` with an empty queue and an initialized
    ## lock and condition variable.
    initLock(result.mutex)
    initCond(result.cond)
    result.queue = initQueue[T]()
  38.  
  proc push[T](tq: var ThreadQueue[T], item: T) =
    ## Appends `item` to the queue while holding the lock, then signals the
    ## condition variable that `pop` waits on.
    acquire(tq.mutex)
    try:
      enqueue(tq.queue, item)
      signal(tq.cond)
    finally:
      # Released on every exit path, including exceptions.
      release(tq.mutex)
  44.  
  proc pop[T](tq: var ThreadQueue[T]): T =
    ## Removes and returns the item at the front of the queue, waiting on the
    ## condition variable for as long as the queue is empty.
    acquire(tq.mutex)
    try:
      # Re-check the length after every wake-up before dequeuing.
      while len(tq.queue) == 0:
        wait(tq.cond, tq.mutex)
      result = dequeue(tq.queue)
    finally:
      release(tq.mutex)
  51.  
  # Global run queue that the worker threads pop coroutines from.
  var queue = initThreadQueue[CoroutineBase]()

  proc go(c: CoroutineBase) =
    ## Puts `c` on the global run queue.
    push(queue, c)
  56.  
  proc schedule(c: CoroutineBase) =
    ## Resumes `c` once by calling its iterator and acts on the command it
    ## yields:
    ##
    ## * `scDone`: decrement the counter attached to `c` (if any) and re-queue
    ##   the coroutine waiting on that counter once it reaches zero.
    ## * `scYield`: put `c` back on the run queue.
    ## * `scWaitForCoroutine`: not implemented; always fails.
    ## * `scWaitForCoroutines`: attach a shared counter to every listed
    ##   coroutine and queue them; `c` is re-queued by the last one to finish.
    let cmd = c.iter()
    case cmd.kind
    of scDone:
      let counter = c.counter
      if counter != nil:
        # A finished coroutine has no further use for its counter; drop the
        # reference before notifying so it is never decremented twice.
        c.counter = nil
        if atomicDec(counter.count) == 0:
          go counter.waitingCoroutine
    of scYield:
      go c
    of scWaitForCoroutine:
      echo "not implemented"
      # doAssert (unlike assert) is kept in release builds, so the coroutine
      # cannot be dropped silently.
      doAssert false, "scWaitForCoroutine is not implemented"
    of scWaitForCoroutines:
      if cmd.coroutines.len == 0:
        # Nothing to wait for: with no child to decrement a counter the waiter
        # would never be resumed, so re-queue it right away.
        go c
      else:
        let counter = Counter(count: cmd.coroutines.len, waitingCoroutine: c)
        for child in cmd.coroutines:
          # The counter must be attached before the child becomes runnable.
          child.counter = counter
          go child
  74.  
  proc worker(n: int) =
    ## Thread body: pops coroutines from the global queue and schedules them
    ## until a nil entry is popped. `n` only labels the log lines.
    echo "worker " & $n & " on duty"
    var next = queue.pop()
    while next != nil:
      schedule(next)
      next = queue.pop()
    echo "worker " & $n & " shutting down"
  83.  
  # Worker threads; each one runs `worker` with its array index as argument.
  var workers: array[4, Thread[int]]
  for idx in low(workers)..high(workers):
    createThread(workers[idx], cast[proc(n: int) {.gcsafe.}](worker), idx)
  87.  
  proc shutdown() =
    ## Pushes one nil entry per worker thread onto the global queue; a worker
    ## leaves its loop when it pops nil.
    var remaining = len(workers)
    while remaining > 0:
      push(queue, nil)
      dec(remaining)
  91.  
  template ppTree(e: expr) =
    ## Debug aid: echoes `treeRepr(e)` and `toStrLit(e)` between banner lines.
    echo("-------------------- AST ---------------------")
    echo(e.treeRepr)
    echo("------------------- Code ---------------------")
    echo(e.toStrLit)
    echo("----------------------------------------------")
  98.  
  proc wrapAwaitValue(tmpSym, cmd, n: NimNode): NimNode =
    ## Builds the statement list
    ##
    ##   let <tmpSym> = coroutineAwaitValue(<cmd[1]>)
    ##   yield <tmpSym>.command
    ##
    ## `cmd` is the `await expr` node (its second child is the awaited
    ## expression); `n` is passed to `newNimNode` for the list's line info.
    let awaitCall = newCall(newIdentNode(!"coroutineAwaitValue"), cmd[1])
    let yieldCommand = newNimNode(nnkYieldStmt)
    yieldCommand.add(newDotExpr(tmpSym, newIdentNode(!"command")))
    result = newNimNode(nnkStmtList, n)
    result.add(newLetStmt(tmpSym, awaitCall))
    result.add(yieldCommand)
  112.  
  # Recursively rewrites the statements of a coroutine's body:
  # 1. return expr
  #    ->
  #    retCoroutine.result = expr
  #    yield SchedulerCommand(kind: scDone)
  #
  # 2. return
  #    ->
  #    retCoroutine.result = result
  #    yield SchedulerCommand(kind: scDone)
  #    or (if there is no result)
  #    yield SchedulerCommand(kind: scDone)
  #
  # 3. await expr
  #    ->
  #    yield coroutineAwait(expr)
  #
  # 4. let x = await expr
  #    ->
  #    let tmp = coroutineAwaitValue(expr)
  #    yield tmp.command
  #    let x = tmp.value
  #    (applied to every single-name definition of the section; definitions
  #    without an await are kept, each in a section of its own, in order)
  #
  # 5. var x = await expr (same as 4)
  # 6. x = await expr (same as 4)
  # 7. discard await expr (same as 4, without the final statement)
  # 8. try statements are not allowed

  proc isAwaitCall(n: NimNode): bool =
    ## True when `n` is a command or call whose callee is the identifier
    ## `await`.
    result = n.kind in {nnkCommand, nnkCall} and n[0].kind == nnkIdent and
             n[0].ident == !"await"

  proc isAwaitedDef(identDef: NimNode): bool =
    ## True for a definition of the shape `name[: Type] = await expr` that
    ## declares exactly one name.
    result = identDef.kind == nnkIdentDefs and identDef.len == 3 and
             isAwaitCall(identDef[2])

  proc awaitTmpName(target: NimNode): string =
    ## Name hint for the temporary that receives an awaited value. Only plain
    ## identifiers have an `ident`; any other target (field access, index
    ## expression, pragma'd name, ...) gets a generic hint.
    if target.kind == nnkIdent:
      result = "await" & $target.ident
    else:
      result = "awaitTmp"

  proc convertToCoroutineBody(n, retSym: NimNode, hasResult: bool): NimNode =
    ## Returns `n` with the rewrites listed above applied to it and, through
    ## the loop at the end, to all of its children.
    ##
    ## `retSym` is the symbol of the coroutine object that receives the
    ## result; `hasResult` tells whether the coroutine has a result value.
    result = n
    case n.kind
    of nnkReturnStmt:
      result = newNimNode(nnkStmtList, n)
      if n[0].kind == nnkEmpty:
        # return
        if hasResult:
          result.add(
            newAssignment(
              newDotExpr(retSym, newIdentNode(!"result")),
              newIdentNode(!"result")
            )
          )
      else:
        # return expr
        if not hasResult:
          error("Non-void return inside a void coroutine")
        result.add(
          newAssignment(
            newDotExpr(retSym, newIdentNode(!"result")),
            n[0]
          )
        )
      result.add(
        newNimNode(nnkYieldStmt, n).add(
          newNimNode(nnkObjConstr).add(
            bindSym"SchedulerCommand",
            newColonExpr(
              newIdentNode(!"kind"),
              bindSym"scDone"
            )
          )
        )
      )
    of nnkCommand, nnkCall:
      if isAwaitCall(n):
        # await expr
        expectLen(n, 2)
        result = newNimNode(nnkYieldStmt, n).add(
          newCall(
            newIdentNode(!"coroutineAwait"),
            n[1]
          )
        )
    of nnkVarSection, nnkLetSection:
      var hasAwait = false
      for i in 0..<n.len:
        if isAwaitedDef(n[i]):
          hasAwait = true
      if hasAwait:
        # Split the section so that every definition is kept and stays in its
        # original order relative to the inserted yields.
        result = newNimNode(nnkStmtList, n)
        for i in 0..<n.len:
          let identDef = n[i]
          if isAwaitedDef(identDef):
            # let/var x = await expr
            expectLen(identDef[2], 2)
            let tmpSym = genSym(nskLet, awaitTmpName(identDef[0]))
            let prelude = wrapAwaitValue(tmpSym, identDef[2], n)
            for j in 0..<prelude.len:
              result.add(prelude[j])
            result.add(
              newNimNode(n.kind).add(
                newNimNode(nnkIdentDefs).add(
                  identDef[0],
                  identDef[1], # keep the declared type, if any
                  newDotExpr(tmpSym, newIdentNode(!"value"))
                )
              )
            )
          else:
            result.add(newNimNode(n.kind).add(identDef))
    of nnkAsgn:
      let cmd = n[1]
      if isAwaitCall(cmd):
        # x = await expr
        expectLen(cmd, 2)
        let tmpSym = genSym(nskLet, awaitTmpName(n[0]))
        result = wrapAwaitValue(tmpSym, cmd, n).add(
          newAssignment(
            n[0],
            newDotExpr(tmpSym, newIdentNode(!"value"))
          )
        )
    of nnkDiscardStmt:
      let cmd = n[0]
      if isAwaitCall(cmd):
        # discard await x
        expectLen(cmd, 2)
        let tmpSym = genSym(nskLet, "awaitDiscard")
        result = wrapAwaitValue(tmpSym, cmd, n)
    of nnkTryStmt:
      error("try statements are not allowed in coroutine functions")
    else: discard

    # TODO: implicit return?

    # Descend into the (possibly replaced) node so nested statements are
    # rewritten as well.
    for i in 0..<result.len:
      result[i] = convertToCoroutineBody(result[i], retSym, hasResult)
  239.  
  # Replaces the body of a proc or lambda with code that creates a coroutine
  # object whose iterator runs the original body:
  #
  #   let retCoroutine = Coroutine[T]()
  #   retCoroutine.iter = iterator(): SchedulerCommand =
  #     {.push warning[resultshadowed]: off.}
  #     var result: T
  #     {.pop.}
  #     # <<< body >>>
  #     retCoroutine.result = result
  #   retCoroutine
  #
  # The existing body is preprocessed by convertToCoroutineBody and becomes
  # the iterator body. The `result` lines are only emitted when T is not void.

  proc coroutineDisplayName(n: NimNode): string =
    ## Name of the routine `n` for the conversion hint. Lambdas have an empty
    ## name node and exported routines a postfix node (`name*`); neither has an
    ## `ident` of its own.
    let nameNode = n[0]
    if nameNode.kind == nnkIdent:
      result = $nameNode.ident
    elif nameNode.kind == nnkPostfix and nameNode.len == 2 and
         nameNode[1].kind == nnkIdent:
      result = $nameNode[1].ident
    else:
      result = "<anonymous>"

  proc convertToCoroutine(n: NimNode): NimNode =
    ## Transforms the proc/lambda definition `n` as described above and returns
    ## it. Reports a compile-time error for any other node kind and for return
    ## types other than `Coroutine[T]` or none at all.
    #ppTree(n)
    if n.kind notin {nnkProcDef, nnkLambda}:
      error("Cannot transform this node kind into a coroutine")

    hint("Converting " & coroutineDisplayName(n) & " to coroutine")
    let unRetType = n[3][0]
    var retType: NimNode

    case unRetType.kind
    of nnkBracketExpr:
      if unRetType[0].kind != nnkIdent or unRetType[0].ident != !"Coroutine":
        error("Return type of a coroutine should be Coroutine[T] or void")
      retType = unRetType[1]
    of nnkEmpty:
      retType = newIdentNode(!"void") # all good, no return type means void
    else:
      error("Return type of a coroutine should be Coroutine[T] or void")

    let retSym = genSym(nskLet, "retCoroutine")
    # T may be any type expression (e.g. seq[int]); only a plain `void`
    # identifier means that there is no result.
    let hasResult = not (retType.kind == nnkIdent and retType.ident == !"void")
    let coBody = newNimNode(nnkStmtList, n[6]) # second arg is used for line info
    var itBody = convertToCoroutineBody(n[6], retSym, hasResult)
    if itBody.kind != nnkStmtList:
      # A single-statement body is not a list; wrap it so that statements can
      # be inserted around it below.
      itBody = newNimNode(nnkStmtList, n[6]).add(itBody)
    if hasResult:
      itBody.insert(0,
        newNimNode(nnkPragma).add(
          newIdentNode("push"),
          newNimNode(nnkExprColonExpr).add(
            newNimNode(nnkBracketExpr).add(
              newIdentNode("warning"),
              newIdentNode("resultshadowed")
            ),
            newIdentNode("off")
          )
        )
      )
      itBody.insert(1,
        newNimNode(nnkVarSection, n[6]).add(
          newIdentDefs(newIdentNode("result"), retType)
        )
      )
      itBody.insert(2,
        newNimNode(nnkPragma).add(newIdentNode("pop"))
      )
      itBody.add(
        newAssignment(
          newDotExpr(retSym, newIdentNode(!"result")),
          newIdentNode(!"result")
        )
      )

    coBody.add(
      newLetStmt(
        retSym,
        newCall(
          newNimNode(nnkBracketExpr, n[6]).add(newIdentNode(!"Coroutine"), retType)
        )
      )
    )
    coBody.add(
      newAssignment(
        newDotExpr(retSym, newIdentNode(!"iter")),
        newProc(
          procType = nnkIteratorDef,
          params = [bindSym"SchedulerCommand"],
          body = itBody
        )
      )
    )
    coBody.add(retSym)

    result = n
    # TODO: do I need this?
    # for i in 0..<result[4].len:
    #   if result[4][i].kind == nnkIdent and result[4][i].ident == !"coroutine":
    #     result[4].del(i)
    result[6] = coBody
    #ppTree(result)
  331.  
  macro coroutine(n: stmt): stmt {.immediate.} =
    ## Passes the definition `n` to `convertToCoroutine` and returns the
    ## transformed definition. Used as a pragma: `proc f() {.coroutine.}`.
    result = convertToCoroutine(n)
  334.  
  335. #==============================================================================
  336.  
  proc coroutineAwait(coroutines: seq[CoroutineBase]): SchedulerCommand =
    ## Wraps `coroutines` in an `scWaitForCoroutines` command.
    result = SchedulerCommand(kind: scWaitForCoroutines, coroutines: coroutines)
  339.  
  # Demo coroutine: echoes its index `i`, then returns 1 plus the sum of
  # 0 ..< n.
  proc computeNumber(i, n: int): Coroutine[int] {.coroutine.} =
    echo "computing... ", i
    var count = 1
    for step in 0..<n:
      inc(count, step)
    return count
  346.  
  # Demo driver: creates 100 `computeNumber` coroutines, awaits all of them,
  # then echoes "done" and calls `shutdown`.
  proc computeAll(n: int): Coroutine[void] {.coroutine.} =
    var pending = newSeq[CoroutineBase]()
    for idx in 0..<100:
      add(pending, computeNumber(idx, n))
    await pending
    echo "done"
    shutdown()
  354.  
  # Queue the root coroutine, then block until every worker thread has exited.
  go computeAll(10000)
  joinThreads(workers)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement