Advertisement
Guest User

Untitled

a guest
Jul 26th, 2017
87
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.82 KB | None | 0 0
  1. import macros, cpuinfo
  2.  
  3. when not compileOption("threads"):
  4. {.error: "ThreadPool requires --threads:on compiler option".}
  5.  
  6. type
  7. ThreadPool* = ref object
  8. chanTo: ChannelTo # Tasks are added to this channel
  9. chanFrom: ChannelFrom # Results are read from this channel
  10. threads: seq[ThreadType]
  11. maxThreads: int
  12.  
  13. FlowVar*[T] = ref object
  14. tp: ThreadPool
  15. when T isnot void:
  16. v: T
  17. isComplete: bool
  18.  
  19. MsgTo = ref object {.inheritable, pure.}
  20. action: proc(m: MsgTo, chanFrom: ChannelFromPtr) {.nimcall.}
  21. flowVar: pointer
  22. complete: bool
  23.  
  24. MsgFrom = ref object {.inheritable, pure.}
  25. writeResult: proc(m: MsgFrom) {.nimcall.}
  26. flowVar: pointer
  27.  
  28. ConcreteMsgFrom[T] = ref object of MsgFrom
  29. when T isnot void:
  30. v: T
  31.  
  32. ChannelTo = Channel[MsgTo]
  33. ChannelFrom = Channel[MsgFrom]
  34.  
  35. ChannelToPtr = ptr ChannelTo
  36. ChannelFromPtr = ptr ChannelFrom
  37.  
  38. ThreadProcArgs = object
  39. chanTo: ChannelToPtr
  40. chanFrom: ChannelFromPtr
  41.  
  42. ThreadType = Thread[ThreadProcArgs]
  43.  
  44.  
  45. proc cleanupAux(tp: ThreadPool) =
  46. var msg: MsgTo
  47. msg.new()
  48. msg.complete = true
  49. for i in 0 ..< tp.threads.len:
  50. tp.chanTo.send(msg)
  51. joinThreads(tp.threads)
  52.  
  53. proc sync*(tp: ThreadPool) =
  54. tp.cleanupAux()
  55. tp.threads.setLen(0)
  56.  
  57. proc finalize(tp: ThreadPool) =
  58. tp.cleanupAux()
  59. tp.chanTo.close()
  60. tp.chanFrom.close()
  61.  
  62. proc threadProc(args: ThreadProcArgs) {.thread.} =
  63. var burstId = 0
  64. while true:
  65. let m = args.chanTo[].recv()
  66. if m.complete:
  67. break
  68. m.action(m, args.chanFrom)
  69. deallocHeap(true, false)
  70.  
  71. proc startThreads(tp: ThreadPool) =
  72. assert(tp.threads.len == 0)
  73. if tp.threads.isNil:
  74. tp.threads = newSeq[ThreadType](tp.maxThreads)
  75. else:
  76. tp.threads.setLen(tp.maxThreads)
  77.  
  78. var args = ThreadProcArgs(chanTo: addr tp.chanTo, chanFrom: addr tp.chanFrom)
  79. for i in 0 ..< tp.maxThreads:
  80. createThread(tp.threads[i], threadProc, args)
  81.  
  82. proc newThreadPool*(maxThreads: int): ThreadPool =
  83. result.new(finalize)
  84. result.maxThreads = maxThreads
  85. result.chanTo.open()
  86. result.chanFrom.open()
  87.  
  88. proc newThreadPool*(): ThreadPool {.inline.} =
  89. newThreadPool(countProcessors())
  90.  
  91. proc newSerialThreadPool*(): ThreadPool {.inline.} =
  92. newThreadPool(1)
  93.  
  94. proc dispatchMessage(tp: ThreadPool, m: MsgTo) =
  95. if tp.threads.len == 0:
  96. tp.startThreads()
  97. tp.chanTo.send(m)
  98.  
  99. proc dispatchMessageWithFlowVar[T](tp: ThreadPool, m: MsgTo): FlowVar[T] =
  100. result.new()
  101. result.tp = tp
  102. GC_ref(result)
  103. m.flowVar = cast[pointer](result)
  104. tp.dispatchMessage(m)
  105.  
  106. proc sendBack[T](v: T, c: ChannelFromPtr, flowVar: pointer) =
  107. if not flowVar.isNil:
  108. var msg: ConcreteMsgFrom[T]
  109. msg.new()
  110. when T isnot void:
  111. msg.v = v
  112. msg.writeResult = proc(m: MsgFrom) {.nimcall.} =
  113. let m = cast[ConcreteMsgFrom[T]](m)
  114. let fv = cast[FlowVar[T]](m.flowVar)
  115. fv.tp = nil
  116. when T isnot void:
  117. fv.v = m.v
  118. fv.isComplete = true
  119. GC_unref(fv)
  120. msg.flowVar = flowVar
  121. c[].send(msg)
  122.  
  123. proc spawnAux(tp: NimNode, e: NimNode, withFlowVar: bool): NimNode =
  124. let msgTypeName = genSym(nskType, "MsgSub")
  125. let dispatchProcName = genSym(nskProc, "dispatchProc")
  126. let msgParamIdent = newIdentNode("m")
  127.  
  128. let origProcName = e[0]
  129. let procTypParams = origProcName.getTypeInst()[0]
  130.  
  131. let msgFields = newNimNode(nnkRecList)
  132.  
  133. let theCall = newCall(origProcName)
  134.  
  135. let msgObjConstr = newNimNode(nnkObjConstr).add(
  136. msgTypeName,
  137. newNimNode(nnkExprColonExpr).add(
  138. newIdentNode("action"),
  139. dispatchProcName
  140. )
  141. )
  142.  
  143. var iParam = 0
  144. for i in 1 ..< procTypParams.len:
  145. # msgFields.add(copyNimTree(procTypParams[i]))
  146. for j in 0 ..< procTypParams[i].len - 2:
  147. let fieldIdent = newIdentNode($procTypParams[i][j])
  148. msgFields.add(newNimNode(nnkIdentDefs).add(fieldIdent, procTypParams[i][^2], newEmptyNode()))
  149. theCall.add(newNimNode(nnkDotExpr).add(
  150. newNimNode(nnkCast).add(msgTypeName, msgParamIdent),
  151. fieldIdent))
  152. msgObjConstr.add(newNimNode(nnkExprColonExpr).add(fieldIdent, e[iParam + 1]))
  153. inc iParam
  154.  
  155. let msgTypDef = newNimNode(nnkTypeSection).add(newNimNode(nnkTypeDef).add(
  156. msgTypeName,
  157. newEmptyNode(),
  158. newNimNode(nnkRefTy).add(
  159. newNimNode(nnkObjectTy).add(
  160. newEmptyNode(),
  161. newNimNode(nnkOfInherit).add(bindSym"MsgTo"),
  162. msgFields
  163. )
  164. )
  165. ))
  166.  
  167. let chanFromIdent = newIdentNode("chanFrom")
  168.  
  169. let dispatchProc = newProc(dispatchProcName, params = [
  170. newEmptyNode(),
  171. newNimNode(nnkIdentDefs).add(
  172. msgParamIdent,
  173. bindSym"MsgTo",
  174. newEmptyNode()
  175. ),
  176. newNimNode(nnkIdentDefs).add(
  177. chanFromIdent,
  178. bindSym"ChannelFromPtr",
  179. newEmptyNode()
  180. )
  181. ],
  182. body = newCall(bindSym"sendBack", theCall, chanFromIdent, newNimNode(nnkDotExpr).add(
  183. msgParamIdent, newIdentNode("flowVar")))
  184. )
  185.  
  186. dispatchProc.addPragma(newIdentNode("gcsafe"))
  187.  
  188. var dispatchCall: NimNode
  189. if withFlowVar:
  190. dispatchCall = newCall(newNimNode(nnkBracketExpr).add(bindSym"dispatchMessageWithFlowVar", procTypParams[0]), tp, msgObjConstr)
  191. else:
  192. dispatchCall = newCall(bindSym"dispatchMessage", tp, msgObjConstr)
  193.  
  194. result = newNimNode(nnkStmtList).add(
  195. msgTypDef,
  196. dispatchProc,
  197. dispatchCall
  198. )
  199.  
  200. macro spawn*(tp: ThreadPool, e: typed{nkCall}): untyped =
  201. spawnAux(tp, e, false)
  202.  
  203. macro spawnFV*(tp: ThreadPool, e: typed{nkCall}): untyped =
  204. spawnAux(tp, e, true)
  205.  
  206. proc nextMessage(tp: ThreadPool) =
  207. let msg = tp.chanFrom.recv()
  208. msg.writeResult(msg)
  209.  
  210. proc read*[T](v: FlowVar[T]): T =
  211. while not v.isComplete:
  212. v.tp.nextMessage()
  213. result = v.v
  214.  
  215. when isMainModule:
  216. import os
  217.  
  218. type Foo = ref object
  219.  
  220. proc finalize(f: Foo) =
  221. echo "foo finalized"
  222.  
  223. block:
  224. proc helloWorld(a: int): int =
  225. return 123 + a
  226.  
  227. let tp = newThreadPool(4)
  228. const numCalcs = 100
  229. var results = newSeq[FlowVar[int]](numCalcs)
  230. for i in 0 ..< numCalcs:
  231. results[i] = tp.spawnFV helloWorld(i)
  232.  
  233. for i in 0 ..< numCalcs:
  234. assert(results[i].read() == 123 + i)
  235.  
  236. block:
  237. var ga = 0
  238. proc helloWorld(a: int) =
  239. atomicInc(ga)
  240. sleep(300)
  241.  
  242. let tp = newThreadPool()
  243. const numCalcs = 10
  244. for i in 0 ..< numCalcs:
  245. tp.spawn helloWorld(i)
  246. tp.sync()
  247. assert ga == numCalcs
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement