WeltEnSTurm

Untitled

Feb 2nd, 2018
583
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. module pipes;
  2.  
  3. import
  4.     core.thread,
  5.     std.parallelism,
  6.     std.concurrency,
  7.     std.functional,
  8.     std.typecons,
  9.     std.traits;
  10.  
  11.  
  12. synchronized class Queue(T) {
  13.     private T[] queue;
  14.  
  15.     void opOpAssign(string op)(T object) if(op == "~") {
  16.         queue ~= object;
  17.     }
  18.  
  19.     auto length(){
  20.         return queue.length;
  21.     }
  22.  
  23.     Nullable!T pop(){
  24.         Nullable!T result;
  25.         if(!queue.length){
  26.             result.nullify;
  27.             return result;
  28.         }
  29.         result = queue[0];
  30.         queue = queue[1..$];
  31.         return result;
  32.     }
  33. }
  34.  
  35.  
  36. synchronized class Flag {
  37.     private bool flag;
  38.     void set(bool flag){
  39.         this.flag = flag;
  40.     }
  41.     bool get(){
  42.         return flag;
  43.     }
  44. }
  45.  
  46.  
  47. auto pipe(Iterable, Fn)(Iterable iterable, Fn callback) if(isIterable!Iterable) {
  48.     shared class RootAction {
  49.         Queue!(Tuple!(ForeachType!Iterable)) results;
  50.         this(Iterable iterable){
  51.             results = new shared Queue!(Tuple!(ForeachType!Iterable));
  52.             foreach(e; iterable){
  53.                 results ~= tuple(e);
  54.             }
  55.         }
  56.         auto previous(){
  57.             return null;
  58.         }
  59.         bool has(){
  60.             return results.length > 0;
  61.         }
  62.         void call(){}
  63.     }
  64.     return pipe(new shared RootAction(iterable), callback);
  65. }
  66.  
  67. auto pipe(I, O)(I fnIn, O fnOut) if(!isIterable!I) {
  68.     shared class PipeAction {
  69.         Flag working;
  70.         O cb;
  71.         Queue!(ReturnType!O) results;
  72.         this(O cb){
  73.             this.cb = cb;
  74.             working = new Flag;
  75.             results = new shared Queue!(ReturnType!O);
  76.         }
  77.         auto previous(){
  78.             return fnIn;
  79.         }
  80.         bool has(){
  81.             return working.get || fnIn.results.length > 0 || fnIn.has;
  82.         }
  83.         void call(){
  84.             working.set(true);
  85.             auto element = fnIn.results.pop;
  86.             if(!element.isNull){
  87.                 results ~= cb(element.expand);
  88.             }
  89.             working.set(false);
  90.             fnIn.call;
  91.         }
  92.     }
  93.     return new shared PipeAction(fnOut);
  94. }
  95.  
  96. auto run(I)(I chain){
  97.     while(chain.has){
  98.         chain.call;
  99.     }
  100. }
  101.  
  102. auto parallel(I)(I action, int workerCount=0){
  103.     if(workerCount == 0){
  104.         workerCount = totalCPUs*2-1;
  105.     }
  106.     class Parallel {
  107.         Thread[] workers;
  108.         void join(){
  109.             foreach(w; workers)
  110.                 w.join;
  111.         }
  112.     }
  113.     auto p = new Parallel;
  114.     foreach(_; 0..workerCount){
  115.         p.workers ~= new Thread({ run(action); }).start();
  116.     }
  117.     return p;
  118. }
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×