Advertisement
Guest User

Untitled

a guest
Mar 22nd, 2017
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 19.91 KB | None | 0 0
  1. use MONKEY;
  2.  
  # A deferred chain of list operations over a source iterator.
  #
  # Calls to .map and .grep are normally not executed right away: they are
  # recorded as (Method => Capture) pairs in @!actions.  Only when .iterator
  # is called are the recorded actions handed to Rakudo::Iterator.HyperActions
  # or Rakudo::Iterator.RaceActions, depending on $!method.  The other list
  # methods installed below first drain that iterator into a List and then
  # run the ordinary List method on it.
  class ConcurrentChain does Iterable { # perhaps this should be HyperSeq ?
      has     $!source;  # the source iterator
      has int $!batch;   # batch size
      has int $!degree;  # degree size (number of workers)
      has str $!method;  # name of method: "hyper" or "race"
      has     @!actions; # Pairs of actions: Method => Capture

      # Returns true if the first positional in the given Capture is a
      # Block instance that has a LAST phaser.  The type check prevents
      # calling .has-phaser on arguments that need not provide it, such as
      # a WhateverCode passed to map or a type object / value passed as a
      # matcher to grep.
      my sub has-LAST(\c) {
          my $code := c.AT-POS(0);
          nqp::istype($code,Block)
            && nqp::isconcrete($code)
            && $code.has-phaser("LAST")
      }

      # Need to special case "map", to handle "for {}", which is code-genned
      # as a .map, but with a :item named parameter specified
      my $map-method := List.^find_method("map");
      method map(ConcurrentChain:D: :$item, |c) {
          if defined($item) {
              Seq.new(self.iterator).map(:$item, |c)
          }
          elsif has-LAST(c) {
              Seq.new(self.iterator).map(|c)   # must use serial version
          }
          else {                               # concurrent, save as action to do
              @!actions.push(Pair.new($map-method,c));
              self
          }
      }

      # Need to special case "grep" in case the block has a LAST phaser.
      my $grep-method := List.^find_method("grep");
      method grep(ConcurrentChain:D: |c) {
          if has-LAST(c) {
              Seq.new(self.iterator).grep(|c)  # must use serial version
          }
          else {                               # concurrent, save as action to do
              @!actions.push(Pair.new($grep-method,c));
              self
          }
      }

      # concurrent methods that should just save action for later
      # BEGIN for <grep> {
      #     my $call-method  := List.^find_method($_);
      #     my $chain-method := method (ConcurrentChain:D: |c) {
      #         @!actions.push(Pair.new($call-method,c));
      #         self
      #     }
      #     $chain-method.set_name($_);
      #     ::?CLASS.^add_method($_,$chain-method)
      # }

      # serial methods: run the chain to completion into a buffer, wrap the
      # buffer in a List and call the List method of the same name on it.
      # The arguments the caller passed (e.g. :as / :with) are forwarded.
      BEGIN for <gist keys kv perl repeated squish Str unique values> {
          my $call-method  := List.^find_method($_);
          my $chain-method := method (ConcurrentChain:D: |c) {
              self.iterator.push-all(my $buffer := IterationBuffer.CREATE);
              $call-method(
                nqp::p6bindattrinvres(nqp::create(List),List,'$!reified',$buffer),
                |c
              )
          }
          $chain-method.set_name($_);
          ::?CLASS.^add_method($_,$chain-method)
      }

      # Initialise the attributes of a freshly created object; returns self.
      method !SET-SELF(\source, \batch, \degree, $method) {
          $!source := source;
          $!batch   = batch;
          $!degree  = degree;
          $!method  = $method;
          self
      }

      # Create a chain on the given source iterator.  Throws
      # X::Invalid::Value when batch or degree is not a positive number.
      method new(\source, \batch, \degree, $method) {
          batch <= 0
            ?? X::Invalid::Value.new(
                 :$method, :name<batch>, :value(batch)).throw
            !! degree <= 0
              ?? X::Invalid::Value.new(
                   :$method,:name<degree>,:value(degree)).throw
              !! self.CREATE!SET-SELF(source, batch, degree, $method)
      }

      # The serial iterator producing the results
      method iterator() {
          @!actions                                    # we can run actions
            ?? $!method eq 'hyper'
              ?? Rakudo::Iterator.HyperActions(        # we want "hyper"
                   $!source,@!actions,$!batch,$!degree)
              !! Rakudo::Iterator.RaceActions(         # we want "race"
                   $!source,@!actions,$!batch,$!degree)
            !! $!source                                # sadly, nothing to do
      }

      # Make sure we will always run the iterator
      method sink() { self.iterator.sink-all }
  }
  93.  
  94. # Since we cannot augment a role, specifically the Iterable role, we do
  95. # an augment on Seq. For now, we must therefore do a .Seq on any iterable
  96. # before we can hyper / race it.
  augment class Seq {

      # Start a "hyper" ConcurrentChain on this Seq's iterator, using the
      # given batch size and number of workers.  Cannot call it hyper.
      method hijper(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
          my \source = self.iterator;
          ConcurrentChain.new(source, $batch, $degree, "hyper")
      }

      # Start a "race" ConcurrentChain on this Seq's iterator, using the
      # given batch size and number of workers.  Cannot call it race.
      method rees(Int(Cool) :$batch = 64, Int(Cool) :$degree = 4) {
          my \source = self.iterator;
          ConcurrentChain.new(source, $batch, $degree, "race")
      }
  }
  109.  
  110. augment class Rakudo::Iterator {
  111.  
  112. my $empty := IterationBuffer.CREATE;
  113.  
  114. # Returns a sort of hyper-iterator that can be called from different
  115. # threads at the same time, and which produces Pairs with an ordinal number
  116. # and an IterationBuffer filled with as many elements that could be
  117. # fetched from the given source iterator and the given batch size.
  118. # If the source iterator is exhausted, it will keep returning
  119. # IterationEnd no matter how many times .pull-buffer is called.
  method ConcurrentBatcher(\iterator, Int:D $batch) {
      # Anonymous class wrapping the source iterator.  Each call to
      # .pull-buffer hands out either a Pair (ordinal => IterationBuffer)
      # holding up to $!batch values, or IterationEnd when nothing is left.
      class {
          has     $!iterator;  # the source iterator
          has     $!lock;      # making sure one worker runs this code
          has int $!ordinal;   # the key of the Pair we produce
          has int $!batch;     # 0 indicates we're exhausted

          method !SET-SELF(\iterator, \batch) {
              $!iterator := iterator;
              $!lock     := Lock.new;
              $!ordinal   = -1;
              $!batch     = batch;
              self
          }
          method new(\it,\ba) { self.CREATE!SET-SELF(it,ba) }

          method pull-buffer() {

              # Everything, including the "are we exhausted" check, runs
              # while holding the lock.  If the check were made before
              # taking the lock, a worker could pass it, wait for the lock
              # while another worker exhausts the source, and then call
              # .pull-one on an already exhausted iterator.
              $!lock.protect({
                  nqp::if(
                    $!batch,
                    nqp::stmts(               # we're still in business
                      (my $buffer := IterationBuffer.CREATE),
                      (my int $batch = $!batch),
                      (my int $found = -1),
                      nqp::until(             # stop when full or exhausted
                        nqp::iseq_i(
                          ($found = nqp::add_i($found,1)),
                          $batch
                        ) || nqp::eqaddr(
                          (my $pulled := $!iterator.pull-one),
                          IterationEnd
                        ),
                        nqp::bindpos($buffer,$found,$pulled)
                      ),
                      nqp::if(                # source iterator exhausted
                        nqp::islt_i($found,$batch),
                        ($!batch = 0)         # tell the world we're done
                      ),
                      nqp::if(
                        $found,
                        Pair.new(             # produce a batch
                          ($!ordinal = nqp::add_i($!ordinal,1)),
                          $buffer
                        ),
                        IterationEnd          # nothing found, we're done here
                      )
                    ),
                    IterationEnd              # we were already done
                  )
              })
          }
      }.new(iterator,$batch)
  }
  184.  
  185. # This role provides the basis for concurrent processing of an
  186. # iterator. Its .new method expects a source iterator, a list of
  187. # actions to be performed (consisting of method => capture pairs),
  188. # on each value obtained from the source iterator, the maximum size
  189. # of the buffer to be processed inside a single worker, and the
  190. # degree (aka number of workers). Classes must provide at least a
  191. # pull-one and a !TWEAK method (which is expected to return self).
  192. #
  193. # Please note that although all sorts of parallel processing happens
  194. # inside the classes that do this role, the classes expose
  195. # themselves as ordinary Iterator classes to the world.
  role ConcurrentActionator does Iterator {
      has     $!queue;     # queue on which workers deliver their results
      has     @!promises;  # one promise per started worker
      has Int $!alive;     # alas, cannot be a native int :-(

      # We use a concurrent blocking queue (the work horse of a
      # Channel, but without any frills here).  It is safe to
      # nqp::push() on it from any thread, and nqp::shift() will
      # block until a value becomes available.  Since we know the
      # number of workers, and each worker will send an IterationEnd
      # at the end, by counting the number of IterationEnd's seen,
      # we know when we should stop nqp::shift()ing from the queue.
      my class Queue is repr('ConcBlockingQueue') { }

      # To be supplied by the consuming class; expected to return self.
      method !TWEAK { ... }

      # Split the actions into parallel method / capture lists, set up the
      # batcher and the queue, and start one worker per degree.
      method !SET-SELF(\source, \actions, \batch, \degree) {

          # Two low-level lists indexed by action number: the method to
          # call and the capture to call it with.
          my int $todo  = actions.elems;
          my $methods  := nqp::setelems(nqp::list,$todo);
          my $captures := nqp::setelems(nqp::list,$todo);
          for actions.kv -> $slot, $action {
              my $callee := $action.key;

              # A key that is not a Method is looked up by name on List
              nqp::bindpos(
                $methods,
                $slot,
                nqp::istype($callee,Method)
                  ?? $callee
                  !! List.^find_method($callee.Str)
              );
              nqp::bindpos($captures,$slot,$action.value);
          }

          # Set up the batcher and the queue.
          $!alive  = degree;
          $!queue := Queue.CREATE;
          my $batcher := Rakudo::Iterator.ConcurrentBatcher(source,batch);

          # Keep the promise of each worker.  Not sure we would actually
          # ever need them, but maybe the result could be used for
          # reducing operations or statistics in the future.
          for ^degree -> $worker {
              @!promises.BIND-POS($worker,
                start {

                    CATCH { .say }  # not sure what to do about exceptions

                    # Ensure we *always* queue an IterationEnd for this
                    # worker when we're done, normally or exceptionally.
                    LEAVE nqp::push($!queue,IterationEnd);

                    # A dummy HLL list to be passed as a "self", with
                    # its $!reified changed for each call.
                    my $List := List.CREATE;

                    # Keep going until the batcher has no more batches
                    nqp::until(
                      nqp::eqaddr(
                        (my $parcel := $batcher.pull-buffer),
                        IterationEnd
                      ),
                      nqp::stmts(

                        # Apply each action in turn to the current batch
                        (my int $step = -1),
                        nqp::while(
                          nqp::islt_i(($step = nqp::add_i($step,1)),$todo),
                          nqp::stmts(

                            # Transplant the batch into the dummy List
                            nqp::bindattr($List,List,'$!reified',
                              nqp::getattr($parcel,Pair,'$!value')
                            ),

                            # Call the method on the List and collect what
                            # its iterator produces in a fresh buffer that
                            # becomes the new value of the Pair
                            nqp::atpos($methods,$step)(
                              $List,|nqp::atpos($captures,$step)
                            ).iterator.push-all(
                              nqp::bindattr($parcel,Pair,'$!value',
                                IterationBuffer.CREATE
                              )
                            )
                          )
                        ),

                        # Queue the result of all actions
                        nqp::push($!queue,$parcel)
                      )
                    )
                }
              )
          }

          # The instantiated object
          self
      }

      method new(\source, \actions, \batch, \degree) {
          self.CREATE!SET-SELF(source,actions,batch,degree)!TWEAK
      }

      method sink-all(--> IterationEnd) is raw {

          # Just eat the queue, we don't care about order in any way
          nqp::while(
            $!alive,
            nqp::stmts(
              nqp::until(           # discard everything up to an end marker
                nqp::eqaddr(nqp::shift($!queue),IterationEnd),
                nqp::null
              ),
              nqp::unless(          # one worker less; clean up after the last
                --$!alive,
                self!cleanup,
              )
            )
          )
      }

      # Handle the promises, indicate we're done
      method !cleanup(--> IterationEnd) {
          for @!promises { .result }
      }
  }
  321.  
  322. # The "hyper" case of the ConcurrentActionator role.
  method HyperActions(\source,\actions,\batch,\degree) {
      # Anonymous iterator class that produces the processed chunks in the
      # order of their ordinal number.  Chunks that arrive from the queue
      # ahead of their turn are parked in $!processed, indexed by their
      # distance from $!offset.
      class :: does ConcurrentActionator {
          has     $!slipped;    # current list of values to produce
          has     $!processed;  # list of processed chunks
          has int $!offset;     # ordinal number of chunk at index 0

          method !TWEAK() {
              $!slipped   := $empty;
              $!processed := nqp::list;
              self
          }

          # Produce the next value, or IterationEnd when all workers are
          # done.  Every branch that does not directly produce a value
          # only updates state; the single self.pull-one at the end of
          # the nqp::stmts then retries.  A worker expiring must not call
          # self.pull-one by itself: its result would be discarded by the
          # surrounding nqp::stmts and a value would get lost.
          method pull-one() is raw {
              nqp::if(
                nqp::elems($!slipped),
                nqp::shift($!slipped),        # produce from the chunk
                nqp::if(
                  $!alive,
                  nqp::stmts(
                    nqp::if(                  # no chunk to produce from
                      nqp::existspos($!processed,0),
                      nqp::stmts(             # next chunk is available
                        ($!offset = nqp::add_i($!offset,1)),
                        ($!slipped := nqp::shift($!processed))
                      ),
                      nqp::if(                # next chunk not there
                        nqp::eqaddr(
                          (my $chunk := nqp::shift($!queue)),
                          IterationEnd
                        ),
                        nqp::unless(          # a worker has expired
                          --$!alive,
                          self!cleanup        # mohican time, bye bye
                        ),
                        nqp::if(              # a fresh chunk
                          nqp::iseq_i(
                            $!offset,
                            (my int $ordinal = $chunk.key)
                          ),
                          nqp::stmts(         # in sequence chunk
                            ($!offset = nqp::add_i($!offset,1)),
                            nqp::if(          # lose placeholder if any
                              nqp::elems($!processed),
                              nqp::shift($!processed)
                            ),
                            ($!slipped := $chunk.value)
                          ),
                          nqp::bindpos(       # out of sequence,
                            $!processed,      # store for later usage
                            nqp::sub_i($ordinal,$!offset),
                            $chunk.value
                          )
                        )
                      )
                    ),
                    self.pull-one             # rinse and repeat
                  ),
                  IterationEnd
                )
              )
          }

          # Push all remaining values onto the target, in chunk order.
          method push-all($target --> IterationEnd) {
              nqp::stmts(
                nqp::while(                   # produce from available
                  nqp::elems($!slipped),
                  $target.push(nqp::shift($!slipped))
                ),
                nqp::while(                   # do the other chunks
                  $!alive,
                  nqp::if(
                    nqp::existspos($!processed,0),
                    nqp::stmts(               # next chunk is available
                      ($!offset = nqp::add_i($!offset,1)),
                      (my $slipped := nqp::shift($!processed)),
                      nqp::while(
                        nqp::elems($slipped),
                        $target.push(nqp::shift($slipped))
                      )
                    ),
                    nqp::if(                  # next chunk not there
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::unless(            # worker expired
                        --$!alive,
                        self!cleanup          # mohican time, bye bye
                      ),
                      nqp::bindpos(           # park it at its position,
                        $!processed,          # also when it is next in line
                        nqp::sub_i($chunk.key,$!offset),
                        $chunk.value
                      )
                    )
                  )
                )
              )
          }
      }.new(source, actions, batch, degree)
  }
  426.  
  427. # The "race" case of the ConcurrentActionator role.
  method RaceActions(\source,\actions,\batch,\degree) {
      # Anonymous iterator class that produces the values of each chunk
      # in the order in which the chunks are taken from the queue.
      class :: does ConcurrentActionator {
          has $!slipped;    # current list of values to produce

          method !TWEAK() {
              $!slipped := $empty;
              self
          }

          # Produce the next value, or IterationEnd when all workers are
          # done.  The $!alive check makes sure we never wait on the queue
          # once the last worker has delivered its IterationEnd: without
          # it, a call after exhaustion would block on nqp::shift forever.
          method pull-one() is raw {
              nqp::if(
                nqp::elems($!slipped),
                nqp::shift($!slipped),        # produce from available
                nqp::if(
                  $!alive,
                  nqp::if(                    # no values available
                    nqp::eqaddr(
                      (my $chunk := nqp::shift($!queue)),
                      IterationEnd
                    ),
                    nqp::if(                  # worker exhausted
                      --$!alive,
                      self.pull-one,          # but still others
                      self!cleanup,           # mohican time, bye bye
                    ),
                    nqp::stmts(
                      ($!slipped := $chunk.value),  # could be empty
                      self.pull-one                 # so try again
                    )
                  ),
                  IterationEnd                # all workers already done
                )
              )
          }

          # Push all remaining values onto the target, chunk by chunk as
          # they are taken from the queue.
          method push-all($target --> IterationEnd) {
              nqp::stmts(
                nqp::while(                   # produce from available
                  nqp::elems($!slipped),
                  $target.push(nqp::shift($!slipped))
                ),
                nqp::while(                   # do the other chunks
                  $!alive,
                  nqp::stmts(
                    nqp::until(               # still in business
                      nqp::eqaddr(
                        (my $chunk := nqp::shift($!queue)),
                        IterationEnd
                      ),
                      nqp::stmts(             # we have a chunk
                        (my $slipped := $chunk.value),
                        nqp::while(           # push the chunk
                          nqp::elems($slipped),
                          $target.push(nqp::shift($slipped))
                        )
                      )
                    ),
                    nqp::unless(
                      --$!alive,
                      self!cleanup,           # mohican time, bye bye
                    )
                  )
                )
              )
          }
      }.new(source, actions, batch, degree)
  }
  488. }
  489.  
  # Small demo: run the same map/grep pipeline once through the concurrent
  # chain and once serially, dumping the result and the elapsed time.
  # NOTE(review): the map block uses $_++, which looks like it increments
  # the elements of @a in place, so the serial run would start from values
  # already changed by the first run -- confirm this is intended.
  my @a = ^106;

  my $started = now;
  dd @a.Seq.hijper(:10batch).map({ sleep rand / 1000; $_++ }).grep({ $_ %% 2 });
  say "parallel processed in {now - $started}";

  $started = now;
  dd @a.map({ sleep rand / 1000; $_++ }).grep({ $_ %% 2 });
  say "serial processed in {now - $started}";
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement