@ayaksvals
Created September 23, 2024 17:33
Sorting pairs.gz files
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import duckdb"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"con = duckdb.connect(\":memory:\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<duckdb.duckdb.DuckDBPyConnection at 0x2b02a82909f0>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"con.execute(\"PRAGMA temp_directory='temp_dir';\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<duckdb.duckdb.DuckDBPyConnection at 0x2b02a82909f0>"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"con.execute(\"PRAGMA memory_limit='55GB';\")"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<duckdb.duckdb.DuckDBPyConnection at 0x2b02a82909f0>"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"con.execute(\"SET enable_progress_bar = true;\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 275 ms, sys: 43.1 ms, total: 318 ms\n",
"Wall time: 324 ms\n"
]
}
],
"source": [
"%%time\n",
"\n",
"input_file = 'large_csv.csv'\n",
"output_file = 'output.parquet'\n",
"\n",
"pairs = con.read_csv(\n",
" input_file, \n",
" sep=\"\\t\", \n",
" header=False, \n",
" parallel=True, \n",
" skiprows=10000,\n",
" names=[\"read_id\", \"chrom1\", \"pos1\", \"chrom2\", \"pos2\", \"strand1\", \"strand2\", \"pair_type\", \"mapq1\", \"mapq2\"]\n",
")\n",
"\n"
]
},
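{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check (a sketch, not part of the original run): peek at the\n",
"# first rows to confirm the assumed .pairs column layout before sorting.\n",
"print(pairs.columns)\n",
"pairs.limit(5).show()"
]
},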
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.75 ms, sys: 0 ns, total: 1.75 ms\n",
"Wall time: 1.78 ms\n"
]
}
],
"source": [
"%%time\n",
"rel = pairs.order(\"chrom1, chrom2, pos1, pos2\")"
]
},
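{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# order() only builds a lazy relation, which is why the cell above finishes in\n",
"# ~2 ms; nothing is read or sorted until the relation is materialized, e.g. by\n",
"# write_parquet below. Sketch: inspect the query plan without executing it.\n",
"print(rel.explain())"
]
},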
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "fdeae9be975f4976a0440b9ec26ce3f4",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 18min 28s, sys: 21min 11s, total: 39min 40s\n",
"Wall time: 6min 24s\n"
]
}
],
"source": [
"%%time\n",
"rel.write_parquet(\"test.pq\", row_group_size=100_000_000, compression=\"zstd\")"
]
},
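{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional check (sketch): look at the row-group layout and total row count of\n",
"# the file written above to confirm row_group_size and compression took effect.\n",
"con.sql(\"SELECT DISTINCT row_group_id, row_group_num_rows FROM parquet_metadata('test.pq') ORDER BY row_group_id\").show()\n",
"con.sql(\"SELECT count(*) FROM 'test.pq'\").show()"
]
},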
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# 64G out of memory\n",
"# 120 OOM\n",
"# with hive_partitioning\n",
"# pragma memory_limit=64GB\n",
"# SET memory_limit=64GB Kernel crashed\n",
"\n",
"#30min without any sort\n",
"\n",
"#4.30min \n",
"#3.20, 3.55\n",
"\n",
"# 8 cores: 30 sec small file\n",
"# 6 sec 1 line of giant\n",
"\n",
"\n",
"# 2.5 hours for 7.5G file without any restrictions"
]
},
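{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch consolidating the session settings used above: a temp\n",
"# directory lets the sort spill to disk, and a memory cap below the limits\n",
"# that OOMed (64/120 GB) keeps the kernel alive. threads=8 is an assumption\n",
"# taken from the '8 cores' note, not a value from the original run.\n",
"con.execute(\"SET temp_directory='temp_dir';\")\n",
"con.execute(\"SET memory_limit='55GB';\")\n",
"con.execute(\"SET threads=8;\")\n",
"con.execute(\"SET enable_progress_bar=true;\")"
]
},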
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# PREVIOUS VERSIONS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"input_file = 'giant.parquet'\n",
"output_file = 'giant_sorted_duckdb.parquet'\n",
"schema = \"\"\"\n",
" read_id: VARCHAR,\n",
" chrom1: VARCHAR,\n",
" pos1: BIGINT,\n",
" chrom2: VARCHAR,\n",
" pos2: BIGINT,\n",
" strand1: VARCHAR,\n",
" strand2: VARCHAR,\n",
" pairs_type: VARCHAR\n",
"\"\"\"\n",
"con.execute(f\"\"\"\n",
" COPY (\n",
" SELECT *\n",
" FROM read_parquet('{input_file}')\n",
" ORDER BY chrom1, chrom2, pos1, pos2\n",
" \n",
" ) TO '{output_file}' (FORMAT PARQUET);\n",
"\"\"\")\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"input_file = 'giant.parquet'\n",
"output_file = 'giant_sorted_duckdb.parquet'\n",
"schema = \"\"\"\n",
" read_id: VARCHAR,\n",
" chrom1: VARCHAR,\n",
" pos1: BIGINT,\n",
" chrom2: VARCHAR,\n",
" pos2: BIGINT,\n",
" strand1: VARCHAR,\n",
" strand2: VARCHAR,\n",
" pairs_type: VARCHAR\n",
"\"\"\"\n",
"con.execute(f\"\"\"\n",
" COPY (\n",
" SELECT CAST(read_id AS VARCHAR), CAST(chrom1 AS VARCHAR), \n",
" CAST(pos1 AS BIGINT), CAST(chrom2 AS VARCHAR), \n",
" CAST(pos2 AS BIGINT), CAST(strand1 AS VARCHAR),\n",
" CAST(strand2 AS VARCHAR), CAST(pairs_type AS VARCHAR)\n",
" FROM read_parquet('{input_file}')\n",
" WHERE read_id NOT LIKE '#%' -- !!!!!!!!!!!!!! Filter out lines that start with '#samheader'\n",
" ORDER BY chrom1, chrom2, pos1, pos2\n",
" ) TO '{output_file}' (FORMAT PARQUET);\n",
"\"\"\")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"input_file = 'giant.csv'\n",
"output_file = 'giant.parquet'\n",
"schema={\n",
" 'read_id': 'VARCHAR',\n",
" 'chrom1': 'VARCHAR',\n",
" 'pos1': 'INTEGER',\n",
" 'chrom2': 'VARCHAR',\n",
" 'pos2': 'INTEGER',\n",
" 'strand1': 'VARCHAR',\n",
" 'strand2': 'VARCHAR',\n",
" 'pairs_type': 'VARCHAR'\n",
"}\n",
"\n",
"con.execute(f\"\"\"\n",
" COPY (\n",
" SELECT *\n",
" FROM read_csv('{input_file}', delim='\\t', columns = {schema}, header=true, auto_detect=false, skip=1000)\n",
" ORDER BY chrom1, chrom2, pos1, pos2\n",
" \n",
" ) TO '{output_file}' (FORMAT PARQUET);\n",
"\"\"\")\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "main",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}