Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import numpy as np
import pandas as pd
import ray

# local_mode=True asks Ray to execute remote calls serially in the driver
# process (a debugging mode per the Ray docs) rather than on worker processes.
ray.init(local_mode=True)
def ge(a, b, by):
    """Return True when ``a`` is lexicographically >= ``b`` over the ``by`` keys.

    Keys are examined in the order given by ``by``. The first key on which
    ``a`` is greater, or on which the two values are not equal, decides the
    result; equality on every key counts as "greater or equal".

    Args:
        a: Left operand. ``a > b`` and ``a == b`` must produce objects that
            can be indexed by each key in ``by`` (this script passes pandas
            Series rows).
        b: Right operand, label-compatible with ``a``.
        by: Iterable of keys in priority order.

    Returns:
        bool: True if ``a`` >= ``b`` under that ordering, otherwise False.
    """
    is_greater = a > b
    is_equal = a == b
    for key in by:
        if is_greater[key] or not is_equal[key]:
            # This key settles the comparison: True only if ``a`` was greater.
            return bool(is_greater[key])
    # No key distinguished the operands, so they tie on every key.
    return True
@ray.remote
class Partition:
    """Ray actor holding one chunk of a DataFrame for a sample-based sort.

    Each actor keeps its input chunk in ``self.df``. During ``shuffle`` it
    forwards every row of that chunk to the actor chosen by the cutoffs, and
    rows sent to this actor by ``put`` are collected and exposed, after
    ``sort_local``, through ``get_df``.
    """

    def __init__(self, pid, df, by):
        """Store the partition id, the input chunk and the sort keys.

        Args:
            pid: Identifier of this partition.
            df: The chunk of the source DataFrame owned by this actor.
            by: List of column names to sort by, in priority order.
        """
        self.pid = pid
        self.df = df
        self.by = by
        self.cutoffs = None
        self.sorted_df = pd.DataFrame()
        # Rows received via put() are buffered here and merged into
        # sorted_df lazily; growing a DataFrame one row at a time copies
        # the whole frame on every call.
        self._pending_rows = []

    def sample(self, n):
        """Return ``n`` randomly sampled rows of this actor's input chunk."""
        return self.df.sample(n)

    def set_cutoffs(self, cutoffs):
        """Remember ``cutoffs`` on the actor (not read by other methods here)."""
        self.cutoffs = cutoffs

    @staticmethod
    def __get_partition(row, cutoffs, by):
        """Pick the first partition whose cutoff is >= ``row``.

        Args:
            row: The row to place.
            cutoffs: Iterable of ``(partition, cutoff_row)`` pairs, where
                ``partition`` is itself a ``(pid, actor)`` pair.
            by: Sort keys used for the comparison.

        Returns:
            The matching ``(pid, actor)`` pair, or ``(None, None)`` when the
            row is greater than every cutoff.
        """
        for part, cutoff in cutoffs:
            if ge(cutoff, row, by=by):
                return part
        return None, None

    def shuffle(self, cutoffs, max_partition):
        """Send every row of the input chunk to its destination actor.

        Args:
            cutoffs: Iterable of ``((pid, actor), cutoff_row)`` pairs.
            max_partition: ``(pid, actor)`` pair that receives rows lying
                above every cutoff.

        Returns:
            1, once every ``put`` issued by this call has completed.

        NOTE(review): this blocks inside an actor method on calls to other
        actors (possibly including this one). That looks deadlock-prone when
        Ray is not running in local mode -- confirm before dropping
        ``local_mode=True``.
        """
        fallback_actor = max_partition[1]
        pending = []
        for _, row in self.df.iterrows():
            _pid, actor = self.__get_partition(row, cutoffs, by=self.by)
            target = fallback_actor if actor is None else actor
            # Keep the reference instead of resolving it immediately: the
            # single ray.get below needs object refs, not resolved values,
            # and resolving per row would serialize every send.
            pending.append(target.put.remote(row))
        # sync: wait until every destination has stored its rows
        ray.get(pending)
        return 1

    def put(self, row):
        """Accept one row routed to this partition; returns 1."""
        self._pending_rows.append(row)
        return 1

    def _flush_pending(self):
        """Merge buffered rows into ``self.sorted_df`` (no-op when none)."""
        if not self._pending_rows:
            return
        # A list of Series becomes one row per Series, labelled by its name.
        received = pd.DataFrame(self._pending_rows)
        if self.sorted_df.empty:
            self.sorted_df = received
        else:
            self.sorted_df = pd.concat([self.sorted_df, received])
        self._pending_rows = []

    def sort_local(self):
        """Sort the rows received so far by ``self.by``; returns 1."""
        self._flush_pending()
        # A partition that received nothing has no columns at all, so
        # sort_values(by=...) would raise KeyError on it.
        if not self.sorted_df.empty:
            self.sorted_df = self.sorted_df.sort_values(by=self.by)
        return 1

    def get_df(self):
        """Return the DataFrame of rows received by this partition."""
        self._flush_pending()
        return self.sorted_df
# Driver: sort a random DataFrame with a sample-based distributed sort across
# m Partition actors, then check the result against a plain local sort.
n = 100           # number of rows in the generated frame
sample_size = 10  # rows sampled from each partition to estimate cutoffs
m = 4             # number of partitions (actors)
columns = ['A', 'B', 'C', 'D']
by = ['B', 'C']

# generate df and break into m chunks (row i goes to chunk i % m)
df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
local_sort_df = df.sort_values(by=by)
chunks = [(i, df[i::m]) for i in range(m)]

# create DF partitions (ray Actors)
partitions = [(pid, Partition.remote(pid, c, by)) for (pid, c) in chunks]

# sample every partition and take m - 1 evenly spaced rows of the sorted
# sample as cutoffs; the last partition takes everything above them
samples = ray.get([actor.sample.remote(n=sample_size) for _, actor in partitions])
sorted_sample = pd.concat(samples).sort_values(by=by)
cutoffs = [sorted_sample.iloc[i * sample_size - 1] for i in range(1, m)]
partitions_cutoffs = list(zip(partitions, cutoffs))

# send each row to its relevant partition according to cutoffs
ray.get([
    actor.shuffle.remote(partitions_cutoffs, partitions[-1])
    for _, actor in partitions
])

# sort each partition DF
ray.get([actor.sort_local.remote() for _, actor in partitions])

# collect each DF partition once; the same frames are printed and validated
partition_dfs = ray.get([actor.get_df.remote() for _, actor in partitions])
for (pid, _), part_df in zip(partitions, partition_dfs):
    print(f"** {pid} **")
    print(part_df)
    print('----')

# validate the distributed sort against the local one; raise explicitly so
# the check still runs under ``python -O``
distributed_sort_df = pd.concat(partition_dfs)
if not local_sort_df.equals(distributed_sort_df):
    raise AssertionError("distributed sort does not match the local sort")
Add Comment
Please, Sign In to add comment