Guest User

Untitled

a guest
Dec 17th, 2018
92
0
Never
Not a member of Pastebin yet? Sign up — it unlocks many cool features!
text 2.70 KB | None | 0 0
  1. import ray
  2. import pandas as pd
  3. import numpy as np
  4.  
# Start Ray in local mode, which (per Ray's documentation) executes tasks and
# actor methods serially inside this driver process. This script depends on
# that: Partition.shuffle blocks on put() calls it makes to Partition actors.
# NOTE(review): when actors run as separate processes, two actors that are
# both inside shuffle() waiting on each other's put() may deadlock. Confirm
# before dropping local_mode. Recent Ray releases also deprecate local_mode.
ray.init(local_mode=True)
  6.  
  7.  
  8. def ge(a, b, by):
  9. gt = a > b
  10. eq = a == b
  11. for col in by:
  12. if gt[col]:
  13. return True
  14. elif not eq[col]:
  15. return False
  16.  
  17. return True
  18.  
  19.  
  20. @ray.remote
  21. class Partition:
  22. def __init__(self, pid, df, by):
  23. self.pid = pid
  24. self.df = df
  25. self.by = by
  26. self.sorted_df = pd.DataFrame()
  27.  
  28. def sample(self, n):
  29. return self.df.sample(n)
  30.  
  31. def set_cutoffs(self, cutoffs):
  32. self.cutoffs = cutoffs
  33.  
  34. @staticmethod
  35. def __get_partition(row, cutoffs, by):
  36. for part, cutoff in cutoffs:
  37. if ge(cutoff, row, by=by):
  38. return part
  39.  
  40. return None, None
  41.  
  42. def shuffle(self, cutoffs, max_partition):
  43. oids = []
  44. for index, row in self.df.iterrows():
  45. pid, actor = self.__get_partition(row, cutoffs, by=self.by)
  46. if actor is None:
  47. oid = ray.get(max_partition[1].put.remote(row))
  48. oids.append(oid)
  49. else:
  50. oid = ray.get(actor.put.remote(row))
  51. oids.append(oid)
  52. # sync
  53. ray.get(oids)
  54. return 1
  55.  
  56. def put(self, row):
  57. self.sorted_df = self.sorted_df.append(row)
  58. return 1
  59.  
  60. def sort_local(self):
  61. self.sorted_df = self.sorted_df.sort_values(by=self.by)
  62. return 1
  63.  
  64. def get_df(self):
  65. return self.sorted_df
  66.  
  67.  
  68. n = 100
  69. sample_size = 10
  70. m = 4
  71. columns = ['A', 'B', 'C', 'D']
  72. by = ['B', 'C']
  73.  
  74. # generate df and break into m chunks
  75. df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
  76. local_sort_df = df.sort_values(by=by)
  77. partition_size = n/m
  78. chunks = [(i, df[i::m]) for i in range(m)]
  79.  
  80. # create DF partitions (ray Actors)
  81. partitions = [(pid, Partition.remote(pid, c, by)) for (pid, c) in chunks]
  82. samples = ray.get([actor.sample.remote(n=sample_size) for pid, actor in partitions])
  83. sorted_sample = pd.concat(samples).sort_values(by=by)
  84. cutoffs = [sorted_sample.iloc[i*sample_size-1] for i in range(1, m)]
  85. partitions_cutoffs = list(zip(partitions, cutoffs))
  86.  
  87. # send each row to its relevant partition according to cutoffs
  88. t = [actor.shuffle.remote(partitions_cutoffs, partitions[-1]) for pid, actor in partitions]
  89. ray.get(t)
  90.  
  91. # sort each partition DF
  92. t = [actor.sort_local.remote() for pid, actor in partitions]
  93. ray.get(t)
  94.  
  95. # collect and print each DF partition
  96. for pid, actor in partitions:
  97. df = ray.get(actor.get_df.remote())
  98. print("** %d **" % pid)
  99. print(df)
  100. print('----')
  101.  
  102. # collect and validate sort against local df
  103. distributed_sort_df = pd.concat([ray.get(actor.get_df.remote()) for pid, actor in partitions])
  104. assert local_sort_df.equals(distributed_sort_df)
Add Comment
Please sign in to add a comment.