@s4553711
Created January 12, 2025 10:19
An example to transform a 1kg VCF to parquet and query it
#!/usr/bin/env python3
# Transform a 1kg VCF into a partitioned parquet dataset and query it back.
from cyvcf2 import VCF
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import argparse


def flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt, accm_gt, batch_size):
    # Build one DataFrame from the accumulated site columns plus one genotype
    # column per sample, then append it to the parquet dataset.
    df = pd.DataFrame({
        'idx': pd.Series(stor_idx, dtype='u4'),
        'chr': pd.Series(stor_chr, dtype='category'),
        'pos': pd.Series(stor_pos, dtype='u4'),
        'ref': pd.Series(stor_ref, dtype='category'),
        'alt': pd.Series(stor_alt, dtype='category'),
    })
    np_gt = np.array(accm_gt)
    # with gts012=True, gt_types is 0=hom-ref, 1=het, 2=hom-alt, 3=unknown
    samples = [f's{s + 1}' for s in range(np_gt.shape[1])]
    df2 = pd.DataFrame(np_gt, columns=samples).astype('u1')
    write_out(pd.concat([df, df2], axis=1), batch_size=batch_size)


def prep(inp):
    batch_size = 100000
    stor_idx, stor_chr, stor_pos, stor_ref, stor_alt, accm_gt = [], [], [], [], [], []
    for idx, v in enumerate(VCF(inp, gts012=True, strict_gt=True, threads=2)):
        if idx % 1000 == 0:
            print(f'LOG> part .. {idx}')
        if idx % batch_size == 0 and idx != 0:
            flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt, accm_gt, batch_size)
            # clear the accumulators for the next batch
            for stor in (stor_idx, stor_chr, stor_pos, stor_ref, stor_alt, accm_gt):
                stor.clear()
            print(f'LOG> process .. {idx}')
        stor_idx.append(idx)
        stor_chr.append(v.CHROM)
        stor_pos.append(v.POS)
        stor_ref.append(v.REF)
        stor_alt.append(v.ALT[0])  # keep the first ALT allele only
        accm_gt.append(np.array(v.gt_types))
    # flush whatever is left after the last full batch
    if accm_gt:
        flush_batch(stor_idx, stor_chr, stor_pos, stor_ref, stor_alt, accm_gt, batch_size)


def write_out(df, batch_size=100000):
    # Bucket rows by their global index so each partition holds batch_size rows.
    df = df.assign(bucket=lambda x: x.idx // batch_size)
    table = pa.Table.from_pandas(df, preserve_index=True)
    pq.write_to_dataset(table, 'out', partition_cols=['bucket'])


def read_out(dsD, batch_size=500):
    # Stream the dataset back in record batches and print each as a DataFrame.
    ds1 = ds.dataset(dsD, format="parquet")
    for record_batch in ds1.to_batches(batch_size=batch_size):
        print(record_batch.to_pandas())


parser = argparse.ArgumentParser()
parser.add_argument("-c", dest="create", help="VCF to transform into a parquet dataset")
parser.add_argument("-q", dest="query", help="parquet dataset directory to query")
args = parser.parse_args()
if args.create is not None:
    print("LOG> create mode")
    prep(args.create)
elif args.query is not None:
    print("LOG> query mode")
    read_out(args.query, batch_size=100000)

We use cyvcf2 to parse the VCF and write a few of its columns out to a parquet dataset. The dataset consists of multiple chunks partitioned by row index, but you can also partition by other columns such as pos or chr (a sketch of that variant follows below).
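For instance, partitioning by chromosome only requires a different partition_cols. A minimal sketch of such a variant of write_out (the name write_out_by_chr is just for illustration; chr is the category column produced by the script above):

import pyarrow as pa
import pyarrow.parquet as pq

def write_out_by_chr(df):
    # Same idea as write_out above, but let the 'chr' column drive the
    # partition layout (out/chr=chr22/...) instead of the row-index bucket.
    table = pa.Table.from_pandas(df, preserve_index=True)
    pq.write_to_dataset(table, 'out', partition_cols=['chr'])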

The 1kg VCF used here can be downloaded from the following S3 storage:

s3://1000genomes/1000G_2504_high_coverage/working/20201028_3202_raw_GT_with_annot/
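For example, assuming the AWS CLI is installed, the chr22 file used below can be fetched anonymously:

$ aws s3 cp --no-sign-request s3://1000genomes/1000G_2504_high_coverage/working/20201028_3202_raw_GT_with_annot/20201028_CCDG_14151_B01_GRM_WGS_2020-08-05_chr22.recalibrated_variants.vcf.gz .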

Before running the following example, it's recommended to create a Python venv to isolate the package environment:

$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip3 install numpy pandas pyarrow cyvcf2 # if other packages are missing, add them to this command

The first step is to create the parquet dataset out, which holds all the chunked data. This step will take a while to finish.

$ python3 1kg.py -c 20201028_CCDG_14151_B01_GRM_WGS_2020-08-05_chr22.recalibrated_variants.vcf.gz
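After it completes, out should contain one subdirectory per row-index bucket. The parquet file names inside are generated by pyarrow and vary by version, so this listing is only indicative:

out/
  bucket=0/
    <generated-name>.parquet
  bucket=1/
    <generated-name>.parquet
  ...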

It's also possible to stream the VCF straight from S3 into the script, like this (the - destination makes aws s3 cp write to stdout, and the - argument to -c makes cyvcf2 read from stdin):

$ aws s3 cp --no-sign-request s3://1000genomes/1000G_2504_high_coverage/working/20201028_3202_raw_GT_with_annot/20201028_CCDG_14151_B01_GRM_WGS_2020-08-05_chr22.recalibrated_variants.vcf.gz - | python3 1kg.py -c -

Then we can perform a simple query like this:

$ python3 1kg.py -q out
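read_out simply streams every record batch back and prints it. For a more selective query you can push a filter into the dataset scan; here is a minimal sketch assuming the column names produced by the script above (the position range is arbitrary):

import pyarrow.dataset as ds

dataset = ds.dataset('out', format='parquet')
# Push the predicate into the parquet scan so only matching data is read.
table = dataset.to_table(
    columns=['chr', 'pos', 'ref', 'alt', 's1'],
    filter=(ds.field('pos') >= 16050000) & (ds.field('pos') < 16060000),
)
print(table.to_pandas())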