Skip to content

Instantly share code, notes, and snippets.

@leipzig
Created April 13, 2026 20:50
Show Gist options
  • Select an option

  • Save leipzig/cb4bffc11f237a43193089e75db2dba5 to your computer and use it in GitHub Desktop.

Select an option

Save leipzig/cb4bffc11f237a43193089e75db2dba5 to your computer and use it in GitHub Desktop.
query_gnomad
def query_gnomad(
regions: List[str],
region_partition_count: Optional[int] = None,
region_partition_id: Optional[int] = None,
return_zeros_for_empty_gnomad=False,
) -> pyarrow.Table:
"""
This function queries the Gnomad data set, with the regions that are being passed to it
Arguments:
@param regions: list
@param region_partition_count: int
@param region_partition_id: int
Returns:
A pandas.DataFrame
"""
import re
import pandas
import pyarrow
import tiledbvcf
GNOMAD_URI = kwargs["gnomad_uri"]
print(f"Gnomad URI: {GNOMAD_URI}")
if region_partition_count is None:
cfg = tiledbvcf.ReadConfig()
else:
cfg = tiledbvcf.ReadConfig(
region_partition=(region_partition_id, region_partition_count),
)
gnomad_ds = tiledbvcf.Dataset(GNOMAD_URI, mode="r", cfg=cfg, stats=True)
clean_regions = []
for region in regions:
regexgroups = re.match("(chr[0-9XYMT]+):([0-9]+)(-([0-9]+))?", region.replace(" ", ""))
if regexgroups is None:
print("Invalid genomic coordinate: {genomic_coodinate}")
return None
regexchr = regexgroups.group(1)
regexstart = int(regexgroups.group(2))
if regexgroups.group(3) is None:
regexend = regexstart # point coordinate
else:
regexend = int(regexgroups.group(4)) # lose the dash
clean_regions += [f"{regexchr}:{regexstart}-{regexend}"]
gnomad_results_reads = []
print(f"clean regions: {clean_regions}")
gnomad_results_reads.append(
gnomad_ds.read_arrow(
attrs=["contig", "info_AF", "pos_start", "alleles"], regions=clean_regions
)
)
while not gnomad_ds.read_completed():
gnomad_results_reads.append(gnomad_ds.continue_read_arrow())
gnomad_results = pyarrow.concat_tables(gnomad_results_reads).to_pandas()
print(f"gnomad results: ")
# print all columns in pandas
with pandas.option_context("display.max_rows", None, "display.max_columns", None):
print(gnomad_results.head())
# this might happen with 1bp regions
# test if gnomad_results has any rows
if gnomad_results.empty:
# artifically fill this in as all 0 gnomad allele frequencies
gnomad_results = pandas.DataFrame(columns=["contig", "info_AF", "pos_start", "alleles"])
if return_zeros_for_empty_gnomad:
gnomad_results.append(
{
"contig": clean_regions[0].split(":")[0],
"info_AF": [0],
"pos_start": clean_regions[0].split(":")[1].split("-")[0],
"alleles": ["N"],
},
ignore_index=True,
)
# converting to float strips the array
gnomad_results["gnomad_af"] = gnomad_results["info_AF"].astype(numpy.float32)
return pyarrow.Table.from_pandas(gnomad_results, preserve_index=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment