Skip to content

Instantly share code, notes, and snippets.

@lomereiter
Created January 28, 2017 14:51
Show Gist options
  • Save lomereiter/61c47eba5b718151de8c162b3cde5cbd to your computer and use it in GitHub Desktop.
Save lomereiter/61c47eba5b718151de8c162b3cde5cbd to your computer and use it in GitHub Desktop.
Pythonic flagstat for ADAM Parquet files
# environment: python3; conda install -c conda-forge fastparquet=0.0.4post1 joblib
# usage: python flagstat.py <dataset.adam>
from collections import Counter
import sys
import fastparquet
from fastparquet.core import read_row_group_file
from fastparquet.schema import SchemaHelper
from joblib import Parallel, delayed
FLAGSTAT_COLUMNS = ['readMapped', 'mateMapped', 'readPaired', 'contigName', 'mateContigName',
'primaryAlignment', 'duplicateRead', 'readInFragment', 'properPair',
'mapq', 'failedVendorQualityChecks', 'supplementaryAlignment']
def count(fn, schema, dtypes, rg):
df, assign = fastparquet.api._pre_allocate(rg.num_rows, FLAGSTAT_COLUMNS, None, None, {}, dtypes)
read_row_group_file(fn, rg, FLAGSTAT_COLUMNS, None, SchemaHelper(schema), {},
open=open, selfmade=False, index=None, assign=assign)
primary = df['primaryAlignment'] & ~df['supplementaryAlignment']
paired = df['readPaired'] & primary
selfAndMateMapped = paired & df['readMapped'] & df['mateMapped']
mateMappedToDiffChrom = selfAndMateMapped & (df['contigName'] != df['mateContigName'])
return Counter({
'total': len(df),
'duplicatesPrimary': (df['duplicateRead'] & df['primaryAlignment']).sum(),
'duplicatesSecondary': (df['duplicateRead'] & ~df['primaryAlignment']).sum(),
'mapped': df['readMapped'].sum(),
'pairedInSequencing': paired.sum(),
'read1': (paired & (df['readInFragment'] == 0)).sum(),
'read2': (paired & (df['readInFragment'] == 1)).sum(),
'properlyPaired': (paired & df['properPair']).sum(),
'withSelfAndMateMapped': selfAndMateMapped.sum(),
'singleton': (paired & df['readMapped'] & ~df['mateMapped']).sum(),
'withMateMappedToDiffChromosome': mateMappedToDiffChrom.sum(),
'withMateMappedToDiffChromosomeMapQ5': (mateMappedToDiffChrom & (df['mapq'] >= 5)).sum(),
'failedQuality': df['failedVendorQualityChecks'].sum()
})
def flagstatCountsParallel(path):
f = fastparquet.ParquetFile(path)
args = [(f.row_group_filename(rg), f.schema, f.dtypes, rg) for rg in f.row_groups]
return Parallel(n_jobs=8, backend='multiprocessing')(
delayed(count)(*arg) for arg in args
)
counts = sum(flagstatCountsParallel(sys.argv[1]), Counter())
for key in counts:
print(key, ':', counts[key])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment