Last active
August 29, 2015 14:05
-
-
Save adgaudio/a2318f42288a1ff5df14 to your computer and use it in GitHub Desktop.
Distributed Percentile and Distributed Median - a proof of concept and example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This example demonstrates a distributed algorithm to identify the | |
percentile of a distributed data set. | |
Because this is a toy implementation, the data isn't actually | |
distributed across multiple machines. | |
""" | |
import numpy as np | |
def make_distributed_sample_data(start=0, stop=100, step=1, seed=None):
    """
    Build a toy "distributed" data set: a shuffled square matrix whose rows
    play the role of the data held by individual nodes.

    start, stop, step - passed straight to ``np.arange`` to create the values.
    seed - seed for numpy's global RNG. ``None`` (the default) keeps the
        original behaviour of seeding from the current time in seconds.

    Returns a ``side x side`` numpy array containing a random permutation of
    ``np.arange(start, stop, step)``.

    Raises ValueError if the number of generated values is not a perfect
    square (the values could not be laid out as a square matrix).
    """
    import time

    if seed is None:
        seed = int(time.time())
    # Seeds numpy's *global* RNG, so later np.random calls are affected too.
    np.random.seed(seed)
    values = np.arange(start, stop, step)
    np.random.shuffle(values)
    # Derive the side length from the real number of values. The previous
    # formula sqrt(stop / step) ignored ``start`` and produced a float shape,
    # which numpy.reshape rejects.
    side = int(round(np.sqrt(values.size)))
    if side * side != values.size:
        raise ValueError(
            "np.arange(%r, %r, %r) yields %d values, which cannot form a "
            "square matrix" % (start, stop, step, values.size))
    # With the defaults this is a 10x10 permutation of 0..99, so e.g.
    # np.median(result) == 49.5 (100 itself is excluded by np.arange) and
    # np.percentile(result, [0, 25, 50, 75, 100]) == [0, 24.75, 49.5, 74.25, 99].
    return values.reshape(side, side)
def distributed_percentile(arrs, percentile):
    """
    Find a percentile of a "distributed" data set with a quickselect-style
    algorithm in which nodes only exchange a pivot and element counts.

    arrs - a "distributed" list of arrays, one per node. Because this is a
        poc example, "distributed" in this case just means in memory. Each
        node's data is flattened, so multi-dimensional chunks are accepted.
    percentile - number in range [0, 100]; 0 selects the minimum and 100
        the maximum.

    Returns the data element that has exactly
    ``k = int(n * (100 - percentile) / 100)`` elements ranked above it
    (k is capped at n - 1), i.e. element k of the data sorted in descending
    order. No interpolation is done, so the result is always one of the
    input values and may differ from ``np.percentile``.

    Raises ValueError if ``percentile`` is outside [0, 100] or if the data
    set is empty.
    """
    if not 0 <= percentile <= 100:
        raise ValueError("percentile must be in the range [0, 100]")
    nodes = [np.ravel(node_data) for node_data in arrs]
    #
    # get total number of elements, n
    n = sum(node.size for node in nodes)
    if n == 0:
        raise ValueError("cannot compute a percentile of an empty data set")
    #
    # k = number of elements ranked above the answer. Capping at n - 1 makes
    # percentile == 0 select the minimum instead of running off the end.
    k = min(int(n * (100 - percentile) / 100), n - 1)
    while True:
        # Select a random pivot and "broadcast" it to every node. Since this
        # example has no real nodes, concatenation stands in for sampling.
        pivot = np.random.choice(np.concatenate(nodes))
        #
        # Each node reports how many of its elements are > pivot and how many
        # are == pivot; the coordinator sums the counts.
        n_greater = sum(int(np.count_nonzero(node > pivot)) for node in nodes)
        n_equal = sum(int(np.count_nonzero(node == pivot)) for node in nodes)
        if k < n_greater:
            # The answer is larger than the pivot: keep only elements > pivot.
            # k is unchanged because nothing ranked above the answer is dropped.
            nodes = [node[node > pivot] for node in nodes]
        elif k >= n_greater + n_equal:
            # The answer is smaller than the pivot: drop every element
            # >= pivot and shift k by the number of elements dropped.
            nodes = [node[node < pivot] for node in nodes]
            k -= n_greater + n_equal
        else:
            # n_greater <= k < n_greater + n_equal: the pivot is the answer.
            # Counting ties separately guarantees every other iteration
            # discards at least the pivot, so duplicates cannot hang the loop.
            return pivot
def discard_elements(arrs, pivot, less_than_pivot=True):
    """
    Drop the elements lying on one side of ``pivot`` from every node's data.

    arrs - sequence of per-node numpy arrays.
    pivot - value each element is compared against.
    less_than_pivot - if True, discard elements strictly less than ``pivot``;
        otherwise discard elements strictly greater than ``pivot``. Elements
        equal to ``pivot`` are always kept.

    Returns a new list holding one filtered array per node; the input arrays
    are not modified.
    """
    def keep_mask(data):
        # Negate the discard test (rather than using >= / <=) so elements
        # that compare False both ways, such as NaN, are kept.
        discards = (data < pivot) if less_than_pivot else (data > pivot)
        return np.logical_not(discards)

    return [data[keep_mask(data)] for data in arrs]
def distributed_median(arrs, **kwargs):
    """
    Return the 50th-percentile element of the distributed data set ``arrs``.

    Shorthand for ``distributed_percentile(arrs, 50)``; any extra keyword
    arguments are forwarded to it unchanged.
    """
    median_percentile = 50
    return distributed_percentile(arrs, median_percentile, **kwargs)
def test_distributed_median():
    """The median of a shuffled 0..99 sample is reported as the element 49."""
    sample = make_distributed_sample_data()
    median = distributed_median(sample)
    np.testing.assert_equal(median, 49)
def test_distributed_percentile():
    """Check distributed_percentile on integer and fractional sample data."""
    integers = make_distributed_sample_data()
    for pct in (25, 75):
        # numpy interpolates, so compare against its truncated value
        actual = distributed_percentile(integers, pct)
        np.testing.assert_equal(actual, int(np.percentile(integers, pct)))

    quarters = make_distributed_sample_data(0, 25, .25)
    for pct, expected in ((50, 12.25), (20, 4.75)):
        actual = distributed_percentile(quarters, pct)
        np.testing.assert_equal(actual, expected)
if __name__ == '__main__':
    # Demo: print the 50th percentile of a freshly generated sample data set.
    # print() with a single argument works on both Python 2 and Python 3.
    arrs = make_distributed_sample_data()
    print("50th percentile: %s" % distributed_percentile(arrs, 50))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment