@simon-mo
Last active April 20, 2018 12:10
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Discussion on Re-partition\n",
"We need a *good* re-partition method that serves at least two functions:\n",
"1. Keeping data blocks homogeneous in dtype (shown in this notebook).\n",
"2. Supporting data append/insert/join, all of which require repartitioning. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Homogeneous Data Blocks\n",
"\n",
"*Block Manager*: pandas has an internal block manager that stores columns of the same data type together in a single 2D NumPy array. This makes computation faster. \n",
"\n",
"Since we are, in some sense, working with column partitions, we should consider grouping the columns by dtype. The following snippets show the performance gain from partitioning the data so that columns of the same dtype sit next to each other. \n",
"\n",
"#### Not the block manager; a lazy mask instead\n",
"\n",
"We are not implementing a \"distributed\" block manager! What I have in mind is essentially a lazy re-partition strategy. \n",
"\n",
"- A `DataFrame` object has two states, specified by a 'valid bit' (or whatever you would like to call it). Either:\n",
"    - The metadata perfectly matches the block partitions. Basically `len(_index_metadata) == block_partitions.shape[0]` and `len(_col_metadata) == block_partitions.shape[1]` AND they occur in the same order (the `partitions` column is sorted). \n",
"    - The metadata does not match the block partitions. It may describe only a sub-dataframe, a re-indexing of the dataframe, or even an [enlargement](https://pandas.pydata.org/pandas-docs/stable/indexing.html#setting-with-enlargement) of the dataframe.\n",
"\n",
"- The `DataFrame` object goes through column re-arrangement or sub-dataframe masking whenever `_get_blk_partitions` gets called. Here is a code snippet for (un-optimized) sub-dataframe masking:\n",
"\n",
"```python\n",
"import numpy as np\n",
"import ray\n",
"\n",
"\n",
"def _mask_block_partitions(blk_partitions, row_metadata, col_metadata):\n",
"    row_len, col_len = blk_partitions.shape\n",
"    mask_blocks = []\n",
"    # ravel() walks the 2D grid in row-major order, so i maps back to (row, col).\n",
"    for i, block_oid in enumerate(blk_partitions.ravel()):\n",
"        blk_row_idx, blk_col_idx = i // col_len, i % col_len\n",
"        row_idx = row_metadata.get_partition(\n",
"            blk_row_idx).index_within_partition\n",
"        col_idx = col_metadata.get_partition(\n",
"            blk_col_idx).index_within_partition\n",
"        # Launch one remote task per block; each returns a new object ID.\n",
"        mask_blocks.append(_mask_single_block.remote(\n",
"            block_oid, row_idx, col_idx))\n",
"    return np.array(mask_blocks).reshape(row_len, col_len)\n",
"\n",
"\n",
"@ray.remote\n",
"def _mask_single_block(block, row_idx, col_idx):\n",
"    # Ray resolves the object ID argument, so `block` is the pandas DataFrame itself.\n",
"    return block.iloc[row_idx, col_idx]\n",
"```\n",
"\n",
"- This setup lets us modify the original data without creating a complicated lineage tree, and helps us consolidate columns of the same data type by re-arranging columns. \n",
"- A `copy` method now also makes sense: it will squeeze, re-arrange, or enlarge the block partitions to match the metadata. \n",
"- Future methods like `join` and `groupby` can use this re-partition scheme to optimize computation. \n",
"\n",
"#### My Questions\n",
"- Is this design good enough for an MVP? \n",
"- Do you see any apparent issues? "
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Process STDOUT and STDERR is being redirected to /tmp/raylogs/.\n",
"Waiting for redis server at 127.0.0.1:29096 to respond...\n",
"Waiting for redis server at 127.0.0.1:10425 to respond...\n",
"Starting local scheduler with the following resources: {'CPU': 4, 'GPU': 0}.\n",
"\n",
"======================================================================\n",
"View the web UI at http://localhost:8891/notebooks/ray_ui10495.ipynb?token=53e7e541eca34e76e7c5fc1a943179220c43bae322c89b16\n",
"======================================================================\n",
"\n"
]
}
],
"source": [
"from ray.dataframe import DataFrame\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from ray.dataframe import set_npartition_default"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"import ray"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"set_npartition_default(2)"
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [],
"source": [
"df_len = int(1e6)\n",
"mixed_df = DataFrame({\n",
"    'int1': np.arange(df_len, dtype=np.int64),\n",
"    'float1': np.arange(df_len, dtype=np.float64),\n",
"    'int2': np.arange(df_len, dtype=np.int64),\n",
"    'float2': np.arange(df_len, dtype=np.float64),\n",
"}, columns=['int1', 'float1', 'int2', 'float2'])"
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"245 ms ± 50.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)\n"
]
}
],
"source": [
"%%timeit\n",
"_ = mixed_df.max()"
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>0</th>\n",
" <th>1</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" 0 1\n",
"0 0 0.0"
]
},
"execution_count": 36,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ray.get(mixed_df._block_partitions[0][0]).head(1)"
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [],
"source": [
"blocked_df = DataFrame({\n",
"    'int1': np.arange(df_len, dtype=np.int64),\n",
"    'int2': np.arange(df_len, dtype=np.int64),\n",
"    'float1': np.arange(df_len, dtype=np.float64),\n",
"    'float2': np.arange(df_len, dtype=np.float64),\n",
"})"
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"190 ms ± 62.4 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)\n"
]
}
],
"source": [
"%%timeit\n",
"_ = blocked_df.max()"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>0</th>\n",
" <th>1</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>0.0</td>\n",
" <td>0.0</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" 0 1\n",
"0 0.0 0.0"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ray.get(blocked_df._block_partitions[0][0]).head(1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
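
The notebook's idea of grouping columns by dtype before partitioning can be sketched with plain pandas. This is a minimal illustration under stated assumptions, not the `ray.dataframe` implementation: `group_columns_by_dtype` and `split_columns` are hypothetical helper names, and the split is a simple contiguous column split standing in for whatever partitioning the real `DataFrame` performs.

```python
import numpy as np
import pandas as pd


def group_columns_by_dtype(df):
    """Return df with columns reordered so equal dtypes sit next to each other.

    Hypothetical helper: sorted() is stable, so columns keep their original
    relative order within each dtype.
    """
    order = sorted(range(df.shape[1]), key=lambda i: str(df.dtypes.iloc[i]))
    return df.iloc[:, order]


def split_columns(df, npartitions):
    """Split df into npartitions contiguous column chunks (hypothetical helper)."""
    bounds = np.linspace(0, df.shape[1], npartitions + 1).astype(int)
    return [df.iloc[:, lo:hi] for lo, hi in zip(bounds[:-1], bounds[1:])]


# Same column layout as mixed_df in the notebook, but tiny.
mixed = pd.DataFrame({
    "int1": np.arange(4, dtype=np.int64),
    "float1": np.arange(4, dtype=np.float64),
    "int2": np.arange(4, dtype=np.int64),
    "float2": np.arange(4, dtype=np.float64),
}, columns=["int1", "float1", "int2", "float2"])

grouped = group_columns_by_dtype(mixed)

# Splitting the interleaved frame yields blocks that mix int and float columns;
# splitting the grouped frame yields blocks with a single dtype each.
mixed_blocks = split_columns(mixed, 2)
grouped_blocks = split_columns(grouped, 2)

for name, blocks in [("mixed", mixed_blocks), ("grouped", grouped_blocks)]:
    print(name, [sorted(set(map(str, b.dtypes))) for b in blocks])
```

With two column partitions, each block of the grouped frame holds one dtype, which is the situation pandas' block manager can store as a single 2D array. Whether this reordering should happen eagerly or lazily (behind the 'valid bit' described in the notebook) is a separate design choice that this sketch does not address.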