• API
• FAQ
• Tools
• Archive
daily pastebin goal
18%
SHARE
TWEET

Untitled

a guest Dec 17th, 2018 65 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
1. import ray
2. import pandas as pd
3. import numpy as np
4.
# Start Ray in local mode so tasks and actor methods run serially inside this
# process. Partition.shuffle blocks on other actors from within an actor
# method, which presumably relies on this — confirm before moving to a cluster.
ray.init(local_mode=True)
6.
7.
def ge(a, b, by):
    """Return True if row ``a`` sorts at or after row ``b`` on the ``by`` keys.

    The comparison is lexicographic over ``by``: the first key where ``a`` is
    greater decides True, the first key where the values are not equal (and
    ``a`` is not greater, which includes NaN comparisons) decides False, and
    rows that tie on every key return True.

    Only the key columns are compared, so other columns may hold values that
    cannot be ordered, and the two rows may carry different non-key labels.

    Args:
        a, b: ``pd.Series`` rows (e.g. from ``iterrows`` or ``iloc``) that
            both contain every label in ``by``.
        by: Ordered iterable of key column labels.

    Returns:
        bool: True if ``a >= b`` on the ``by`` keys.
    """
    # De-duplicate while keeping order: a repeated key can never change the
    # lexicographic result, and unique labels keep ``gt[col]`` a scalar.
    keys = list(dict.fromkeys(by))
    a_keys = a[keys]
    b_keys = b[keys]
    # Element-wise pandas comparisons, restricted to the key columns.
    gt = a_keys > b_keys
    eq = a_keys == b_keys
    for col in keys:
        if gt[col]:
            return True
        if not eq[col]:
            return False
    return True
18.
19.
@ray.remote
class Partition:
    """Ray actor holding one partition of a DataFrame during a distributed sort.

    Each actor starts with a chunk of the input (``df``), routes its rows to
    the partition responsible for their key range (``shuffle``), buffers the
    rows it receives from peers (``put``), and sorts them on ``by``
    (``sort_local``). ``get_df`` returns the received rows.
    """

    def __init__(self, pid, df, by):
        self.pid = pid                    # partition id
        self.df = df                      # input chunk owned by this actor
        self.by = by                      # sort key column labels
        self.cutoffs = None               # stored by set_cutoffs(); not read internally
        self.sorted_df = pd.DataFrame()   # received rows, once materialized
        self._pending = []                # received chunks not yet concatenated

    def sample(self, n):
        """Return ``n`` random rows of the input chunk.

        Sampling is without replacement, so pandas raises if ``n`` exceeds the
        chunk length.
        """
        return self.df.sample(n)

    def set_cutoffs(self, cutoffs):
        """Store ``cutoffs`` on the actor."""
        self.cutoffs = cutoffs

    @staticmethod
    def __get_partition(row, cutoffs, by):
        """Return the ``(pid, actor)`` of the first partition whose cutoff is >= ``row``.

        ``cutoffs`` is a sequence of ``((pid, actor), cutoff_row)`` pairs,
        expected in ascending cutoff order since the first match wins.
        Returns ``(None, None)`` when ``row`` is above every cutoff.
        """
        for part, cutoff in cutoffs:
            if ge(cutoff, row, by=by):
                return part
        return None, None

    def shuffle(self, cutoffs, max_partition):
        """Send every row of the input chunk to the partition that owns its key.

        Rows are grouped by destination, and each destination receives a single
        ``put`` call carrying a DataFrame slice. This replaces one blocking
        remote call per row.

        Args:
            cutoffs: ``((pid, actor), cutoff_row)`` pairs (see ``__get_partition``).
            max_partition: ``(pid, actor)`` receiving rows above every cutoff.

        Returns:
            1 once every destination has acknowledged its batch.
        """
        # pid -> (actor, positions of rows routed there), in first-seen order.
        routes = {}
        for position, (_, row) in enumerate(self.df.iterrows()):
            pid, actor = self.__get_partition(row, cutoffs, by=self.by)
            if actor is None:
                pid, actor = max_partition
            routes.setdefault(pid, (actor, []))[1].append(position)

        # iloc keeps each row's original index label and the column dtypes.
        oids = [actor.put.remote(self.df.iloc[positions])
                for actor, positions in routes.values()]
        # Wait once, on the real object refs (not on already-resolved values).
        # NOTE(review): blocking on peers, and possibly on this actor itself,
        # from inside an actor method only works when tasks run inline (as with
        # ray.init(local_mode=True)); on a real cluster this can deadlock.
        ray.get(oids)
        return 1

    def put(self, row):
        """Buffer a received row (``pd.Series``) or batch of rows (``pd.DataFrame``).

        ``DataFrame.append`` was removed in pandas 2.0, and growing a frame one
        row at a time copies it on every call. Chunks are therefore buffered
        here and concatenated lazily by ``_materialize``.

        Returns:
            1, as an acknowledgement for the sender.
        """
        if isinstance(row, pd.Series):
            # One-row frame labelled by the row's name, as DataFrame.append did.
            row = row.to_frame().T.infer_objects()
        self._pending.append(row)
        return 1

    def _materialize(self):
        """Concatenate buffered chunks into ``self.sorted_df`` and return it."""
        if self._pending:
            # Skip the initial column-less empty frame so it cannot affect dtypes.
            frames = (self._pending if self.sorted_df.empty
                      else [self.sorted_df, *self._pending])
            self.sorted_df = pd.concat(frames)
            self._pending = []
        return self.sorted_df

    def sort_local(self):
        """Sort the rows received by this partition on the ``by`` columns."""
        received = self._materialize()
        # A partition that received no rows has no key columns to sort on.
        if not received.empty:
            self.sorted_df = received.sort_values(by=self.by)
        return 1

    def get_df(self):
        """Return the received rows (sorted if ``sort_local`` ran after the last put)."""
        return self._materialize()
