{
"cells": [
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "# Embulk S3 Input for TSV Performance"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "Automatically generate TSV on variable number of rows X columns and upload to S3, then execute the corresponding embulk-input-s3 task both locally with embulk and remotely on BulkLoader via td-api\n\nRequirements: `aws` CLI installed (and authenticated through \"~/.aws/credentials\"), `python3.6`, some Python packages (`python-dotenv` and `PyYAML`). And assume you have a embulk s3 input config.yml on the same diretory with the notebook-dir."
},
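{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "For orientation, a minimal embulk-input-s3 `config.yml` might look like the sketch below. This is illustrative only: the bucket and credential values are placeholders, not the actual config used in this notebook."
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "# Sketch of a minimal embulk-input-s3 config.yml.\n# <YOUR_BUCKET> and the credential values are placeholders, not the real config used here.\nbaseline_config = '''\\\nin:\n  type: s3\n  bucket: <YOUR_BUCKET>\n  path_prefix: cs_2867/large_100000_100.csv\n  access_key_id: <AWS_ACCESS_KEY_ID>\n  secret_access_key: <AWS_SECRET_ACCESS_KEY>\n  parser:\n    type: csv\n    delimiter: \"\\\\t\"\nout:\n  type: stdout\n'''\nprint(baseline_config)",
"execution_count": null,
"outputs": []
},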
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "import os\nfrom dotenv import load_dotenv\nload_dotenv(dotenv_path='.env')\n\n# Configurations\nS3_PATH_PREFIX = os.getenv('S3_PATH_PREFIX')\nTD_API_KEY = os.getenv('TD_API_KEY')\nTD_API_ENDPOINT = os.getenv('TD_API_ENDPOINT')\n\n# Miscs.\nimport time\ndef format_duration(secs):\n return time.strftime('%H:%M:%S', time.gmtime(secs))\n\ndef adhoc(job_id):\n \"\"\"Artificial result to replace the cell's widget output to support static rendering\n \"\"\"\n print(f'Started job {job_id}')\n print(f'Job {job_id} done in {job_duration(job_id)} seconds')",
"execution_count": 150,
"outputs": []
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "## TSV Generation"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "import csv\nimport string\nfrom itertools import cycle, islice\n\ndef generate(file_path, col=10, row=100, delimiter='\\t'):\n headers = islice(cycle(list(string.ascii_uppercase)), col)\n row_data = range(0, col)\n with open(file_path, 'w') as f:\n wr = csv.writer(f, delimiter='\\t')\n wr.writerow(headers)\n for _ in range(row):\n wr.writerow(row_data)\n\ndef upload(file_path):\n !aws s3 cp {file_path} s3://{S3_PATH_PREFIX}/{file_path} >/dev/null 2>&1\n\ndef sample_out_columns():\n !head -1 out/000.00.csv | sed 's/[^,]//g' | wc -c",
"execution_count": 193,
"outputs": []
},
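{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "As a quick sanity check of `generate`: headers cycle through A-Z and every data row simply repeats the column indices. The file below is a throwaway local example (hypothetical name, not uploaded to S3)."
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "# Generate a tiny 3-column x 2-row TSV and inspect it (local throwaway file)\ngenerate('tiny.tsv', col=3, row=2)\nwith open('tiny.tsv') as f:\n    print(f.read())\n# Expected output (tab-separated):\n# A   B   C\n# 0   1   2\n# 0   1   2",
"execution_count": null,
"outputs": []
},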
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "## Config Update"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "import yaml\nimport collections\nfrom shutil import copyfile\n\n# Reserve yaml entries order\n_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG\ndef dict_representer(dumper, data):\n return dumper.represent_dict(data.items())\ndef dict_constructor(loader, node):\n return collections.OrderedDict(loader.construct_pairs(node))\nyaml.add_representer(collections.OrderedDict, dict_representer)\nyaml.add_constructor(_mapping_tag, dict_constructor)\n\ndef update_config(row, col):\n with open('config.yml', 'r') as f:\n copyfile('config.yml', 'config.bak.yml')\n config = yaml.load(f)\n config['in']['path_prefix'] = f'cs_2867/large_{row}_{col}.csv'\n with open('config.yml', 'w') as f:\n yaml.dump(config, f)\n\ndef read_config():\n with open('config.yml', 'r') as f:\n config = yaml.load(f)\n return config",
"execution_count": 15,
"outputs": []
},
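{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "Usage sketch: after `update_config`, `read_config` should reflect the new `path_prefix` while the remaining entries keep their original order. This assumes the pre-existing `config.yml` mentioned in the requirements above."
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "# Point the config at the 100,000 x 100 file and verify the rewrite\nupdate_config(100_000, 100)\nprint(read_config()['in']['path_prefix'])\n# -> cs_2867/large_100000_100.csv",
"execution_count": null,
"outputs": []
},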
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "## Remote Job Handling"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "from tdclient import api, errors\n\n_td = None\ndef td(ensure=False):\n def connect():\n global _td\n _td = api.API(apikey=TD_API_KEY, endpoint=TD_API_ENDPOINT)\n if not _td:\n connect()\n if ensure:\n try:\n with _td.get('/') as res:\n if res.status // 100 != 2:\n connect()\n except:\n connect()\n return _td",
"execution_count": 104,
"outputs": []
},
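{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "A usage note on the wrapper above: `td()` returns the cached client, while `td(ensure=True)` pings the endpoint first and reconnects on failure. The ensured variant matters for the long-polling job checker below, where a connection can go stale between checks. The job id in the sketch is one from the sample runs, used purely for illustration."
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "client = td()  # one-off calls reuse the cached api.API instance\n# In long-running loops, re-validate the connection before each call:\nprint(td(ensure=True).job_status('7458179'))  # job id borrowed from the sample runs",
"execution_count": null,
"outputs": []
},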
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "def establish_connection(name, db='huy', table='cs_2867'):\n \"\"\"Establish the remote connetion on development server based on the local config file\n \"\"\"\n local_config = read_config()\n guessed_config = td().connector_guess({'config': local_config})['config']\n guessed_config['out'] = {'type': 'td_internal'}\n try:\n td().connector_delete(name)\n except errors.NotFoundError:\n pass\n return td().connector_create(name, 'huy', 'cs_2867', {'config': guessed_config})",
"execution_count": 99,
"outputs": []
},
{
"metadata": {
"trusted": true,
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "code",
"source": "import threading\nimport time\nimport json\nimport ipywidgets as widgets\n\ndef job_logs(job_id, source='output'):\n with td().get(f\"/v4/jobs/{job_id}/log?source={source}\") as res:\n print(str(res.read()).replace('\\\\n', '\\n').replace('\\\\t', '\\t'))\n\ndef job_duration(job_id):\n job = td().show_job(job_id)\n return (job['end_at'] - job['created_at']).seconds\n\n\nclass JobChecker(threading.Thread):\n def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):\n super().__init__(group=group, target=target, name=name, daemon=daemon)\n self.job_id = None\n self.should_stop = False\n self.output = None\n def run(self):\n while True:\n if self.should_stop or td(ensure=True).job_status(self.job_id) == 'success':\n with self.output:\n print(f'Job {self.job_id} done in {job_duration(self.job_id)} seconds')\n return\n time.sleep(10)",
"execution_count": 176,
"outputs": []
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "## Execution"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "from time\n\ndef run(rows, cols, delimiter='\\t', file_exists=False):\n if not file_exists:\n fname = f'large_{rows}_{cols}.csv'\n generate(fname, rows, cols, delimiter)\n upload(fname)\n update_config(rows, cols)\n !embulk guess config.yml -o config.yml >/dev/null 2>&1\n start = time.time()\n !embulk run config.yml >/dev/null 2>&1\n return f\"Import completed in {time() - start:.4f} secs\"",
"execution_count": null,
"outputs": []
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
},
"trusted": true
},
"cell_type": "code",
"source": "def run_remotely(rows, cols, delimiter='\\t', file_exists=False):\n if not file_exists:\n fname = f'large_{rows}_{cols}.csv'\n generate(fname, rows, cols, delimiter)\n upload(fname)\n update_config(rows, cols) # establish_connection relies on local config.yml\n connection_name = f'huy_cs2867_{rows}_{cols}'\n establish_connection(connection_name)\n job = td().connector_run(connection_name)\n job_checker = JobChecker(name=f\"job_checker_{job['job_id']}\")\n job_checker.job_id = job['job_id']\n job_checker.output= widgets.Output()\n with job_checker.output:\n print(f\"Started job {job['job_id']}\")\n job_checker.start()\n return job_checker.output",
"execution_count": 191,
"outputs": []
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "## Sample Runs"
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "### Local"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "#### 100,000 rows with 100 -> 500 -> 1000 columns"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "run(rows=100_000, cols=100)",
"execution_count": 153,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 32.21 secs'"
},
"execution_count": 153,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run(rows=100_000, cols=500)",
"execution_count": 154,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 133.3 secs'"
},
"execution_count": 154,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run(rows=100_000, cols=1_000)",
"execution_count": 155,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 276.2 secs'"
},
"execution_count": 155,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "#### 1,000,000 rows with 100 -> 500 -> 1000 -> 2000 columns"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"trusted": true
},
"cell_type": "code",
"source": "run(rows=1_000_000, cols=100)",
"execution_count": 156,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 311.6 secs'"
},
"execution_count": 156,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"trusted": true
},
"cell_type": "code",
"source": "run(rows=1_000_000, cols=500)",
"execution_count": 165,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 1568.0 secs'"
},
"execution_count": 165,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "run(rows=1_000_000, cols=1_000)",
"execution_count": 169,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 3005.0 secs'"
},
"execution_count": 169,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run(rows=1_000_000, cols=2_000)",
"execution_count": 6,
"outputs": [
{
"data": {
"text/plain": "'Import completed in 4814.9043 secs'"
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "### Remote"
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "The below results may not be rendered on some static notebook rendering tool due to the use of widget (necessary for thread-bound output)"
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "#### 100,000 rows with 100 -> 500 -> 1000 columns"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=100_000, cols=100, file_exists=True)",
"execution_count": 145,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7458179\nJob 7458179 done in 62 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=100_000, cols=500, file_exists=True)",
"execution_count": 146,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7458190\nJob 7458190 done in 115 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=100_000, cols=1000, file_exists=True)",
"execution_count": 147,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7458196\nJob 7458196 done in 196 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "#### 1,000,000 rows with 100 -> 500 -> 1000 -> 2000 columns"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=100, file_exists=True)",
"execution_count": 144,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7460936\nJob 7460936 done in 168 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=500, file_exists=True)",
"execution_count": 143,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7460912\nJob 7460912 done in 717 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "Aha! Reproduced."
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=1000, file_exists=True)",
"execution_count": 137,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7458260\nJob 7458260 done in 11277 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "Also a bump at 2k cols, but now growing so much far away from 1k, is there a threshold at some point that trigger this performance degration?"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=2000, file_exists=True)",
"execution_count": 142,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7458264\nJob 7458264 done in 12442 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "Let's see if CSV having the same issue"
},
{
"metadata": {
"run_control": {
"frozen": true,
"read_only": true
},
"scrolled": true,
"trusted": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=1000, delimiter=',')",
"execution_count": 199,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7465732\nJob 7465732 done in 1263 seconds\n"
}
]
},
{
"metadata": {
"trusted": true,
"run_control": {
"read_only": true,
"frozen": true
},
"scrolled": true
},
"cell_type": "code",
"source": "run_remotely(rows=1_000_000, cols=2000, delimiter=',')",
"execution_count": 202,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "Started job 7466811\nJob 7466811 done in 2118 seconds\n"
}
]
},
{
"metadata": {
"run_control": {
"read_only": false,
"frozen": false
}
},
"cell_type": "markdown",
"source": "Nope, it' not. So the problem is really with TSV (but only when running remotely)"
}
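,
{
"metadata": {
"run_control": {
"frozen": false,
"read_only": false
}
},
"cell_type": "markdown",
"source": "Recap of the timings observed above (seconds):\n\n| rows x cols | local TSV | remote TSV | remote CSV |\n|---|---|---|---|\n| 100k x 100 | 32 | 62 | - |\n| 100k x 500 | 133 | 115 | - |\n| 100k x 1000 | 276 | 196 | - |\n| 1M x 100 | 312 | 168 | - |\n| 1M x 500 | 1568 | 717 | - |\n| 1M x 1000 | 3005 | 11277 | 1263 |\n| 1M x 2000 | 4815 | 12442 | 2118 |\n\nThe degradation only shows up for remote TSV runs at 1M rows with >= 1000 columns."
}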
],
"metadata": {
"_draft": {
"nbviewer_url": "https://gist.github.com/d43f5e8e32b389a96d513bc64d505fde"
},
"gist": {
"id": "d43f5e8e32b389a96d513bc64d505fde",
"data": {
"description": "embulk_s3_input_tsv_perf",
"public": true
}
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.6.5",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
},
"toc": {
"toc_cell": false,
"toc_number_sections": false,
"toc_threshold": 4,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}