embulk_s3_input_tsv_perf
| { | |
| "cells": [ | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "# Embulk S3 Input for TSV Performance" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Automatically generate TSV on variable number of rows X columns and upload to S3, then execute the corresponding embulk-input-s3 task both locally with embulk and remotely on BulkLoader via td-api\n\nRequirements: `aws` CLI installed (and authenticated through \"~/.aws/credentials\"), `python3.6`, some Python packages (`python-dotenv` and `PyYAML`). And assume you have a embulk s3 input config.yml on the same diretory with the notebook-dir." | |
| }, | |
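| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Since the runs below assume a pre-existing `config.yml`, here is a minimal, hypothetical sketch of what such an embulk-input-s3 config might look like. The bucket, credentials, and parser details are placeholders, not the actual file used in these runs." | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Hypothetical sketch only: bucket, credentials, and path are placeholders.\nexample_config = '''in:\n  type: s3\n  bucket: YOUR_BUCKET\n  path_prefix: cs_2867/large_100_10.csv\n  access_key_id: YOUR_ACCESS_KEY\n  secret_access_key: YOUR_SECRET_KEY\n  parser:\n    type: csv\n    delimiter: \"\\\\t\"\nout:\n  type: stdout\n'''\nprint(example_config)", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |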
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "import os\nfrom dotenv import load_dotenv\nload_dotenv(dotenv_path='.env')\n\n# Configurations\nS3_PATH_PREFIX = os.getenv('S3_PATH_PREFIX')\nTD_API_KEY = os.getenv('TD_API_KEY')\nTD_API_ENDPOINT = os.getenv('TD_API_ENDPOINT')\n\n# Miscs.\nimport time\ndef format_duration(secs):\n return time.strftime('%H:%M:%S', time.gmtime(secs))\n\ndef adhoc(job_id):\n \"\"\"Artificial result to replace the cell's widget output to support static rendering\n \"\"\"\n print(f'Started job {job_id}')\n print(f'Job {job_id} done in {job_duration(job_id)} seconds')", | |
| "execution_count": 150, | |
| "outputs": [] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "## TSV Generation" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "import csv\nimport string\nfrom itertools import cycle, islice\n\ndef generate(file_path, col=10, row=100, delimiter='\\t'):\n headers = islice(cycle(list(string.ascii_uppercase)), col)\n row_data = range(0, col)\n with open(file_path, 'w') as f:\n wr = csv.writer(f, delimiter='\\t')\n wr.writerow(headers)\n for _ in range(row):\n wr.writerow(row_data)\n\ndef upload(file_path):\n !aws s3 cp {file_path} s3://{S3_PATH_PREFIX}/{file_path} >/dev/null 2>&1\n\ndef sample_out_columns():\n !head -1 out/000.00.csv | sed 's/[^,]//g' | wc -c", | |
| "execution_count": 193, | |
| "outputs": [] | |
| }, | |
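| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "A quick sanity check of the generator on a tiny, illustrative file (the file name and 3 x 5 size here are arbitrary):" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Illustrative only: generate a tiny 3-row x 5-column TSV and inspect it.\ngenerate('tiny_3_5.tsv', col=5, row=3)\nwith open('tiny_3_5.tsv') as f:\n    print(f.read())", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |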
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "## Config Update" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "import yaml\nimport collections\nfrom shutil import copyfile\n\n# Reserve yaml entries order\n_mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG\ndef dict_representer(dumper, data):\n return dumper.represent_dict(data.items())\ndef dict_constructor(loader, node):\n return collections.OrderedDict(loader.construct_pairs(node))\nyaml.add_representer(collections.OrderedDict, dict_representer)\nyaml.add_constructor(_mapping_tag, dict_constructor)\n\ndef update_config(row, col):\n with open('config.yml', 'r') as f:\n copyfile('config.yml', 'config.bak.yml')\n config = yaml.load(f)\n config['in']['path_prefix'] = f'cs_2867/large_{row}_{col}.csv'\n with open('config.yml', 'w') as f:\n yaml.dump(config, f)\n\ndef read_config():\n with open('config.yml', 'r') as f:\n config = yaml.load(f)\n return config", | |
| "execution_count": 15, | |
| "outputs": [] | |
| }, | |
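| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "An illustrative round-trip of the two helpers above: rewrite `path_prefix` for a hypothetical 100-row x 10-column file, then read it back (the size is arbitrary):" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Illustrative round-trip; 100 x 10 is an arbitrary example size.\nupdate_config(100, 10)\nprint(read_config()['in']['path_prefix'])  # expected: cs_2867/large_100_10.csv", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |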
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "## Remote Job Handling" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "from tdclient import api, errors\n\n_td = None\ndef td(ensure=False):\n def connect():\n global _td\n _td = api.API(apikey=TD_API_KEY, endpoint=TD_API_ENDPOINT)\n if not _td:\n connect()\n if ensure:\n try:\n with _td.get('/') as res:\n if res.status // 100 != 2:\n connect()\n except:\n connect()\n return _td", | |
| "execution_count": 104, | |
| "outputs": [] | |
| }, | |
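| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "The helper memoizes a single API client; `ensure=True` health-checks it against the API root and reconnects if the check fails. A trivial usage sketch:" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Reuses the cached client, reconnecting only if the health check fails.\nclient = td(ensure=True)\nprint(type(client).__name__)", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |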
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "def establish_connection(name, db='huy', table='cs_2867'):\n \"\"\"Establish the remote connetion on development server based on the local config file\n \"\"\"\n local_config = read_config()\n guessed_config = td().connector_guess({'config': local_config})['config']\n guessed_config['out'] = {'type': 'td_internal'}\n try:\n td().connector_delete(name)\n except errors.NotFoundError:\n pass\n return td().connector_create(name, 'huy', 'cs_2867', {'config': guessed_config})", | |
| "execution_count": 99, | |
| "outputs": [] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "code", | |
| "source": "import threading\nimport time\nimport json\nimport ipywidgets as widgets\n\ndef job_logs(job_id, source='output'):\n with td().get(f\"/v4/jobs/{job_id}/log?source={source}\") as res:\n print(str(res.read()).replace('\\\\n', '\\n').replace('\\\\t', '\\t'))\n\ndef job_duration(job_id):\n job = td().show_job(job_id)\n return (job['end_at'] - job['created_at']).seconds\n\n\nclass JobChecker(threading.Thread):\n def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):\n super().__init__(group=group, target=target, name=name, daemon=daemon)\n self.job_id = None\n self.should_stop = False\n self.output = None\n def run(self):\n while True:\n if self.should_stop or td(ensure=True).job_status(self.job_id) == 'success':\n with self.output:\n print(f'Job {self.job_id} done in {job_duration(self.job_id)} seconds')\n return\n time.sleep(10)", | |
| "execution_count": 176, | |
| "outputs": [] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "## Execution" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "from time\n\ndef run(rows, cols, delimiter='\\t', file_exists=False):\n if not file_exists:\n fname = f'large_{rows}_{cols}.csv'\n generate(fname, rows, cols, delimiter)\n upload(fname)\n update_config(rows, cols)\n !embulk guess config.yml -o config.yml >/dev/null 2>&1\n start = time.time()\n !embulk run config.yml >/dev/null 2>&1\n return f\"Import completed in {time() - start:.4f} secs\"", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "def run_remotely(rows, cols, delimiter='\\t', file_exists=False):\n if not file_exists:\n fname = f'large_{rows}_{cols}.csv'\n generate(fname, rows, cols, delimiter)\n upload(fname)\n update_config(rows, cols) # establish_connection relies on local config.yml\n connection_name = f'huy_cs2867_{rows}_{cols}'\n establish_connection(connection_name)\n job = td().connector_run(connection_name)\n job_checker = JobChecker(name=f\"job_checker_{job['job_id']}\")\n job_checker.job_id = job['job_id']\n job_checker.output= widgets.Output()\n with job_checker.output:\n print(f\"Started job {job['job_id']}\")\n job_checker.start()\n return job_checker.output", | |
| "execution_count": 191, | |
| "outputs": [] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "## Sample Runs" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "### Local" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "#### 100,000 rows with 100 -> 500 -> 1000 columns" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": false, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=100_000, cols=100)", | |
| "execution_count": 153, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 32.21 secs'" | |
| }, | |
| "execution_count": 153, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=100_000, cols=500)", | |
| "execution_count": 154, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 133.3 secs'" | |
| }, | |
| "execution_count": 154, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=100_000, cols=1_000)", | |
| "execution_count": 155, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 276.2 secs'" | |
| }, | |
| "execution_count": 155, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "#### 1,000,000 rows with 100 -> 500 -> 1000 -> 2000 columns" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=1_000_000, cols=100)", | |
| "execution_count": 156, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 311.6 secs'" | |
| }, | |
| "execution_count": 156, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=1_000_000, cols=500)", | |
| "execution_count": 165, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 1568.0 secs'" | |
| }, | |
| "execution_count": 165, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": false, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=1_000_000, cols=1_000)", | |
| "execution_count": 169, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 3005.0 secs'" | |
| }, | |
| "execution_count": 169, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run(rows=1_000_000, cols=2_000)", | |
| "execution_count": 6, | |
| "outputs": [ | |
| { | |
| "data": { | |
| "text/plain": "'Import completed in 4814.9043 secs'" | |
| }, | |
| "execution_count": 6, | |
| "metadata": {}, | |
| "output_type": "execute_result" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "### Remote" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "The below results may not be rendered on some static notebook rendering tool due to the use of widget (necessary for thread-bound output)" | |
| }, | |
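| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "For reference, a minimal illustration of the thread-bound output pattern `JobChecker` relies on: a background thread can print into a `widgets.Output` it holds a handle to, rather than into whichever cell happens to be executing." | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Minimal illustration of printing into a held widgets.Output handle.\nout = widgets.Output()\nwith out:\n    print('rendered inside the widget, not in the executing cell')\nout", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |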
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "#### 100,000 rows with 100 -> 500 -> 1000 columns" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": false, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=100_000, cols=100, file_exists=True)", | |
| "execution_count": 145, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7458179\nJob 7458179 done in 62 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": false, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=100_000, cols=500, file_exists=True)", | |
| "execution_count": 146, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7458190\nJob 7458190 done in 115 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": false, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=100_000, cols=1000, file_exists=True)", | |
| "execution_count": 147, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7458196\nJob 7458196 done in 196 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "#### 1,000,000 rows with 100 -> 500 -> 1000 -> 2000 columns" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=100, file_exists=True)", | |
| "execution_count": 144, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7460936\nJob 7460936 done in 168 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=500, file_exists=True)", | |
| "execution_count": 143, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7460912\nJob 7460912 done in 717 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Aha! Reproduced." | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=1000, file_exists=True)", | |
| "execution_count": 137, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7458260\nJob 7458260 done in 11277 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Also a bump at 2k cols, but now growing so much far away from 1k, is there a threshold at some point that trigger this performance degration?" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=2000, file_exists=True)", | |
| "execution_count": 142, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7458264\nJob 7458264 done in 12442 seconds\n" | |
| } | |
| ] | |
| }, | |
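| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "A recap of the remote 1M-row TSV durations measured above, normalized to seconds per column, to eyeball where the degradation kicks in (numbers copied from the outputs above):" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Numbers copied from the remote 1M-row TSV runs above.\ntsv_remote = {100: 168, 500: 717, 1000: 11277, 2000: 12442}\nfor cols, secs in tsv_remote.items():\n    print(f'{cols:>4} cols: {secs:>6} s  ({secs / cols:.1f} s/col)')", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |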
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Let's see if CSV having the same issue" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": true, | |
| "read_only": true | |
| }, | |
| "scrolled": true, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=1000, delimiter=',')", | |
| "execution_count": 199, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7465732\nJob 7465732 done in 1263 seconds\n" | |
| } | |
| ] | |
| }, | |
| { | |
| "metadata": { | |
| "trusted": true, | |
| "run_control": { | |
| "read_only": true, | |
| "frozen": true | |
| }, | |
| "scrolled": true | |
| }, | |
| "cell_type": "code", | |
| "source": "run_remotely(rows=1_000_000, cols=2000, delimiter=',')", | |
| "execution_count": 202, | |
| "outputs": [ | |
| { | |
| "name": "stdout", | |
| "output_type": "stream", | |
| "text": "Started job 7466811\nJob 7466811 done in 2118 seconds\n" | |
| } | |
| ] | |
| }, | |
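| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Side by side, the remote 1M-row durations for the two delimiters (numbers copied from the outputs above):" | |
| }, | |
| { | |
| "metadata": { | |
| "run_control": { | |
| "frozen": false, | |
| "read_only": false | |
| }, | |
| "trusted": true | |
| }, | |
| "cell_type": "code", | |
| "source": "# Durations copied from the remote 1M-row runs above, in seconds.\nremote_1m = {('tsv', 1000): 11277, ('tsv', 2000): 12442,\n             ('csv', 1000): 1263, ('csv', 2000): 2118}\nfor (fmt, cols), secs in remote_1m.items():\n    print(f'{fmt} {cols:>4} cols: {format_duration(secs)} ({secs} s)')", | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |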
| { | |
| "metadata": { | |
| "run_control": { | |
| "read_only": false, | |
| "frozen": false | |
| } | |
| }, | |
| "cell_type": "markdown", | |
| "source": "Nope, it' not. So the problem is really with TSV (but only when running remotely)" | |
| } | |
| ], | |
| "metadata": { | |
| "_draft": { | |
| "nbviewer_url": "https://gist.github.com/d43f5e8e32b389a96d513bc64d505fde" | |
| }, | |
| "gist": { | |
| "id": "d43f5e8e32b389a96d513bc64d505fde", | |
| "data": { | |
| "description": "embulk_s3_input_tsv_perf", | |
| "public": true | |
| } | |
| }, | |
| "kernelspec": { | |
| "name": "python3", | |
| "display_name": "Python 3", | |
| "language": "python" | |
| }, | |
| "language_info": { | |
| "name": "python", | |
| "version": "3.6.5", | |
| "mimetype": "text/x-python", | |
| "codemirror_mode": { | |
| "name": "ipython", | |
| "version": 3 | |
| }, | |
| "pygments_lexer": "ipython3", | |
| "nbconvert_exporter": "python", | |
| "file_extension": ".py" | |
| }, | |
| "toc": { | |
| "toc_cell": false, | |
| "toc_number_sections": false, | |
| "toc_threshold": 4, | |
| "toc_window_display": false | |
| } | |
| }, | |
| "nbformat": 4, | |
| "nbformat_minor": 2 | |
| } |