66.
67.
n = 100
sample_size = 10
m = 4
columns = ['A', 'B', 'C', 'D']
by = ['B', 'C']

# Generate a random DataFrame and deal its rows round-robin into m chunks.
df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
local_sort_df = df.sort_values(by=by)
chunks = [(i, df[i::m]) for i in range(m)]

# Create one DataFrame partition (Ray actor) per chunk.
partitions = [(pid, Partition.remote(pid, c, by)) for (pid, c) in chunks]

# Choose m - 1 cutoffs at evenly spaced ranks of the pooled, sorted sample.
# Ranks come from the actual sample length rather than assuming m * sample_size.
samples = ray.get([actor.sample.remote(n=sample_size) for pid, actor in partitions])
sorted_sample = pd.concat(samples).sort_values(by=by)
cutoffs = [sorted_sample.iloc[len(sorted_sample) * i // m - 1] for i in range(1, m)]
partitions_cutoffs = list(zip(partitions, cutoffs))

# Send each row to its relevant partition according to the cutoffs; rows above
# the last cutoff go to the last partition.
ray.get([actor.shuffle.remote(partitions_cutoffs, partitions[-1])
         for pid, actor in partitions])

# Sort each partition's DataFrame locally.
ray.get([actor.sort_local.remote() for pid, actor in partitions])

# Collect every partition once, then print them in partition order.
partition_dfs = ray.get([actor.get_df.remote() for pid, actor in partitions])
for (pid, _), part_df in zip(partitions, partition_dfs):
    print("** %d **" % pid)
    print(part_df)
    print('----')

# Partitions cover ascending key ranges, so their concatenation must equal the
# local sort. Empty partitions are skipped so they cannot affect column dtypes.
distributed_sort_df = pd.concat([part_df for part_df in partition_dfs if not part_df.empty])
assert local_sort_df.equals(distributed_sort_df)
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy.

Top