Sorting pairs.gz files
@ayaksvals - Created September 23, 2024 17:33
{
"cells": [
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"import os\n",
"import tempfile\n",
"import pyarrow.parquet as pq\n",
"\n",
"import concurrent.futures\n",
"from functools import partial\n",
"from concurrent.futures import as_completed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Timings for sorting a 1.4G compressed pairs file\n",
"\n",
"# Pairtools sort:\n",
"# real 7m31.938s\n",
"# user 19m48.562s\n",
"# sys 13m21.282s\n",
"\n",
"# Pandas merge sort (CSV temp files):\n",
"# 15 min to chunk the data and sort the blocks\n",
"# 85 min to merge the files\n",
"# ~99 min total\n",
"\n",
"# Parquet merge sort:\n",
"# 10 min to chunk the data and sort the blocks\n",
"# 5 min to merge\n",
"# ~15 min total\n",
"\n",
"# Parquet merge sort, parallel chunking:\n",
"# 5 min to chunk the data and sort the blocks\n",
"# 5 min to merge\n",
"# ~10 min total"
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>read_id</th>\n",
" <th>chr1</th>\n",
" <th>pos1</th>\n",
" <th>chr2</th>\n",
" <th>pos2</th>\n",
" <th>strand1</th>\n",
" <th>strand2</th>\n",
" <th>pairs_type</th>\n",
" <th>score1</th>\n",
" <th>score2</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003235</td>\n",
" <td>chr1</td>\n",
" <td>3003446</td>\n",
" <td>+</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003256</td>\n",
" <td>chr1</td>\n",
" <td>3005828</td>\n",
" <td>-</td>\n",
" <td>+</td>\n",
" <td>DD</td>\n",
" <td>57</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003264</td>\n",
" <td>chr1</td>\n",
" <td>3003342</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>.</td>\n",
" <td>chr1</td>\n",
" <td>3003279</td>\n",
" <td>chr1</td>\n",
" <td>88256867</td>\n",
" <td>-</td>\n",
" <td>-</td>\n",
" <td>DD</td>\n",
" <td>60</td>\n",
" <td>60</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" read_id chr1 pos1 chr2 pos2 strand1 strand2 pairs_type score1 \\\n",
"0 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"1 . chr1 3003235 chr1 3003446 + - DD 60 \n",
"2 . chr1 3003256 chr1 3005828 - + DD 57 \n",
"3 . chr1 3003264 chr1 3003342 - - DD 60 \n",
"4 . chr1 3003279 chr1 88256867 - - DD 60 \n",
"\n",
" score2 \n",
"0 60 \n",
"1 60 \n",
"2 60 \n",
"3 60 \n",
"4 60 "
]
},
"execution_count": 51,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"file_path='large_input.csv'\n",
"df = pd.read_csv(file_path, compression='gzip', nrows=5, skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"df"
]
},
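{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (not part of the original workflow): rather than hard-coding skiprows=500,\n",
"# the header length of a .pairs.gz file can be counted first. This assumes the usual\n",
"# pairs convention that header lines start with '#'.\n",
"import gzip\n",
"\n",
"def count_pairs_header_lines(path):\n",
"    \"\"\"Return the number of leading '#' header lines in a gzipped pairs file.\"\"\"\n",
"    n = 0\n",
"    with gzip.open(path, 'rt') as f:\n",
"        for line in f:\n",
"            if line.startswith('#'):\n",
"                n += 1\n",
"            else:\n",
"                break\n",
"    return n\n",
"\n",
"# skiprows = count_pairs_header_lines(file_path)"
]
},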
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# PARQUET PARALLEL VERSION"
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [],
"source": [
"def process_block(block, block_idx, temp_dir, merge_chunk_size, sort_columns):\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" \n",
" # Write to temp file as Parquet\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.parquet\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" \n",
" print(f\" Processed block {block_idx}\")\n",
" return temp_file # Return the temp file name for tracking\n"
]
},
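{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (illustrative only): the row_group_size passed to to_parquet() above is what the\n",
"# merge step relies on -- each sorted block is stored as row groups of roughly\n",
"# merge_chunk_size rows that can be read back one group at a time with pyarrow.\n",
"# The path below is a hypothetical block written by process_block().\n",
"example_block = os.path.join('temp_dir', 'block_0.parquet')\n",
"if os.path.exists(example_block):\n",
"    pf = pq.ParquetFile(example_block)\n",
"    print(f\"row groups: {pf.num_row_groups}, rows in first group: {pf.metadata.row_group(0).num_rows}\")"
]
},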
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [],
"source": [
"def process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs=4):\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" \n",
" \n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" \n",
"\n",
" # engine Pyarrow add \n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, \n",
" delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \n",
" \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], \n",
" header=None)\n",
" temp_files = []\n",
" # Use a ProcessPoolExecutor to parallelize the processing\n",
" with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as executor:\n",
" futures = []\n",
" block_idx = 0\n",
" \n",
" for block in reader:\n",
" # Submit the task to the executor\n",
" futures.append(executor.submit(process_block, block, block_idx, temp_dir, merge_chunk_size, sort_columns))\n",
" print(f\"Added block: {block_idx}\")\n",
" block_idx += 1\n",
" \n",
" # Collect the results as they complete\n",
" \n",
" for futureTemp in as_completed(futures):\n",
" temp_files.append(futureTemp.result()) # Append the temp file name\n",
" \n",
" return temp_files"
]
},
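{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (a possible variant of process_in_parallel above, not the one called in the next\n",
"# cell): submitting every chunk before collecting any results keeps all pending DataFrames\n",
"# alive inside the futures. Capping the number of in-flight futures at roughly n_jobs\n",
"# bounds memory while keeping the workers busy.\n",
"def process_in_parallel_bounded(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs=4):\n",
"    reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500,\n",
"                         delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\",\n",
"                                                \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"],\n",
"                         header=None)\n",
"    temp_files = []\n",
"    with concurrent.futures.ProcessPoolExecutor(max_workers=n_jobs) as executor:\n",
"        in_flight = set()\n",
"        for block_idx, block in enumerate(reader):\n",
"            in_flight.add(executor.submit(process_block, block, block_idx, temp_dir, merge_chunk_size, sort_columns))\n",
"            if len(in_flight) >= n_jobs:\n",
"                # Wait for at least one worker to finish before reading the next chunk\n",
"                done, in_flight = concurrent.futures.wait(in_flight, return_when=concurrent.futures.FIRST_COMPLETED)\n",
"                temp_files.extend(f.result() for f in done)\n",
"        for f in as_completed(in_flight):\n",
"            temp_files.append(f.result())\n",
"    return temp_files"
]
},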
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
"Added block: 0\n",
"Added block: 1\n",
" Processed block 0\n",
"Added block: 2\n",
" Processed block 1\n",
"Added block: 3\n",
" Processed block 2\n",
"Added block: 4\n",
" Processed block 3\n",
"Added block: 5\n",
" Processed block 4\n",
"Added block: 6\n",
" Processed block 5\n",
"Added block: 7\n",
" Processed block 6\n",
"Added block: 8\n",
" Processed block 7\n",
"Added block: 9\n",
" Processed block 8\n",
"Added block: 10\n",
" Processed block 9\n",
"Added block: 11\n",
" Processed block 10\n",
"Added block: 12\n",
" Processed block 11\n",
"Added block: 13\n",
" Processed block 12\n",
"Added block: 14\n",
" Processed block 13\n",
"Added block: 15\n",
" Processed block 14\n",
"Added block: 16\n",
" Processed block 15\n",
"Added block: 17\n",
" Processed block 16\n",
"Added block: 18\n",
" Processed block 17\n",
"Added block: 19\n",
" Processed block 18\n",
"Added block: 20\n",
"Added block: 21\n",
" Processed block 21\n",
" Processed block 19\n",
" Processed block 20\n"
]
}
],
"source": [
"input_file = 'large_input.csv'\n",
"temp_dir = 'temp_dir/'\n",
"input_chunk_size = 10000000 \n",
"merge_chunk_size = input_chunk_size // 10\n",
"sort_columns = [\"chr1\", \"pos1\", \"chr2\", \"pos2\"] # Columns to sort by\n",
"n_jobs = 4 \n",
"\n",
"\n",
"# Run the parallelized processing\n",
"temp_files = process_in_parallel(input_file, temp_dir, input_chunk_size, merge_chunk_size, sort_columns, n_jobs)\n"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Temporary directory created at: /users/slavska.olesia/scratch/slavska.olesia\n",
"Step 1: Chunking and initial sorting...\n",
" Processed chunk 1\n",
" Processed chunk 2\n",
" Processed chunk 3\n",
" Processed chunk 4\n",
" Processed chunk 5\n",
" Processed chunk 6\n",
" Processed chunk 7\n",
" Processed chunk 8\n",
" Processed chunk 9\n",
" Processed chunk 10\n",
" Processed chunk 11\n",
" Processed chunk 12\n",
" Processed chunk 13\n",
" Processed chunk 14\n",
" Processed chunk 15\n",
" Processed chunk 16\n",
" Processed chunk 17\n",
" Processed chunk 18\n",
" Processed chunk 19\n",
" Processed chunk 20\n",
" Processed chunk 21\n",
" Processed chunk 22\n"
]
}
],
"source": [
"# Not a parallel Version of first step, use as alternative\n",
"\n",
"# Read csv, divide into blocks, sort and save to parquet\n",
"\n",
"temp_dir = 'temp_dir/'\n",
"print(f\"Temporary directory created at: {temp_dir}\")\n",
"temp_files = []\n",
"\n",
"# Step 1: Initial Chunking and Sorting\n",
"print(\"Step 1: Chunking and initial sorting...\")\n",
"reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
"block_idx = 0\n",
"\n",
"merge_chunk_size = input_chunk_size // 10\n",
"\n",
"for block in reader:\n",
" # Sort the block\n",
" block_sorted = block.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"block_{block_idx}.csv\")\n",
" block_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" block_idx += 1\n",
" print(f\" Processed block {block_idx}\")"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Step 2: Merging sorted chunks...\n"
]
}
],
"source": [
"print(\"Step 2: Merging sorted chunks...\")\n",
"# Initialize readers for each temp file\n",
"# adding the iterator itself to the blocks list, not the actual data\n",
"blocks = []\n",
"\n",
"for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"# Initialize current chunks (the current data from each temp file)\n",
"current_chunks = []\n",
"active_files = len(blocks)\n",
"\n",
"\n",
"for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
"carryover_data = pd.DataFrame()\n",
"\n",
"max_row_groups = max([block.num_row_groups for block in blocks]) \n"
]
},
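{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Toy example (not part of the pipeline): why the merge below only emits rows up to the\n",
"# first 'last_in_chunk' marker. Each loaded row group is the smallest unread data of its\n",
"# file, so any row that sorts at or before the smallest \"end of a loaded group\" can no\n",
"# longer be undercut by rows still on disk and is safe to write out.\n",
"a = pd.DataFrame({'pos1': [1, 4, 9]})   # loaded group from file A (larger rows remain on disk)\n",
"b = pd.DataFrame({'pos1': [2, 3, 20]})  # loaded group from file B\n",
"for df_ in (a, b):\n",
"    df_['last_in_chunk'] = False\n",
"    df_.iloc[-1, df_.columns.get_loc('last_in_chunk')] = True\n",
"merged = pd.concat([a, b]).sort_values('pos1').reset_index(drop=True)\n",
"boundary = merged.index[merged['last_in_chunk']][0] + 1\n",
"print(merged.iloc[:boundary])  # safe to emit now: 1, 2, 3, 4, 9\n",
"print(merged.iloc[boundary:])  # 20 must wait for the next row group of file A"
]
},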
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if (not current_chunks[i].empty) and (row_group_index < blocks[i].num_row_groups):\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
"# Clean up temporary files\n",
"print(\"Cleaning up temporary files...\")\n",
"for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
"#os.rmdir(temp_dir)"
]
},
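{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Alternative sketch (not the approach used above): a classic k-way merge with heapq.merge\n",
"# over per-file row iterators. Each sorted temp Parquet file is streamed one row group at a\n",
"# time, so memory stays bounded; this avoids the repeated pandas sorts of the loop above at\n",
"# the cost of handling rows one by one in Python, which is usually slower.\n",
"import heapq\n",
"\n",
"def iter_sorted_rows(parquet_path, sort_columns):\n",
"    \"\"\"Yield (sort_key, row) pairs from a temp Parquet file that was written in sorted order.\"\"\"\n",
"    pf = pq.ParquetFile(parquet_path)\n",
"    for rg in range(pf.num_row_groups):\n",
"        df_rg = pf.read_row_group(rg).to_pandas()\n",
"        for row in df_rg.itertuples(index=False):\n",
"            yield tuple(getattr(row, c) for c in sort_columns), row\n",
"\n",
"def heap_merge_to_csv(temp_files, output_file, sort_columns):\n",
"    streams = [iter_sorted_rows(f, sort_columns) for f in temp_files]\n",
"    with open(output_file, 'w') as out:\n",
"        header_written = False\n",
"        for _, row in heapq.merge(*streams, key=lambda kv: kv[0]):\n",
"            if not header_written:\n",
"                out.write(','.join(row._fields) + '\\n')\n",
"                header_written = True\n",
"            out.write(','.join(map(str, row)) + '\\n')\n",
"\n",
"# heap_merge_to_csv(temp_files, 'sorted_output_heap.csv', sort_columns)"
]
},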
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parquet temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" temp_dir = ''\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size = input_chunk_size // 10\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_parquet(temp_file, index=False, engine='pyarrow', row_group_size=merge_chunk_size)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" # Initialize readers for each temp file\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pq.ParquetFile(temp_file)\n",
" blocks.append(block)\n",
"\n",
"\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
"\n",
"\n",
" for block in blocks:\n",
" try:\n",
" current_chunk = block.read_row_group(0).to_pandas()\n",
" #print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" max_row_groups = max([block.num_row_groups for block in blocks]) \n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" for row_group_index in range(1, max_row_groups):\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
"\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
"\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunk[i] = blocks[i].read_row_group(row_group_index).to_pandas() \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" break \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"\n",
"external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['chr1', 'chr2', 'pos1', 'pos2'])"
]
},
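{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sketch (assumes the run above produced 'sorted_output.csv'): verify that the output is\n",
"# globally sorted by streaming it back in chunks and checking that each chunk, together\n",
"# with the last row of the previous chunk, is non-decreasing on the sort columns.\n",
"def check_sorted(path, sort_columns, chunksize=1_000_000):\n",
"    prev_tail = None\n",
"    for chunk in pd.read_csv(path, chunksize=chunksize):\n",
"        keys = chunk[sort_columns]\n",
"        block = keys if prev_tail is None else pd.concat([prev_tail, keys], ignore_index=True)\n",
"        block = block.reset_index(drop=True)\n",
"        # A stable sort must leave an already-sorted block unchanged\n",
"        if not block.equals(block.sort_values(sort_columns, kind='stable').reset_index(drop=True)):\n",
"            return False\n",
"        prev_tail = keys.tail(1)\n",
"    return True\n",
"\n",
"# check_sorted('sorted_output.csv', ['chr1', 'chr2', 'pos1', 'pos2'])"
]
},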
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# CSV temp version\n",
"\n",
"def external_sort(input_file, output_file, sort_columns, input_chunk_size=10000000):\n",
" \"\"\"\n",
" Sorts a large CSV file using external sorting with pandas.\n",
" \n",
" Parameters:\n",
" - input_file: Path to the input CSV file.\n",
" - output_file: Path where the sorted CSV file will be saved.\n",
" - sort_columns: List of column names to sort by.\n",
" - input_chunk_size: Number of rows to read at a time during initial chunking.\n",
" - merge_chunk_size: Number of rows to read from each temp file during merging.\n",
" \"\"\"\n",
" #temp_dir = tempfile.mkdtemp()\n",
" temp_dir = '/users/slavska.olesia/scratch/slavska.olesia'\n",
" print(f\"Temporary directory created at: {temp_dir}\")\n",
" temp_files = []\n",
"\n",
" # Step 1: Initial Chunking and Sorting\n",
" print(\"Step 1: Chunking and initial sorting...\")\n",
" reader = pd.read_csv(input_file, chunksize=input_chunk_size, compression='gzip', skiprows=500, delimiter='\\t', names=[\"read_id\", \"chr1\", \"pos1\", \"chr2\", \"pos2\", \"strand1\", \"strand2\", \"pairs_type\", \"score1\", \"score2\"], header=None)\n",
" chunk_idx = 0\n",
"\n",
" merge_chunk_size=input_chunk_size // chunk_idx\n",
"\n",
" for chunk in reader:\n",
" # Sort the chunk\n",
" chunk_sorted = chunk.sort_values(by=sort_columns)\n",
" # Write to temp file without 'last_in_chunk'\n",
" temp_file = os.path.join(temp_dir, f\"chunk_{chunk_idx}.csv\")\n",
" chunk_sorted.to_csv(temp_file, index=False)\n",
" temp_files.append(temp_file)\n",
" chunk_idx += 1\n",
" print(f\" Processed chunk {chunk_idx}\")\n",
"\n",
" # Step 2: Merging Sorted Chunks\n",
" print(\"Step 2: Merging sorted chunks...\")\n",
" \n",
" \n",
" \n",
" \n",
" # Initialize readers for each temp file\n",
"\n",
"\n",
" # adding the iterator itself to the blocks list, not the actual data\n",
" blocks = []\n",
" \n",
" for temp_file in temp_files:\n",
" block = pd.read_csv(temp_file) #returns iterator\n",
" blocks.append(block)\n",
"\n",
" # Initialize current chunks (the current data from each temp file)\n",
" current_chunks = []\n",
" active_files = len(blocks)\n",
" \n",
" for reader in blocks:\n",
" try:\n",
" current_chunk = next(reader)\n",
" print(f\"Current_chunk: {current_chunk}\")\n",
" # Add 'last_in_chunk' column here\n",
" current_chunk['last_in_chunk'] = False\n",
" current_chunk.iloc[-1, current_chunk.columns.get_loc('last_in_chunk')] = True\n",
" current_chunks.append(current_chunk)\n",
" except StopIteration:\n",
" current_chunks.append(pd.DataFrame())\n",
" active_files -= 1\n",
"\n",
" carryover_data = pd.DataFrame()\n",
"\n",
" # Open the output file for writing\n",
" with open(output_file, 'w') as outfile:\n",
" # This flag is used to write the header only once\n",
" header_written = False\n",
" print(\"File opened\")\n",
" while active_files > 0:\n",
" print(f\"More files to come {active_files}\")\n",
" # Concatenate current chunks and carryover data\n",
" data_frames = [df for df in current_chunks if not df.empty]\n",
" data_frames.append(carryover_data)\n",
" merged_chunk = pd.concat(data_frames, ignore_index=True)\n",
" \n",
" # If merged_chunk is empty, break the loop\n",
" if merged_chunk.empty:\n",
" break\n",
" # Sort the merged chunk\n",
" merged_chunk.sort_values(by=sort_columns, inplace=True)\n",
" merged_chunk.reset_index(drop=True, inplace=True)\n",
"\n",
" \n",
" \n",
" # Find the position of the first 'last_in_chunk' == True\n",
" last_in_chunk_idx = merged_chunk.index[merged_chunk['last_in_chunk'] == True]\n",
" print(f\"Last in chunk id: {last_in_chunk_idx}\")\n",
" if not last_in_chunk_idx.empty:\n",
" split_idx = last_in_chunk_idx[0] + 1\n",
" # Data to write to output\n",
" data_to_output = merged_chunk.iloc[:split_idx]\n",
" print(f\"Output in file: {data_to_output}\")\n",
" # Data to carry over to next iteration\n",
" carryover_data = merged_chunk.iloc[split_idx:].copy()\n",
" print(f\"Continue to work with: {carryover_data}\")\n",
" carryover_data['last_in_chunk'] = False # Reset 'last_in_chunk' flags\n",
" else:\n",
" # If no 'last_in_chunk' found, carry all data to next iteration\n",
" data_to_output = pd.DataFrame()\n",
" carryover_data = merged_chunk\n",
"\n",
" # Write data_to_output to the output file\n",
"\n",
" if not data_to_output.empty:\n",
" if not header_written:\n",
" print(\"First Output\")\n",
" data_to_output.to_csv(outfile, index=False, header=True)\n",
" header_written = True\n",
" else:\n",
" data_to_output.to_csv(\n",
" outfile, index=False, header=False)\n",
" # Read next chunks from temp files\n",
" \n",
" for i in range(len(blocks)):\n",
" if not current_chunks[i].empty:\n",
" try:\n",
" # Read the next chunk\n",
" print(f\"There are more chunks to process: {i}\")\n",
" print(\"Before: \")\n",
" print(current_chunks[i])\n",
"\n",
" current_chunks[i] = next(blocks[i])\n",
" \n",
" print(\"After: \")\n",
" print(current_chunks[i])\n",
"\n",
"\n",
" # Add 'last_in_chunk' column here\n",
" current_chunks[i]['last_in_chunk'] = False\n",
" current_chunks[i].iloc[-1, current_chunks[i].columns.get_loc('last_in_chunk')] = True\n",
" \n",
" except StopIteration:\n",
" current_chunks[i] = pd.DataFrame()\n",
" active_files -= 1\n",
" \n",
"\n",
" # Write any remaining carryover data\n",
" if not carryover_data.empty:\n",
" carryover_data.drop(columns=['last_in_chunk'], inplace=True)\n",
" if not header_written:\n",
" carryover_data.to_csv(outfile, index=False, header=True)\n",
" else:\n",
" carryover_data.to_csv(outfile, index=False, header=False)\n",
" \n",
"\n",
" # Clean up temporary files\n",
" print(\"Cleaning up temporary files...\")\n",
" for temp_file in temp_files:\n",
" os.remove(temp_file)\n",
" os.rmdir(temp_dir)\n",
"\n",
"# Example usage:\n",
"# external_sort('large_input.csv', 'sorted_output.csv', sort_columns=['column_to_sort_by'])"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Generate sample data\n",
"\n",
"num_rows = 1000000 # Adjust the size as needed for testing\n",
"data = {\n",
" 'id': np.arange(num_rows),\n",
" 'value': np.random.randint(0, 100000, size=num_rows)\n",
"}\n",
"df = pd.DataFrame(data)\n",
"df.to_csv('large_input.csv', index=False)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}