import time

import numpy as np
import pandas as pd
from joblib import Parallel, delayed


def read_csv_joblib(csv_path, row_chunksize, column_chunksize=None, usecols=None,
                    n_rows=None, n_columns=None, n_jobs=None, **kwargs):
    """Read a delimited text file in row/column chunks in parallel with joblib
    and reassemble the chunks into a single DataFrame.

    Extra keyword arguments are forwarded to the chunked ``pd.read_csv`` calls.
    """
    if usecols is not None and column_chunksize is not None:
        raise ValueError("`usecols` and `column_chunksize` are mutually exclusive")

    print("* Reading dimensions")
    start = time.time()

    # Probe the file for its dimensions when they are not supplied.
    # Note: these probes hard-code a whitespace separator (sep=r"\s+");
    # pass the same separator via **kwargs so the chunked reads below match.
    if n_rows is None:
        # Count data rows by reading only the first column.
        n_rows = pd.read_csv(csv_path, sep=r"\s+", usecols=[0]).shape[0]
    if n_columns is None:
        # Read a single row to count the columns.
        df_columns = pd.read_csv(csv_path, sep=r"\s+", nrows=1)
        n_columns = df_columns.shape[1]
        # headers = df_columns.columns
        # dtypes = df_columns.dtypes

    if row_chunksize is None:
        row_chunksize = n_rows
    if column_chunksize is None:
        column_chunksize = n_columns

    print(f" * duration: {time.time() - start:.2f}sec")

    print("* Deriving intervals")
    start = time.time()
    # Chunk boundaries as half-open intervals [steps[i], steps[i + 1]) over rows and columns.
    row_steps = np.arange(0, n_rows, row_chunksize)
    row_steps = np.append(row_steps, n_rows)
    column_steps = np.arange(0, n_columns, column_chunksize)
    column_steps = np.append(column_steps, n_columns)
    print(f" * row steps: {len(row_steps) - 1}, column steps: {len(column_steps) - 1}")
    print(f" * duration: {time.time() - start:.2f}sec")

    print("* Loading dataframes")
    start = time.time()
    dfs = Parallel(n_jobs=n_jobs)(
        delayed(pd.read_csv)(
            csv_path,
            # Forward the caller's explicit column selection, or read this
            # chunk's half-open range of column indices.
            usecols=(usecols if usecols is not None
                     else np.arange(column_steps[c], column_steps[c + 1])),
            # Keep the header line and skip every data line before this chunk,
            # i.e. file lines 1 .. row_steps[r].
            skiprows=None if row_steps[r] == 0 else np.arange(1, row_steps[r] + 1),
            nrows=row_steps[r + 1] - row_steps[r],
            **kwargs)
        for r in range(len(row_steps) - 1)
        for c in range(len(column_steps) - 1))
    print(f" * loaded dataframes: {len(dfs)}")
    print(f" * duration: {time.time() - start:.2f}sec")

    # Stitch the column chunks of each row band back into full-width rows.
    print("* Concatenating dataframe columns")
    start = time.time()
    n_column_chunks = len(column_steps) - 1
    rows = []
    for r in range(len(row_steps) - 1):
        row_chunks = dfs[r * n_column_chunks : (r + 1) * n_column_chunks]
        if len(row_chunks) > 0:
            rows.append(pd.concat(row_chunks, axis=1))
    print(f" * duration: {time.time() - start:.2f}sec")

    print("* Concatenating dataframe rows")
    start = time.time()
    df = pd.concat(rows).reset_index(drop=True)
    print(f" * duration: {time.time() - start:.2f}sec")

    return df
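

# Minimal usage sketch -- assumptions: "data.txt" is a hypothetical
# whitespace-separated file with a header row; adjust the path, separator,
# chunk sizes, and worker count for your data.
if __name__ == "__main__":
    df = read_csv_joblib(
        "data.txt",             # hypothetical input file
        row_chunksize=100_000,  # rows per chunk
        column_chunksize=500,   # columns per chunk
        n_jobs=-1,              # use all available CPU cores
        sep=r"\s+",             # forwarded to pd.read_csv via **kwargs
    )
    print(df.shape)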