an implementation of distributed bucket sort for pandas using Ray
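The script below sorts a DataFrame across m actor-held partitions in four steps: each partition is sampled and the combined sample is sorted to pick m-1 cutoff rows; every row is then shuffled to the partition that owns its key range; each partition sorts its rows locally; finally the sorted indexes are collected in partition order and used to reindex the original DataFrame, which is validated against a plain pandas sort.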
import ray
import pandas as pd
import numpy as np

# local_mode runs all tasks and actors serially in the driver process,
# which keeps this example simple to run and debug
ray.init(local_mode=True)
def ge(a, b, by):
    """Lexicographic a >= b over the `by` columns."""
    gt = a > b
    eq = a == b
    for col in by:
        if gt[col]:
            return True
        elif not eq[col]:
            return False
    return True
@ray.remote
class Partition:
    def __init__(self, pid, df):
        self.pid = pid
        self.df = df
        self.sorted_df = pd.DataFrame()

    def sample(self, n):
        return self.df.sample(n)

    @staticmethod
    def _get_partition(row, cutoffs, by):
        # return the (pid, actor) pair of the first partition whose cutoff
        # is >= the row; rows above every cutoff fall through to (None, None)
        for part, cutoff in cutoffs:
            if ge(cutoff, row, by=by):
                return part
        return None, None

    def shuffle(self, cutoffs, max_partition, by):
        # send each local row to the partition that owns its key range
        oids = []
        for index, row in self.df.iterrows():
            pid, actor = self._get_partition(row, cutoffs, by=by)
            if actor is None:
                # row is greater than every cutoff -> send to the last partition
                oid = max_partition[1].put.remote(row)
            else:
                oid = actor.put.remote(row)
            oids.append(oid)
        # sync: wait until all rows have been delivered
        ray.get(oids)
        return 1

    def put(self, row):
        # DataFrame.append was removed in pandas 2.0; concat is equivalent here
        self.sorted_df = pd.concat([self.sorted_df, row.to_frame().T])
        return 1

    def sort_local(self, by):
        self.sorted_df = self.sorted_df.sort_values(by=by)
        return 1

    def get_sorted_df(self):
        return self.sorted_df

    def get_sorted_ix(self):
        return self.sorted_df.index
n = 100            # number of rows
sample_size = 10   # rows sampled per partition to estimate cutoffs
m = 4              # number of partitions
columns = ['A', 'B', 'C', 'D']
by = ['B', 'C']

# generate df and break it into m chunks
df = pd.DataFrame(np.random.randn(n, len(columns)), columns=columns)
partition_size = n / m
chunks = [(i, df[i::m]) for i in range(m)]

# create DF partitions (Ray actors)
partitions = [(pid, Partition.remote(pid, c)) for (pid, c) in chunks]

# sample each partition, sort the combined sample and pick m-1 cutoff rows
samples = ray.get([actor.sample.remote(n=sample_size) for pid, actor in partitions])
sorted_sample = pd.concat(samples).sort_values(by=by)
cutoffs = [sorted_sample.iloc[i * sample_size - 1] for i in range(1, m)]
partitions_cutoffs = list(zip(partitions, cutoffs))

# shuffle - send each row to its relevant partition according to the cutoffs
t = [actor.shuffle.remote(partitions_cutoffs, partitions[-1], by=by) for pid, actor in partitions]
ray.get(t)

# local sort - sort each partition's DF
t = [actor.sort_local.remote(by=by) for pid, actor in partitions]
ray.get(t)

# collect sorted indexes from each partition and repartition the original DF
new_index = []
for pid, actor in partitions:
    new_index.extend(ray.get(actor.get_sorted_ix.remote()))
distributed_sort_df = df.reindex(index=new_index)

# validate against a local sort of the full DF
local_sort_df = df.sort_values(by=by)
assert local_sort_df.equals(distributed_sort_df)
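For reference, here is a minimal single-process sketch of the same sample-based bucket sort without Ray. The helper names (lex_ge, bucket_of, buckets) are illustrative and not part of the gist above; the point is to show why picking cutoffs from a sorted sample, bucketing rows by lexicographic comparison, and sorting each bucket yields a globally sorted frame.

import numpy as np
import pandas as pd

def lex_ge(a, b, by):
    # lexicographic a >= b over the `by` columns (same logic as ge() above)
    for col in by:
        if a[col] > b[col]:
            return True
        if a[col] != b[col]:
            return False
    return True

rng = np.random.default_rng(0)
df = pd.DataFrame(rng.standard_normal((100, 2)), columns=['B', 'C'])
by = ['B', 'C']
m = 4

# pick m-1 cutoffs from a sorted sample; they approximate bucket boundaries
sample = df.sample(40, random_state=0).sort_values(by=by)
cutoffs = [sample.iloc[i * 10 - 1] for i in range(1, m)]

def bucket_of(row):
    # first bucket whose cutoff is >= row; rows above every cutoff go last
    for i, cutoff in enumerate(cutoffs):
        if lex_ge(cutoff, row, by):
            return i
    return m - 1

buckets = [[] for _ in range(m)]
for _, row in df.iterrows():
    buckets[bucket_of(row)].append(row)

# sorting each bucket and concatenating in bucket order gives a global sort,
# because every row in bucket i is <= the cutoff separating it from bucket i+1
parts = [pd.DataFrame(b).sort_values(by=by) for b in buckets if b]
assert pd.concat(parts).equals(df.sort_values(by=by))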