daily pastebin goal
18%
SHARE
TWEET

Untitled

a guest Dec 17th, 2018 65 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import ray
  2. import pandas as pd
  3. import numpy as np
  4.  
  5. ray.init(local_mode=True)  # NOTE(review): presumably local mode runs actor calls serially in-process; Partition.shuffle blocks on calls to other actors (and itself), so confirm before turning this off
  6.  
  7.  
  8. def ge(a, b, by):
  9.     gt = a > b
  10.     eq = a == b
  11.     for col in by:
  12.         if gt[col]:
  13.             return True
  14.         elif not eq[col]:
  15.             return False
  16.  
  17.     return True
  18.  
  19.  
  20. @ray.remote
  21. class Partition:
  22.     def __init__(self, pid, df, by):
  23.         self.pid = pid
  24.         self.df = df
  25.         self.by = by
  26.         self.sorted_df = pd.DataFrame()
  27.  
  28.     def sample(self, n):
  29.         return self.df.sample(n)
  30.  
  31.     def set_cutoffs(self, cutoffs):
  32.         self.cutoffs = cutoffs
  33.  
  34.     @staticmethod
  35.     def __get_partition(row, cutoffs, by):
  36.         for part, cutoff in cutoffs:
  37.             if ge(cutoff, row, by=by):
  38.                 return part
  39.  
  40.         return None, None
  41.  
  42.     def shuffle(self, cutoffs, max_partition):
  43.         oids = []
  44.         for index, row in self.df.iterrows():
  45.             pid, actor = self.__get_partition(row, cutoffs, by=self.by)
  46.             if actor is None:
  47.                 oid = ray.get(max_partition[1].put.remote(row))
  48.                 oids.append(oid)
  49.             else:
  50.                 oid = ray.get(actor.put.remote(row))
  51.                 oids.append(oid)
  52.         # sync
  53.         ray.get(oids)
  54.         return 1
  55.  
  56.     def put(self, row):
  57.         self.sorted_df = self.sorted_df.append(row)
  58.         return 1
  59.  
  60.     def sort_local(self):
  61.         self.sorted_df = self.sorted_df.sort_values(by=self.by)
  62.         return 1
  63.  
  64.     def get_df(self):
  65.         return self.sorted_df
  66.  
  67.  
  68. n = 100
  69. sample_size = 10
  70. m = 4
  71. columns = ['A', 'B', 'C', 'D']
  72. by = ['B', 'C']
  73.  
  74. # generate df and break into m chunks
  75. df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
  76. local_sort_df = df.sort_values(by=by)
  77. partition_size = n/m
  78. chunks = [(i, df[i::m]) for i in range(m)]
  79.  
  80. # create DF partitions (ray Actors)
  81. partitions = [(pid, Partition.remote(pid, c, by)) for (pid, c) in chunks]
  82. samples = ray.get([actor.sample.remote(n=sample_size) for pid, actor in partitions])
  83. sorted_sample = pd.concat(samples).sort_values(by=by)
  84. cutoffs = [sorted_sample.iloc[i*sample_size-1] for i in range(1, m)]
  85. partitions_cutoffs = list(zip(partitions, cutoffs))
  86.  
  87. # send each row to its relevant partition according to cutoffs
  88. t = [actor.shuffle.remote(partitions_cutoffs, partitions[-1]) for pid, actor in partitions]
  89. ray.get(t)
  90.  
  91. # sort each partition DF
  92. t = [actor.sort_local.remote() for pid, actor in partitions]
  93. ray.get(t)
  94.  
  95. # collect and print each DF partition
  96. for pid, actor in partitions:
  97.     df = ray.get(actor.get_df.remote())
  98.     print("** %d **" % pid)
  99.     print(df)
  100.     print('----')
  101.    
  102. # collect and validate sort against local df
  103. distributed_sort_df = pd.concat([ray.get(actor.get_df.remote()) for pid, actor in partitions])
  104. assert local_sort_df.equals(distributed_sort_df)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top