|
#!/usr/bin/env python3 |
|
from cyvcf2 import VCF |
|
import numpy as np |
|
import pandas as pd |
|
import pyarrow as pa |
|
import pyarrow.parquet as pq |
|
import pyarrow.dataset as ds |
|
import sys |
|
import time |
|
import argparse |
|
|
|
def _flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt,
                 accm_gt, batch_size):
    """Assemble one batch of accumulated variant columns into a DataFrame
    and hand it to write_out().

    The per-site columns become compactly-typed Series; the genotype rows
    become one 'u1' column per sample, named s1..sN.
    """
    np_gt = np.array(accm_gt)

    df = pd.DataFrame({
        'idx': pd.Series(stor_idx, dtype='u4'),
        'chr': pd.Series(stor_chr, dtype='category'),
        'pos': pd.Series(stor_pos, dtype='u4'),
        'ref': pd.Series(stor_ref, dtype='category'),
        'alt': pd.Series(stor_alt, dtype='category'),
    })

    # One column per sample; gts012=True codes genotypes as small ints,
    # so 'u1' is sufficient.
    samples = [f's{s + 1}' for s in range(np_gt.shape[1])]
    df_gt = pd.DataFrame(np_gt, columns=samples).astype('u1')

    write_out(pd.concat([df, df_gt], axis=1), batch_size=batch_size)


def prep(inp):
    """Stream variants from a VCF file into a partitioned parquet dataset.

    Variants are accumulated in memory and flushed via write_out() every
    ``batch_size`` records, plus a final flush for the remainder.

    Parameters
    ----------
    inp : str
        Path to the input VCF (anything cyvcf2.VCF accepts).

    Returns
    -------
    int
        Total number of variant records processed.
    """
    batch_size = 100000
    tot = 0

    # Parallel per-site accumulators plus one genotype vector per variant.
    stor_idx = []
    stor_chr = []
    stor_pos = []
    stor_ref = []
    stor_alt = []
    accm_gt = []

    for idx, v in enumerate(VCF(inp, gts012=True, strict_gt=True, threads=2)):
        if idx % 1000 == 0:
            print(f'LOG> part .. {idx}')

        # Flush BEFORE accumulating the current record so every partition
        # holds exactly batch_size rows (idx == 0 is skipped: nothing yet).
        if idx % batch_size == 0 and idx != 0:
            _flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt,
                         accm_gt, batch_size)
            for buf in (stor_idx, stor_chr, stor_pos, stor_ref,
                        stor_alt, accm_gt):
                buf.clear()
            print(f"LOG> process .. {idx}")

        stor_idx.append(idx)
        stor_chr.append(v.CHROM)
        stor_pos.append(v.POS)
        # NOTE(review): only the first base of REF and the first ALT allele
        # are kept — matches the original behavior; multi-allelic /
        # multi-base records are truncated here.
        stor_ref.append(v.REF[0])
        stor_alt.append(v.ALT[0])
        accm_gt.append(np.array(v.gt_types))

        tot += 1

    # BUG FIX: the original tested `len(np_gt) > 0`, but np_gt was only
    # bound inside the in-loop flush branch — a NameError for files smaller
    # than one batch, and otherwise a test of the *previous* batch's size.
    # Check the actual remainder instead.
    if accm_gt:
        _flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt,
                     accm_gt, batch_size)

    return tot
|
|
|
def write_out(df, **kwargs):
    """Append *df* to the 'out' parquet dataset, partitioned by batch bucket.

    A ``bucket`` column is derived as ``idx // batch_size`` and used as the
    parquet partition column, so each batch of rows lands in its own
    subdirectory of ``out/``.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``idx`` column.

    Keyword Args
    ------------
    batch_size : int, optional
        Rows per bucket. Defaults to 100000.
    """
    # Idiom fix: look the option up directly instead of scanning kwargs.
    batch_size = int(kwargs.get('batch_size', 100000))

    df = df.assign(bucket=lambda x: x.idx // batch_size)
    table = pa.Table.from_pandas(df, preserve_index=True)
    pq.write_to_dataset(table, 'out', partition_cols=['bucket'])
|
|
|
def read_out(dsD, **kwargs):
    """Stream a parquet dataset and print it batch by batch.

    Parameters
    ----------
    dsD : str
        Path to the parquet dataset directory.

    Keyword Args
    ------------
    batch_size : int, optional
        Rows per record batch read. Defaults to 500.
    """
    # Idiom fix: look the option up directly instead of scanning kwargs.
    batch_size = int(kwargs.get('batch_size', 500))

    ds1 = ds.dataset(dsD, format="parquet")
    for record_batch in ds1.to_batches(batch_size=batch_size):
        print(record_batch.to_pandas())
|
|
|
def _main():
    """Parse CLI flags and dispatch to create or query mode.

    -c PATH : build the parquet dataset from the given VCF (prep)
    -q PATH : stream and print the given parquet dataset (read_out)
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-c", dest="create", help="create parquet")
    parser.add_argument("-q", dest="query", help="query parquet")
    args = parser.parse_args()

    if args.create is not None:
        print("LOG> create mode")
        prep(args.create)
    elif args.query is not None:
        print("LOG> query mode")
        read_out(args.query, batch_size=100000)


# Guard the entry point so importing this module has no side effects.
if __name__ == "__main__":
    _main()