Sorting pairs.gz files
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"import tempfile\n",
"import pyarrow.parquet as pq\n",
"\n",
"import concurrent.futures\n",
"from functools import partial\n",
"from concurrent.futures import as_completed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\" Timings for a 1.4G compressed file \"\"\"\n",
"\n",
"\" Pairtools \"\n",
"# real 7m31.938s\n",
"# user 19m48.562s\n",
"# sys 13m21.282s\n",
"\n",
"\" Pandas merge sort \"\n",
"# 15 min just to chunk data and sort blocks\n",
"# 85 min to merge files\n",
"# 99 min total\n",
"\n",
"\" Parquet merge sort \"\n",
"# 10 min to chunk data + sort blocks\n",
"# 5 min to merge\n",
"# 15 min total\n",
"\n",
"\" Parquet merge sort, parallel \"\n",
"# 5 min to chunk data + sort blocks\n",
"# 5 min to merge\n",
"# 10 min total\n",
"\n",
"\"\""
]
},
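{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch (not from the original runs) of how the wall-clock numbers above can be collected in Python: wrap each sorting routine in a small timer. `external_sort` is defined later in this notebook; any other callable can be timed the same way."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"def timed(label, fn, *args, **kwargs):\n",
"    # Run fn once and report its elapsed wall-clock time in minutes\n",
"    start = time.perf_counter()\n",
"    result = fn(*args, **kwargs)\n",
"    elapsed = time.perf_counter() - start\n",
"    print(f\"{label}: {elapsed / 60:.1f} min\")\n",
"    return result\n",
"\n",
"# Hypothetical usage once external_sort (defined below) is available:\n",
"# timed('Parquet merge sort', external_sort,\n",
"#       'large_input.csv', 'sorted_output.csv',\n",
"#       sort_columns=['chr1', 'chr2', 'pos1', 'pos2'])"
]
},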
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>read_id</th>\n",
" <th>chr1</th>\n",
" <th>pos1</th>\n",
" <th>chr2</th>\n",
" <th>pos2</th>\n",
" <th>strand1</th>\n",
" <th>strand2</th>\n",
" <th>pairs_type</th>\n",
" <th>score1</th>\n",
" <th>score2</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003256</td>\n",
" <td>chr1</td>\n",
" <td>3005828</td>\n",
" <td>-</td>\n",
" <td>+</td>\n",
" <td>DD</td>\n",
" <td>57</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003264</td>\n",
" <td>chr1</td>\n",
" <td>3003342</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003279</td>\n",
" <td>chr1</td>\n",
" <td>88256867</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" read_id chr1 pos1 chr2 pos2 strand1 strand2 pairs_type score1 \\\n",
"0 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"1 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"2 . chr1 3003256 chr1 3005828 - + DD 57 \n",
"3 . chr1 3003264 chr1 3003342 - - DD 60 \n",
"4 . chr1 3003279 chr1 88256867 - - DD 60 \n",
"\n",
" score2 \n",
"0 60 \n",
"1 60 \n",
"2 60 \n",
"3 60 \n",
"4 60 "
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"file_path='large_input.csv'\n",
"df = pd.read_csv(file_path, compression='gzip', nrows=5, skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# PARQUET PARALLEL VERSION"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def process_block(block, block_idx, temp_dir, merge_chunk_size, sort_columns):\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" \n",
" # Write to temp file as Parquet\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.parquet\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" \n",
" print(f\" Processed block {block_idx}\")\n",
" return temp_file # Return the temp file name for tracking\n"
]
},
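{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick sanity check for `process_block` on a tiny in-memory DataFrame (the toy values and the temporary directory below are illustrative only, not taken from the real data):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"toy = pd.DataFrame({\n",
"    'chrom1': ['chr2', 'chr1', 'chr1'],\n",
"    'pos1': [100, 500, 200],\n",
"    'chrom2': ['chr2', 'chr1', 'chr3'],\n",
"    'pos2': [900, 400, 300],\n",
"})\n",
"\n",
"with tempfile.TemporaryDirectory() as tmp:\n",
"    # Sort the toy block, write it as one small Parquet file, then read it back\n",
"    path = process_block(toy, block_idx=0, temp_dir=tmp,\n",
"                         merge_chunk_size=2,\n",
"                         sort_columns=['chrom1', 'pos1', 'chrom2', 'pos2'])\n",
"    print(pd.read_parquet(path))"
]
},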
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs=4):\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" \n",
" \n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" \n",
" \n",
"\n",
"\n",
" # engine Pyarrow add \n",
" parquet_file = pq.ParquetFile(input_file)\n",
" temp_files = []\n",
" # Use a ProcessPoolExecutor to parallelize the processing\n",
" with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as executor:\n",
" futures = []\n",
" block_idx = 0\n",
" for row_group_idx in range(parquet_file.num_row_groups):\n",
" # Read a row group as a Pandas DataFrame\n",
" block = parquet_file.read_row_group(row_group_idx).to_pandas()\n",
" \n",
" # Submit the task to process and sort the row group\n",
" futures.append(executor.submit(process_block, block, block_idx, temp_dir, merge_chunk_size, sort_columns))\n",
" print(f\"Added block: {block_idx}\")\n",
" block_idx += 1\n",
" # Collect the results as they complete\n",
" \n",
" for futureTemp in as_completed(futures):\n",
" temp_files.append(futureTemp.result()) # Append the temp file name\n",
" \n",
" return temp_files"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
"Added block: 0\n",
"Added block: 1\n",
"Added block: 2\n",
" Processed block 2\n",
" Processed block 0\n",
" Processed block 1\n"
]
}
],
"source": [
"input_file = '/users/slavska.olesia/test.pq'\n",
"temp_dir = '/users/slavska.olesia/scratch/slavska.olesia'\n",
"output_file='/users/slavska.olesia/projects/lesia/parquet_sorted.pq'\n",
"input_chunk_size = 100_000\n",
"merge_chunk_size = input_chunk_size // 10\n",
"sort_columns = [\"chrom1\", \"pos1\", \"chrom2\", \"pos2\"] # Columns to sort by\n",
"n_jobs = 4 \n",
"\n",
"\n",
"# Run the parallelized processing\n",
"temp_files = process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs)\n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Step 2: Merging sorted chunks...\n"
]
}
],
"source": [
"print(\"Step 2: Merging sorted chunks...\")\n",
"# Initialize readers for each temp file\n",
"# adding the iterator itself to the blocks list, not the actual data\n",
"blocks = []\n",
"\n",
"for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"# Initialize current chunks (the current data from each temp file)\n",
"current_chunks = []\n",
"active_files = len(blocks)\n",
"\n",
"\n",
"for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
"carryover_data = pd.DataFrame()\n",
"\n",
"max_row_groups = max([block.num_row_groups for block in blocks]) \n"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"output_file='/users/slavska.olesia/projects/lesia/parquet_sorted.pq'"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"File opened\n",
"More files to come 3\n",
"Last in chunk id: Index([9999, 39998, 69998], dtype='int64')\n",
"Output in file: read_id chrom1 pos1 chrom2 pos2 strand1 strand2 \\\n",
"0 . chr1 3161595 chr1 3162007 + - \n",
"1 . chr1 3161595 chr1 3162007 + - \n",
"2 . chr1 3161632 chr1 3163821 + - \n",
"3 . chr1 3161632 chr1 3163821 + - \n",
"4 . chr1 3161685 chr11 33117922 + - \n",
"... ... ... ... ... ... ... ... \n",
"9995 . chr1 3312830 chr1 3314044 + + \n",
"9996 . chr1 3312849 chr1 3400069 - + \n",
"9997 . chr1 3312849 chrUn_GL456239 36362 - + \n",
"9998 . chr1 3312852 chr1 3314771 + + \n",
"9999 . chr1 3312852 chr1 3314771 + + \n",
"\n",
" pair_type mapq1 mapq2 last_in_chunk \n",
"0 DD 60 60 False \n",
"1 DD 60 60 False \n",
"2 DD 60 60 False \n",
"3 DD 60 60 False \n",
"4 DD 51 60 False \n",
"... ... ... ... ... \n",
"9995 DD 60 60 False \n",
"9996 DD 60 60 False \n",
"9997 DD 60 60 False \n",
"9998 DD 60 60 False \n",
"9999 DD 60 60 True \n",
"\n",
"[10000 rows x 11 columns]\n",
"Continue to work with: read_id chrom1 pos1 chrom2 pos2 strand1 \\\n",
"10000 . chr18 3000381 chr19 7178815 - \n",
"10001 . chr18 3000495 chr19 58113495 + \n",
"10002 . chr18 3000526 chr4_GL456216_random 49315 - \n",
"10003 . chr18 3000644 chrUn_JH584304 12857 - \n",
"10004 . chr18 3000848 chrUn_GL456387 1191 - \n",
"... ... ... ... ... ... ... \n",
"69995 . chr9 45116440 chr9 45116868 - \n",
"69996 . chr9 45116441 chr9 45116689 + \n",
"69997 . chr9 45116441 chr9 45116689 + \n",
"69998 . chr9 45116441 chr9 45119174 + \n",
"69999 . chr9 45116441 chr9 45119174 + \n",
"\n",
" strand2 pair_type mapq1 mapq2 last_in_chunk \n",
"10000 - DD 33 60 NaN \n",
"10001 + DD 7 15 NaN \n",
"10002 - DD 12 10 NaN \n",
"10003 - DD 9 31 NaN \n",
"10004 + DD 26 22 NaN \n",
"... ... ... ... ... ... \n",
"69995 + DD 60 60 NaN \n",
"69996 - DD 60 60 False \n",
"69997 - DD 60 60 NaN \n",
"69998 + DD 60 60 True \n",
"69999 + DD 60 60 NaN \n",
"\n",
"[60000 rows x 11 columns]\n",
"First Output\n",
"There are more chunks to process: 0\n",
"Before: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 pair_type \\\n",
"0 . chr9 25494731 chrX 71657825 + - DD \n",
"1 . chr9 25496082 chrX 9832149 + + DD \n",
"2 . chr9 25501055 chrX 23873659 + - DD \n",
"3 . chr9 25501055 chrX 23873659 + - DD \n",
"4 . chr9 25504469 chrX 97272382 + + DD \n",
"... ... ... ... ... ... ... ... ... \n",
"9995 . chr9 45116427 chr9 45124818 - + DD \n",
"9996 . chr9 45116427 chr9 45136760 - + DD \n",
"9997 . chr9 45116440 chr9 45116868 - + DD \n",
"9998 . chr9 45116441 chr9 45116689 + - DD \n",
"9999 . chr9 45116441 chr9 45119174 + + DD \n",
"\n",
" mapq1 mapq2 last_in_chunk \n",
"0 60 60 False \n",
"1 60 60 False \n",
"2 60 60 False \n",
"3 60 60 False \n",
"4 60 60 False \n",
"... ... ... ... \n",
"9995 60 60 False \n",
"9996 60 60 False \n",
"9997 60 60 False \n",
"9998 60 60 False \n",
"9999 60 60 True \n",
"\n",
"[10000 rows x 11 columns]\n",
"After: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 pair_type \\\n",
"0 . chr9 25494731 chrX 71657825 + - DD \n",
"1 . chr9 25496082 chrX 9832149 + + DD \n",
"2 . chr9 25501055 chrX 23873659 + - DD \n",
"3 . chr9 25501055 chrX 23873659 + - DD \n",
"4 . chr9 25504469 chrX 97272382 + + DD \n",
"... ... ... ... ... ... ... ... ... \n",
"9995 . chr9 45116427 chr9 45124818 - + DD \n",
"9996 . chr9 45116427 chr9 45136760 - + DD \n",
"9997 . chr9 45116440 chr9 45116868 - + DD \n",
"9998 . chr9 45116441 chr9 45116689 + - DD \n",
"9999 . chr9 45116441 chr9 45119174 + + DD \n",
"\n",
" mapq1 mapq2 \n",
"0 60 60 \n",
"1 60 60 \n",
"2 60 60 \n",
"3 60 60 \n",
"4 60 60 \n",
"... ... ... \n",
"9995 60 60 \n",
"9996 60 60 \n",
"9997 60 60 \n",
"9998 60 60 \n",
"9999 60 60 \n",
"\n",
"[10000 rows x 10 columns]\n",
"There are more chunks to process: 1\n",
"Before: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 \\\n",
"0 . chr1 3161595 chr1 3162007 + - \n",
"1 . chr1 3161595 chr1 3162007 + - \n",
"2 . chr1 3161632 chr1 3163821 + - \n",
"3 . chr1 3161632 chr1 3163821 + - \n",
"4 . chr1 3161685 chr11 33117922 + - \n",
"... ... ... ... ... ... ... ... \n",
"9995 . chr1 3312830 chr1 3314044 + + \n",
"9996 . chr1 3312849 chr1 3400069 - + \n",
"9997 . chr1 3312849 chrUn_GL456239 36362 - + \n",
"9998 . chr1 3312852 chr1 3314771 + + \n",
"9999 . chr1 3312852 chr1 3314771 + + \n",
"\n",
" pair_type mapq1 mapq2 last_in_chunk \n",
"0 DD 60 60 False \n",
"1 DD 60 60 False \n",
"2 DD 60 60 False \n",
"3 DD 60 60 False \n",
"4 DD 51 60 False \n",
"... ... ... ... ... \n",
"9995 DD 60 60 False \n",
"9996 DD 60 60 False \n",
"9997 DD 60 60 False \n",
"9998 DD 60 60 False \n",
"9999 DD 60 60 True \n",
"\n",
"[10000 rows x 11 columns]\n",
"After: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 \\\n",
"0 . chr1 3161595 chr1 3162007 + - \n",
"1 . chr1 3161595 chr1 3162007 + - \n",
"2 . chr1 3161632 chr1 3163821 + - \n",
"3 . chr1 3161632 chr1 3163821 + - \n",
"4 . chr1 3161685 chr11 33117922 + - \n",
"... ... ... ... ... ... ... ... \n",
"9995 . chr1 3312830 chr1 3314044 + + \n",
"9996 . chr1 3312849 chr1 3400069 - + \n",
"9997 . chr1 3312849 chrUn_GL456239 36362 - + \n",
"9998 . chr1 3312852 chr1 3314771 + + \n",
"9999 . chr1 3312852 chr1 3314771 + + \n",
"\n",
" pair_type mapq1 mapq2 \n",
"0 DD 60 60 \n",
"1 DD 60 60 \n",
"2 DD 60 60 \n",
"3 DD 60 60 \n",
"4 DD 51 60 \n",
"... ... ... ... \n",
"9995 DD 60 60 \n",
"9996 DD 60 60 \n",
"9997 DD 60 60 \n",
"9998 DD 60 60 \n",
"9999 DD 60 60 \n",
"\n",
"[10000 rows x 10 columns]\n",
"There are more chunks to process: 2\n",
"Before: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 \\\n",
"0 . chr18 12949247 chr19 61199832 + - \n",
"1 . chr18 12949247 chr19 61267134 + - \n",
"2 . chr18 12949247 chr19 61267150 + - \n",
"3 . chr18 12949247 chr19 61267152 + - \n",
"4 . chr18 12949247 chr19 61267152 + - \n",
"... ... ... ... ... ... ... ... \n",
"9995 . chr18 25819825 chrX 137630010 - - \n",
"9996 . chr18 25821477 chrUn_JH584304 23374 + + \n",
"9997 . chr18 25821493 chr19 3587273 - - \n",
"9998 . chr18 25822042 chr19 28054589 + - \n",
"9999 . chr18 25822878 chr19 29948244 - - \n",
"\n",
" pair_type mapq1 mapq2 last_in_chunk \n",
"0 DD 36 19 False \n",
"1 DD 18 54 False \n",
"2 DD 1 21 False \n",
"3 DD 24 60 False \n",
"4 DD 30 4 False \n",
"... ... ... ... ... \n",
"9995 DD 60 60 False \n",
"9996 DD 60 10 False \n",
"9997 DD 60 60 False \n",
"9998 DD 60 60 False \n",
"9999 DD 60 60 True \n",
"\n",
"[10000 rows x 11 columns]\n",
"After: \n",
" read_id chrom1 pos1 chrom2 pos2 strand1 strand2 \\\n",
"0 . chr18 12949247 chr19 61199832 + - \n",
"1 . chr18 12949247 chr19 61267134 + - \n",
"2 . chr18 12949247 chr19 61267150 + - \n",
"3 . chr18 12949247 chr19 61267152 + - \n",
"4 . chr18 12949247 chr19 61267152 + - \n",
"... ... ... ... ... ... ... ... \n",
"9995 . chr18 25819825 chrX 137630010 - - \n",
"9996 . chr18 25821477 chrUn_JH584304 23374 + + \n",
"9997 . chr18 25821493 chr19 3587273 - - \n",
"9998 . chr18 25822042 chr19 28054589 + - \n",
"9999 . chr18 25822878 chr19 29948244 - - \n",
"\n",
" pair_type mapq1 mapq2 \n",
"0 DD 36 19 \n",
"1 DD 18 54 \n",
"2 DD 1 21 \n",
"3 DD 24 60 \n",
"4 DD 30 4 \n",
"... ... ... ... \n",
"9995 DD 60 60 \n",
"9996 DD 60 10 \n",
"9997 DD 60 60 \n",
"9998 DD 60 60 \n",
"9999 DD 60 60 \n",
"\n",
"[10000 rows x 10 columns]\n"
]
}
],
"source": [
"with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if (not current_chunks[i].empty) and (row_group_index < blocks[i].num_row_groups):\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n"
]
},
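{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check (a sketch, not part of the original run): stream the merged output back in chunks and verify that the sort keys never decrease, within chunks or across chunk boundaries. Note that `output_file` is written as CSV above even though its name ends in `.pq`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def check_sorted(path, sort_columns, chunksize=100_000):\n",
"    # Return True if the file is globally sorted on sort_columns\n",
"    prev_tail = None\n",
"    for chunk in pd.read_csv(path, chunksize=chunksize):\n",
"        keys = chunk[sort_columns].reset_index(drop=True)\n",
"        # Within-chunk check: re-sorting must not change the order\n",
"        if not keys.equals(keys.sort_values(by=sort_columns).reset_index(drop=True)):\n",
"            return False\n",
"        # Cross-chunk check: last row of the previous chunk must not exceed the first row here\n",
"        if prev_tail is not None:\n",
"            pair = pd.concat([prev_tail, keys.head(1)], ignore_index=True)\n",
"            if not pair.equals(pair.sort_values(by=sort_columns).reset_index(drop=True)):\n",
"                return False\n",
"        prev_tail = keys.tail(1)\n",
"    return True\n",
"\n",
"# check_sorted(output_file, sort_columns)"
]
},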
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
" Processed chunk 1\n",
" Processed chunk 2\n",
" Processed chunk 3\n",
" Processed chunk 4\n",
" Processed chunk 5\n",
" Processed chunk 6\n",
" Processed chunk 7\n",
" Processed chunk 8\n",
" Processed chunk 9\n",
" Processed chunk 10\n",
" Processed chunk 11\n",
" Processed chunk 12\n",
" Processed chunk 13\n",
" Processed chunk 14\n",
" Processed chunk 15\n",
" Processed chunk 16\n",
" Processed chunk 17\n",
" Processed chunk 18\n",
" Processed chunk 19\n",
" Processed chunk 20\n",
" Processed chunk 21\n",
" Processed chunk 22\n"
]
}
],
"source": [
"# Non-parallel CSV-input version of the first step; use as an alternative\n",
"\n",
"# Read csv, divide into blocks, sort and save to parquet\n",
"\n",
"temp_dir = 'temp_dir/'\n",
"print(f\"Temporary directory created at: {temp_dir}\")\n",
"temp_files = []\n",
"\n",
"# Step 1: Initial Chunking and Sorting\n",
"print(\"Step 1: Chunking and initial sorting...\")\n",
"reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"block_idx = 0\n",
"\n",
"merge_chunk_size = input_chunk_size // 10\n",
"\n",
"for block in reader:\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.csv\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" block_idx += 1\n",
" print(f\" Processed block {block_idx}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parquet temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" temp_dir = ''\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size = input_chunk_size // 10\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" # Initialize readers for each temp file\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
"\n",
"\n",
" for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" max_row_groups = max([block.num_row_groups for block in blocks]) \n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
"\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunk[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['chr1', 'chr2', 'pos1', 'pos2'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# CSV temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" #temp_dir = tempfile.mkdtemp()\n",
" temp_dir = '/users/slavska.olesia/scratch/slavska.olesia'\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size=input_chunk_size // chunk_idx\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_csv(temp_file, index=False)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" \n",
" \n",
" \n",
" # Initialize readers for each temp file\n",
"\n",
"\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pd.read_csv(temp_file) #returns iterator\n",
" blocks.append(block)\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
" \n",
" for reader in blocks:\n",
" try:\n",
" current_chunk = next(reader)\n",
" print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" while active_files > 0:\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
"\n",
" \n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = next(blocks[i])\n",
" \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)\n",
"\n",
"# Example usage:\n",
"# external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['column_to_sort_by'])"
]
},
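{
"cell_type": "markdown",
"metadata": {},
"source": [
"The readers in this notebook expect a gzip-compressed, tab-separated pairs file with roughly 500 header lines (hence `skiprows=500`). A sketch, with made-up coordinates, of generating such a test input; the file name `test_input.pairs.gz` is only an example:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import gzip\n",
"\n",
"num_rows = 100_000\n",
"rng = np.random.default_rng(0)\n",
"chroms = np.array(['chr1', 'chr2', 'chr3', 'chrX'])\n",
"pairs = pd.DataFrame({\n",
"    'read_id': '.',\n",
"    'chr1': rng.choice(chroms, num_rows),\n",
"    'pos1': rng.integers(1, 100_000_000, num_rows),\n",
"    'chr2': rng.choice(chroms, num_rows),\n",
"    'pos2': rng.integers(1, 100_000_000, num_rows),\n",
"    'strand1': rng.choice(['+', '-'], num_rows),\n",
"    'strand2': rng.choice(['+', '-'], num_rows),\n",
"    'pairs_type': 'DD',\n",
"    'score1': rng.integers(0, 61, num_rows),\n",
"    'score2': rng.integers(0, 61, num_rows),\n",
"})\n",
"\n",
"with gzip.open('test_input.pairs.gz', 'wt') as f:\n",
"    # 500 dummy header lines so that skiprows=500 in the readers above still applies\n",
"    for i in range(500):\n",
"        f.write(f'## header line {i}\\n')\n",
"    pairs.to_csv(f, sep='\\t', index=False, header=False)"
]
},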
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Generate sample data\n",
"\n",
"num_rows = 1000000 # Adjust the size as needed for testing\n",
"data = {\n",
" 'id': np.arange(num_rows),\n",
" 'value': np.random.randint(0, 100000, size=num_rows)\n",
"}\n",
"df = pd.DataFrame(data)\n",
"df.to_csv('large_input.csv', index=False)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}