Created June 26, 2019 21:19
Parallel Sorting in Ray
| { | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "# Discussion 11: How to Implement Parallel Sort?" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "As you may know, many algorithms for sorting a list of length $N$ have running time $O(N \\log N)$.\n", | |
| "\n", | |
| "Most sort implementations that we've learned (e.g., quicksort, merge sort, bubble sort, insertion sort, etc) are fundamentally serial algorithms. This exercise explores the question: **can we use multiple cores to speed up sorting.**\n", | |
| "\n", | |
| "Merge sort recursively splits a list into smaller pieces and sorts those pieces. This recursive sorting step could be parallelized. However, merge sort then has to then merge the results. _The running time of the very last merge step will be $O(N)$_, so it seems unlikely that a parallel merge sort implementation will be faster than $O(N)$.\n", | |
| "\n", | |
| "In this exercise, we present a randomized sorting algorithm that is more amenable to parallelization." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Strategy\n", | |
| "\n", | |
| "The strategy will be the following.\n", | |
| "\n", | |
| "1. (Optional) Our list will start as a numpy array.\n", | |
| "2. We will split the array into separate contiguous (input) blocks.\n", | |
| "3. We will then rearrange the values into separate continguous (output) blocks.\n", | |
| "4. (Optional) We will then concatenate the contiguous blocks.\n", | |
| "\n", | |
| "##### Example Illustration\n", | |
| "\n", | |
| "\n", | |
| "<img src=\"https://i.imgur.com/qWvOCuu.png\" width=\"800\"/>" | |
| ] | |
| }, | |
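| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "To make the four steps concrete, below is a minimal serial sketch of steps 2-4. The helper `serial_block_sort` and the toy inputs are illustrative assumptions, not part of the exercise code." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import numpy as np\n", | |
| "\n", | |
| "def serial_block_sort(array, cutoffs, num_input_blocks=2):\n", | |
| "    # Step 2: split the array into contiguous input blocks.\n", | |
| "    input_blocks = np.split(array, num_input_blocks)\n", | |
| "    num_output_blocks = len(cutoffs) + 1\n", | |
| "    buckets = [[] for _ in range(num_output_blocks)]\n", | |
| "    # Step 3a: route every value to its output block; a value equal\n", | |
| "    # to a cutoff goes to the upper block.\n", | |
| "    for block in input_blocks:\n", | |
| "        dest = np.searchsorted(cutoffs, block, side='right')\n", | |
| "        for j in range(num_output_blocks):\n", | |
| "            buckets[j].append(block[dest == j])\n", | |
| "    # Step 3b: sort each output block.\n", | |
| "    output_blocks = [np.sort(np.concatenate(parts)) for parts in buckets]\n", | |
| "    # Step 4: concatenate the sorted output blocks.\n", | |
| "    return np.concatenate(output_blocks)\n", | |
| "\n", | |
| "toy = np.random.randint(0, 256, size=20)\n", | |
| "assert np.array_equal(serial_block_sort(toy, np.array([85, 170])), np.sort(toy))" | |
| ] | |
| }, | |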
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Note that in many cases of interest (for example, if the list is so large that it can't fit on a single machine), then it would make sense for the array to start out as **a collection of blocks distributed around the cluster and to end as a collection of blocks distributed around the cluster**. Steps 1 and 4 would be unnecessary." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "## Implementation\n", | |
| "To implement step 3, we will choose a number of output blocks $K$ that our algorithm will output in step 3. We will choose cutoff values $c_1, \\ldots, c_{K-1}$. Then, the first output block will be a sorted block consisting of all values in the original array that are less than $c_1$. The second output block will be a sorted block consisting of all values in the original array that are greater than or equal to $c_1$ and less than $c_2$, and so on. Finally, the $K$th block will be a sorted block consisting of all values in the original array that are greater than or equal to $c_{K-1}$.\n", | |
| "\n", | |
| "We will choose the cutoff values $c_1, \\ldots, c_{K-1}$ by randomly sampling some values from the original array, sorting those sampled values, dividing the sorted values into $K$ contiguous blocks, and choosing cutoffs that separate the blocks from each other.\n", | |
| "\n", | |
| "\n", | |
| "<img src=\"https://i.imgur.com/iHVB4Lx.png\" width=\"800\"/>\n", | |
| "\n", | |
| "\n", | |
| "\n", | |
| "<img src=\"https://i.imgur.com/rQfG1Mn.png\" width=\"800\"/>\n", | |
| "\n", | |
| "\n", | |
| "<img src=\"https://i.imgur.com/QzbWCMF.png\" width=\"800\"/>\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "**EXERCISE:** Code to implement the serial version of this is given below. First try it out and make sure that it runs. The goal of this exercise is to parallelize the implementation and make sure that the parallel implementation is faster than numpy's builtin sorting algorithm." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 1, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "import numpy as np\n", | |
| "import ray\n", | |
| "import time" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stderr", | |
| "output_type": "stream", | |
| "text": [ | |
| "2019-04-25 21:52:03,193\tINFO node.py:423 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-04-25_21-52-03_1192/logs.\n", | |
| "2019-04-25 21:52:03,301\tINFO services.py:363 -- Waiting for redis server at 127.0.0.1:13004 to respond...\n", | |
| "2019-04-25 21:52:03,425\tINFO services.py:363 -- Waiting for redis server at 127.0.0.1:13294 to respond...\n", | |
| "2019-04-25 21:52:03,429\tINFO services.py:760 -- Starting Redis shard with 0.43 GB max memory.\n", | |
| "2019-04-25 21:52:03,458\tWARNING services.py:1272 -- WARNING: object_store_memory is not verified when plasma_directory is set.\n", | |
| "2019-04-25 21:52:03,460\tINFO services.py:1384 -- Starting the Plasma object store with 2.0 GB memory using /tmp.\n" | |
| ] | |
| }, | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "{'node_ip_address': None,\n", | |
| " 'redis_address': '10.244.5.91:13004',\n", | |
| " 'object_store_address': '/tmp/ray/session_2019-04-25_21-52-03_1192/sockets/plasma_store',\n", | |
| " 'webui_url': None,\n", | |
| " 'raylet_socket_name': '/tmp/ray/session_2019-04-25_21-52-03_1192/sockets/raylet'}" | |
| ] | |
| }, | |
| "execution_count": 2, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "ray.init(\n", | |
| " num_cpus=8, # We will be using 8 workers\n", | |
| " include_webui=False, \n", | |
| " plasma_directory='/tmp', # The object store is mounted to local file system\n", | |
| " ignore_reinit_error=True,\n", | |
| " object_store_memory=int(2*1e9), \n", | |
| ")" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "We will be working with 4 input blocks, 4 output blocks, and trying to sort 100,000,000 integers" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 3, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "num_input_blocks = 4\n", | |
| "num_output_blocks = 4\n", | |
| "num_samples_for_cutoffs = 100\n", | |
| "array = np.random.randint(0, 256, size =10**8, dtype=np.uint8)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 4, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "def compute_cutoffs(array, num_samples):\n", | |
| " samples = array[np.random.randint(0, len(array), size=num_samples)]\n", | |
| " samples.sort()\n", | |
| " boundary_indices = np.arange(1, num_output_blocks) * (len(samples) // num_output_blocks)\n", | |
| "\n", | |
| " # These are the boundaries between the output blocks. We will assume that each\n", | |
| " # boundary value is contained in the upper block.\n", | |
| " cutoffs = samples[boundary_indices]\n", | |
| " return cutoffs" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Below we define two functions `partition_input_block` and `compute_output_block`.\n", | |
| "\n", | |
| "1. The function `partition_input_block` will need to be called once per input block. We use the argument `num_return_vals` for the `@ray.remote` decorator in order to return one value per output block.\n", | |
| "\n", | |
| "2. The function `compute_output_block` will need to be called once per output block." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 5, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "@ray.remote(num_return_vals=num_output_blocks)\n", | |
| "def partition_input_block(input_block, cutoffs):\n", | |
| " # By default, numpy arrays passed to remote functions are read-only so that\n", | |
| " # they can be accessed through shared memory without creating a copy.\n", | |
| " # However, we need to mutate this array, so we will create a copy.\n", | |
| " input_block = input_block.copy()\n", | |
| " input_block.sort()\n", | |
| " partition_indices = input_block.searchsorted(cutoffs)\n", | |
| " return np.split(input_block, partition_indices)" | |
| ] | |
| }, | |
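| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "As a quick illustration of the `searchsorted` + `split` idiom above, here is a toy example (the toy values are ours, not part of the exercise):" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": null, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "toy_block = np.array([3, 8, 15, 15, 22, 40], dtype=np.uint8)  # already sorted\n", | |
| "toy_cutoffs = np.array([15, 30], dtype=np.uint8)\n", | |
| "# searchsorted (side='left' by default) finds, for each cutoff, the first\n", | |
| "# position holding a value >= that cutoff, so boundary values go up.\n", | |
| "np.split(toy_block, toy_block.searchsorted(toy_cutoffs))\n", | |
| "# three partitions: values < 15, then 15 <= values < 30, then values >= 30" | |
| ] | |
| }, | |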
| { | |
| "cell_type": "code", | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "@ray.remote\n", | |
| "def compute_output_block(*partitions):\n", | |
| " # There are probably faster ways to merge these sorted partitions together \n", | |
| " # than to concatenate them and sort the result, but this seems to work.\n", | |
| " result = np.concatenate(partitions)\n", | |
| " result.sort()\n", | |
| " return result" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Below we actually run the parallel sort." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "#### Step 1: Split input array into blocks" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 7, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "blocks = np.split(array, num_input_blocks)" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 8, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[array([ 77, 211, 154, ..., 124, 210, 59], dtype=uint8),\n", | |
| " array([ 32, 13, 252, ..., 43, 114, 36], dtype=uint8),\n", | |
| " array([104, 93, 32, ..., 93, 41, 71], dtype=uint8),\n", | |
| " array([117, 185, 9, ..., 124, 185, 138], dtype=uint8)]" | |
| ] | |
| }, | |
| "execution_count": 8, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "blocks" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 9, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "[(25000000,), (25000000,), (25000000,), (25000000,)]" | |
| ] | |
| }, | |
| "execution_count": 9, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "[b.shape for b in blocks]" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "#### Step 2: Compute cutoffs/pivots" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 10, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "array([ 74, 117, 174], dtype=uint8)" | |
| ] | |
| }, | |
| "execution_count": 10, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "cutoffs = compute_cutoffs(array, num_samples_for_cutoffs)\n", | |
| "cutoffs" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "#### Step 3: Use ray to paralleize sorting" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "We first put each input blocks into ray's shared object store, so each worker can access it without making expensive copies." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 11, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "block_ids = [ray.put(block) for block in blocks]" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Then we are going parittion each blocks into their corresponding output block" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 12, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "partition_ids = np.empty(shape=(num_input_blocks, num_output_blocks), dtype=object)\n", | |
| "for i in range(num_input_blocks):\n", | |
| " partition_ids[i] = np.array(partition_input_block.remote(block_ids[i], cutoffs))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 13, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "(4, 4)" | |
| ] | |
| }, | |
| "execution_count": 13, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "partition_ids.shape" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 14, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "ObjectID(010000000d99e9de1f20d47e4b398f6d4b6654af)" | |
| ] | |
| }, | |
| "execution_count": 14, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "# This array stores object ids\n", | |
| "partition_ids[0,0]" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "We are now sorting each output blocks. Notice how we are just passing a list of object ids into the remote function" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 15, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "output_block_ids = []\n", | |
| "for j in range(num_output_blocks):\n", | |
| " output_block_ids.append(compute_output_block.remote(*partition_ids[:, j]))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "Lastly, we can get the result from each worker and concatenate it" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 16, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "sorted_result = np.concatenate(ray.get(output_block_ids))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 17, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": [ | |
| "array([ 0, 0, 0, ..., 255, 255, 255], dtype=uint8)" | |
| ] | |
| }, | |
| "execution_count": 17, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ], | |
| "source": [ | |
| "sorted_result" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "#### Speed comparison!" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 19, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Parallel sort duration: 2.8410232067108154.\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "parallel_start_time = time.time()\n", | |
| "\n", | |
| "cutoffs = compute_cutoffs(array, num_samples_for_cutoffs)\n", | |
| "blocks = np.split(array, num_input_blocks)\n", | |
| "block_ids = [ray.put(block) for block in blocks]\n", | |
| "partition_ids = np.empty(shape=(num_input_blocks, num_output_blocks), dtype=object)\n", | |
| "for i in range(num_input_blocks):\n", | |
| " partition_ids[i] = np.array(partition_input_block.remote(block_ids[i], cutoffs))\n", | |
| "output_block_ids = []\n", | |
| "for j in range(num_output_blocks):\n", | |
| " output_block_ids.append(compute_output_block.remote(*partition_ids[:, j]))\n", | |
| "sorted_result = np.concatenate(ray.get(output_block_ids))\n", | |
| "\n", | |
| "parallel_duration = time.time() - parallel_start_time\n", | |
| "\n", | |
| "print(\"Parallel sort duration: {}.\".format(parallel_duration))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "The box below times numpy's built in sorting algorithm. Our parallel sort should be faster than numpy's built in sort. You do not need to modify anything in the box below." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 20, | |
| "metadata": {}, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": [ | |
| "Serial sort duration: 5.1032140254974365.\n" | |
| ] | |
| } | |
| ], | |
| "source": [ | |
| "array_copy = array.copy()\n", | |
| "serial_start_time = time.time()\n", | |
| "array_copy.sort()\n", | |
| "serial_duration = time.time() - serial_start_time\n", | |
| "print(\"Serial sort duration: {}.\".format(serial_duration))" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": {}, | |
| "source": [ | |
| "This box checks that the sort was implemented correctly and that the parallel version gave a speedup." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "execution_count": 21, | |
| "metadata": {}, | |
| "outputs": [], | |
| "source": [ | |
| "# Check work.\n", | |
| "assert parallel_duration < 0.75 * serial_duration\n", | |
| "assert np.all(sorted_result == array_copy)" | |
| ] | |
| } | |
| ], | |
| "metadata": { | |
| "kernelspec": { | |
| "display_name": "Python 3", | |
| "language": "python", | |
| "name": "python3" | |
| }, | |
| "language_info": { | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "file_extension": ".py", | |
| "mimetype": "text/x-python", | |
| "name": "python", | |
| "nbconvert_exporter": "python", | |
| "pygments_lexer": "ipython3", | |
| "version": "3.6.4" | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |
Blocks are sorted twice here. The first time they are sorted in `partition_input_block`, which sorts a whole block. The second time they are sorted in `compute_output_block`. At the same time, the implementation has no step in which workers distribute *unsorted* data into blocks between the cutoffs, even though the pictures show one.
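A rough sketch of that alternative (untested; it assumes the notebook's `num_output_blocks` and `cutoffs` are in scope): bucket each input block without pre-sorting it, so that every value is sorted exactly once, in `compute_output_block`. This also matches the distribution step shown in the pictures.

```python
import numpy as np
import ray

@ray.remote(num_return_vals=num_output_blocks)
def partition_input_block_unsorted(input_block, cutoffs):
    # Find each value's destination output block. With side='right', a
    # value equal to a cutoff lands in the upper block, matching the
    # convention used in compute_cutoffs.
    dest = np.searchsorted(cutoffs, input_block, side='right')
    # No copy of input_block is needed because nothing is mutated here.
    # Return one (unsorted) bucket per output block; compute_output_block
    # then sorts each bucket exactly once.
    return [input_block[dest == j] for j in range(num_output_blocks)]
```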