Created
April 13, 2026 20:50
-
-
Save leipzig/cb4bffc11f237a43193089e75db2dba5 to your computer and use it in GitHub Desktop.
query_gnomad
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def query_gnomad( | |
| regions: List[str], | |
| region_partition_count: Optional[int] = None, | |
| region_partition_id: Optional[int] = None, | |
| return_zeros_for_empty_gnomad=False, | |
| ) -> pyarrow.Table: | |
| """ | |
| This function queries the Gnomad data set, with the regions that are being passed to it | |
| Arguments: | |
| @param regions: list | |
| @param region_partition_count: int | |
| @param region_partition_id: int | |
| Returns: | |
| A pandas.DataFrame | |
| """ | |
| import re | |
| import pandas | |
| import pyarrow | |
| import tiledbvcf | |
| GNOMAD_URI = kwargs["gnomad_uri"] | |
| print(f"Gnomad URI: {GNOMAD_URI}") | |
| if region_partition_count is None: | |
| cfg = tiledbvcf.ReadConfig() | |
| else: | |
| cfg = tiledbvcf.ReadConfig( | |
| region_partition=(region_partition_id, region_partition_count), | |
| ) | |
| gnomad_ds = tiledbvcf.Dataset(GNOMAD_URI, mode="r", cfg=cfg, stats=True) | |
| clean_regions = [] | |
| for region in regions: | |
| regexgroups = re.match("(chr[0-9XYMT]+):([0-9]+)(-([0-9]+))?", region.replace(" ", "")) | |
| if regexgroups is None: | |
| print("Invalid genomic coordinate: {genomic_coodinate}") | |
| return None | |
| regexchr = regexgroups.group(1) | |
| regexstart = int(regexgroups.group(2)) | |
| if regexgroups.group(3) is None: | |
| regexend = regexstart # point coordinate | |
| else: | |
| regexend = int(regexgroups.group(4)) # lose the dash | |
| clean_regions += [f"{regexchr}:{regexstart}-{regexend}"] | |
| gnomad_results_reads = [] | |
| print(f"clean regions: {clean_regions}") | |
| gnomad_results_reads.append( | |
| gnomad_ds.read_arrow( | |
| attrs=["contig", "info_AF", "pos_start", "alleles"], regions=clean_regions | |
| ) | |
| ) | |
| while not gnomad_ds.read_completed(): | |
| gnomad_results_reads.append(gnomad_ds.continue_read_arrow()) | |
| gnomad_results = pyarrow.concat_tables(gnomad_results_reads).to_pandas() | |
| print(f"gnomad results: ") | |
| # print all columns in pandas | |
| with pandas.option_context("display.max_rows", None, "display.max_columns", None): | |
| print(gnomad_results.head()) | |
| # this might happen with 1bp regions | |
| # test if gnomad_results has any rows | |
| if gnomad_results.empty: | |
| # artifically fill this in as all 0 gnomad allele frequencies | |
| gnomad_results = pandas.DataFrame(columns=["contig", "info_AF", "pos_start", "alleles"]) | |
| if return_zeros_for_empty_gnomad: | |
| gnomad_results.append( | |
| { | |
| "contig": clean_regions[0].split(":")[0], | |
| "info_AF": [0], | |
| "pos_start": clean_regions[0].split(":")[1].split("-")[0], | |
| "alleles": ["N"], | |
| }, | |
| ignore_index=True, | |
| ) | |
| # converting to float strips the array | |
| gnomad_results["gnomad_af"] = gnomad_results["info_AF"].astype(numpy.float32) | |
| return pyarrow.Table.from_pandas(gnomad_results, preserve_index=False) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment