Fetching PMC Content and Extracting Text
import os

import requests
from diskcache import Cache

# Ensure cache directory exists
os.makedirs('cache', exist_ok=True)
cache = Cache('cache')


def check_pmcid(pmid, email="[email protected]"):
    """Check if a given PMID has a corresponding PMCID using NCBI's ID Converter API.

    Queries the NCBI ID Converter API to find if a PubMed ID has a corresponding
    PubMed Central ID (PMCID). Results are cached on disk to avoid repeated requests.

    Parameters
    ----------
    pmid : str
        The PubMed ID to check.
    email : str
        Contact email passed to the NCBI API for identification.

    Returns
    -------
    str or None
        The corresponding PMCID if available, or None if no PMCID exists.
    """
    key = f"pmcid_{pmid}"
    if key in cache:
        return cache[key]

    base_url = "https://www.ncbi.nlm.nih.gov/pmc/utils/idconv/v1.0/"
    params = {
        "ids": pmid,
        "format": "json",
        "tool": "return_pmcid",
        "email": email
    }
    try:
        # Timeout avoids hanging indefinitely on a stalled connection
        response = requests.get(base_url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        records = data.get("records") or []
        # The converter returns one record per input ID; take the first
        pmcid = records[0].get("pmcid") if records else None
    except requests.exceptions.RequestException:
        # Log the error if needed, but don't expose the exception
        print(f"Error looking up PMCID for {pmid}")
        pmcid = None
    cache[key] = pmcid
    return pmcid


# Example usage:
if __name__ == "__main__":
    pmid_input = input("Enter a PMID: ").strip()
    result = check_pmcid(pmid_input)
    print(result or "No PMCID found")
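Because check_pmcid caches every lookup on disk, mapping a batch of PMIDs to PMCIDs is just a loop. A minimal sketch of batch usage; the PMIDs below are arbitrary placeholders, not values from the gist:

# Batch lookup sketch (hypothetical PMIDs)
pmids = ["23456789", "31233456"]
mapping = {pmid: check_pmcid(pmid) for pmid in pmids}
for pmid, pmcid in mapping.items():
    print(f"{pmid} -> {pmcid or 'no PMCID'}")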
import os
import shutil
import logging

import boto3
from botocore import UNSIGNED
from botocore.config import Config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def download_pmc_s3(pmc_id, file_type='xml', output_dir='pmc', cache_dir='pmc', bucket_name='pmc-oa-opendata'):
    """Download PMC files from an AWS S3 bucket.

    Downloads PMC files from the specified AWS S3 bucket. The function tries
    several paths in the bucket (the commercial, non-commercial, and author
    manuscript collections) until one succeeds. If the file already exists
    in the cache directory, it is copied to the output directory instead.

    Parameters
    ----------
    pmc_id : str
        PubMed Central ID.
    file_type : str
        File type (xml or txt). Default is 'xml'.
    output_dir : str
        Output directory. Default is 'pmc'.
    cache_dir : str
        Cache directory. Default is 'pmc'.
    bucket_name : str
        S3 bucket name. Default is 'pmc-oa-opendata'.

    Returns
    -------
    None
        The function does not return a value.

    Examples
    --------
    >>> download_pmc_s3('PMC3898398')
    """
    s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(cache_dir, exist_ok=True)

    output_path = os.path.join(output_dir, f'{pmc_id}.{file_type}')
    cache_path = os.path.join(cache_dir, f'{pmc_id}.{file_type}')

    if not os.path.exists(output_path):
        if os.path.exists(cache_path):
            shutil.copy(cache_path, output_path)
        else:
            logger.info(f"Attempting to download {pmc_id}.{file_type} to {output_path}")
            last_error = None
            # Try each PMC Open Access collection in turn
            for collection in ('oa_comm', 'oa_noncomm', 'author_manuscript'):
                file_key = f'{collection}/{file_type}/all/{pmc_id}.{file_type}'
                try:
                    s3.download_file(bucket_name, file_key, cache_path)
                    shutil.copy(cache_path, output_path)
                    break
                except Exception as e:
                    last_error = e
            else:
                # All collections failed; only log if nothing was cached
                if not os.path.exists(cache_path):
                    logger.error(last_error)

    if os.path.exists(cache_path):
        logger.info(f"DONE: File: {output_path}")
from __future__ import absolute_import, print_function, unicode_literals
from builtins import dict, str

import logging
import os.path

import requests
from lxml import etree
import xml.etree.ElementTree as ET

# from indra.literature import pubmed_client
# from indra.util import UnicodeXMLTreeBuilder as UTB

# Resolve resource paths relative to the working directory when this module
# is run outside of a package
__file__ = ""

# basestring only exists in Python 2; alias it to str on Python 3
try:
    basestring
except NameError:
    basestring = str

logger = logging.getLogger(__name__)

pmc_url = 'https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi'
pmid_convert_url = 'https://www.ncbi.nlm.nih.gov/pmc/utils/idconv/v1.0/'

# Paths to resource files
pmids_fulltext_path = os.path.join(os.path.dirname(__file__),
                                   'pmids_fulltext.txt')
pmids_oa_xml_path = os.path.join(os.path.dirname(__file__),
                                 'pmids_oa_xml.txt')
pmids_oa_txt_path = os.path.join(os.path.dirname(__file__),
                                 'pmids_oa_txt.txt')
pmids_auth_xml_path = os.path.join(os.path.dirname(__file__),
                                   'pmids_auth_xml.txt')

# Define global dict containing lists of PMIDs among mineable PMCs
# to be lazily initialized
pmids_fulltext_dict = {}
# def id_lookup(paper_id, idtype=None):
#     """Return PMID, DOI and PMCID based on an input ID.
#
#     This function takes a PubMed ID, PubMed Central ID, or DOI and uses the
#     PubMed ID mapping service to look up the other IDs. The IDs are
#     returned in a dictionary.
#
#     Parameters
#     ----------
#     paper_id : str
#         A PubMed ID, PubMed Central ID, or DOI.
#     idtype : Optional[str]
#         The type of the input ID. If not given, the function will try to
#         determine the type from the input ID. If given, it must be one of
#         'pmid', 'pmcid', or 'doi'.
#
#     Returns
#     -------
#     dict
#         A dictionary with keys 'pmid', 'pmcid', and 'doi' containing the
#         corresponding IDs, or an empty dict if lookup fails.
#     """
#     if idtype is not None and idtype not in ('pmid', 'pmcid', 'doi'):
#         raise ValueError("Invalid idtype %s; must be 'pmid', 'pmcid', "
#                          "or 'doi'." % idtype)
#     if paper_id.upper().startswith('PMC'):
#         idtype = 'pmcid'
#     # Strip off any prefix
#     if paper_id.upper().startswith('PMID'):
#         paper_id = paper_id[4:]
#     elif paper_id.upper().startswith('DOI'):
#         paper_id = paper_id[3:]
#     data = {'ids': paper_id}
#     if idtype is not None:
#         data['idtype'] = idtype
#     try:
#         tree = pubmed_client.send_request(pmid_convert_url, data)
#     except Exception as e:
#         logger.error('Error looking up PMID in PMC: %s' % e)
#         return {}
#     if tree is None:
#         return {}
#     record = tree.find('record')
#     if record is None:
#         return {}
#     doi = record.attrib.get('doi')
#     pmid = record.attrib.get('pmid')
#     pmcid = record.attrib.get('pmcid')
#     ids = {'doi': doi,
#            'pmid': pmid,
#            'pmcid': pmcid}
#     return ids


# def get_ids(search_term, retmax=1000):
#     return pubmed_client.get_ids(search_term, retmax=retmax, db='pmc')
def get_xml(pmc_id):
    """Get XML for the article corresponding to a PMC ID.

    Retrieves the XML content from PubMed Central for the specified PMC ID
    by making a request to the PMC OAI service.

    Parameters
    ----------
    pmc_id : str
        PubMed Central ID.

    Returns
    -------
    str
        XML content as a string, or None if retrieval fails.
    """
    if pmc_id.upper().startswith('PMC'):
        pmc_id = pmc_id[3:]
    # Request params
    params = {}
    params['verb'] = 'GetRecord'
    params['identifier'] = 'oai:pubmedcentral.nih.gov:%s' % pmc_id
    params['metadataPrefix'] = 'pmc'
    # Submit the request
    res = requests.get(pmc_url, params)
    if res.status_code != 200:
        logger.warning("Couldn't download %s" % pmc_id)
        return None
    # Read the bytestream
    xml_bytes = res.content
    # Check for any XML errors; xml_bytes should still be bytes
    #tree = ET.XML(xml_bytes, parser=UTB())
    tree = ET.XML(xml_bytes)
    xmlns = "http://www.openarchives.org/OAI/2.0/"
    err_tag = tree.find('{%s}error' % xmlns)
    if err_tag is not None:
        err_code = err_tag.attrib['code']
        err_text = err_tag.text
        logger.warning('PMC client returned with error %s: %s'
                       % (err_code, err_text))
        return None
    # If no error, return the XML as a unicode string
    else:
        return xml_bytes.decode('utf-8')
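# For reference, the GetRecord request built above resolves to a URL of the
# form (shown with the numeric part of the example ID PMC3898398):
#   https://www.ncbi.nlm.nih.gov/pmc/oai/oai.cgi?verb=GetRecord
#       &identifier=oai:pubmedcentral.nih.gov:3898398&metadataPrefix=pmc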
def get_xml_from_file(file):
    """Get XML string from a local XML file.

    Parses the XML file and returns its content as a string.

    Parameters
    ----------
    file : str
        Path to the XML file.

    Returns
    -------
    str
        The XML content as a string.
    """
    tree = ET.parse(file)
    # Get the root element of the XML file
    root = tree.getroot()
    # Convert the entire ElementTree (from root) to a string
    xml_string = ET.tostring(root, encoding='unicode')
    return xml_string
def extract_text(xml_string):
    """Get plaintext from the body of the given NLM XML string.

    This plaintext consists of all paragraphs returned by
    extract_paragraphs, separated by newlines and terminated by a final
    newline. See the docstring of extract_paragraphs for more information.

    Parameters
    ----------
    xml_string : str
        String containing valid NLM XML.

    Returns
    -------
    str or None
        Extracted plaintext, or None if no paragraphs were found.
    """
    paragraphs = extract_paragraphs(xml_string)
    if paragraphs:
        return '\n'.join(paragraphs) + '\n'
    else:
        return None
def extract_paragraphs(xml_string):
    """Return a list of paragraphs in an NLM XML.

    This returns a list of the plaintexts for each paragraph and title in
    the input XML, excluding some paragraphs with text that should not
    be relevant to biomedical text processing.

    Parameters
    ----------
    xml_string : str
        String containing valid NLM XML.

    Returns
    -------
    list of str
        List of extracted paragraphs from the input NLM XML.
    """
    output = []
    tree = etree.fromstring(xml_string.encode('utf-8'))
    # Remove namespaces if any exist
    if tree.tag.startswith('{'):
        for element in tree.iter():
            # The following code will throw a ValueError for some
            # exceptional tags such as comments and processing instructions.
            # It's safe to just leave these tag names unchanged.
            try:
                element.tag = etree.QName(element).localname
            except ValueError:
                continue
        etree.cleanup_namespaces(tree)
    # Strip out LaTeX
    _remove_elements_by_tag(tree, 'tex-math')
    # Strip out all content in unwanted elements except the captions
    _replace_unwanted_elements_with_their_captions(tree)
    # First process the front element. Titles, alt-titles, and abstracts
    # are pulled from here.
    front_elements = _select_from_top_level(tree, 'front')
    for element in front_elements:
        output.extend(_extract_from_front(element))
    # All paragraphs except those in unwanted elements are extracted
    # from the article body
    body_elements = _select_from_top_level(tree, 'body')
    for element in body_elements:
        output.extend(_extract_from_body(element))
    # Only the body sections of subarticles are processed. All
    # unwanted elements are removed entirely, including captions.
    # Even boxed-text elements are removed.
    subarticles = _select_from_top_level(tree, 'sub-article')
    for element in subarticles:
        output.extend(_extract_from_subarticle(element))
    return output
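# Hypothetical mini-example of what extract_paragraphs returns; the XML
# below is an invented fragment, not a real PMC record:
#
#   demo_xml = """<article>
#       <front><article-meta><title-group>
#           <article-title>Sample title</article-title>
#       </title-group></article-meta></front>
#       <body><sec><title>Intro</title><p>First paragraph.</p></sec></body>
#   </article>"""
#   extract_paragraphs(demo_xml)
#   # -> ['Sample title', 'Intro', 'First paragraph.']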
def filter_pmids(pmid_list, source_type):
    """Filter a list of PMIDs for ones with full text from PMC.

    Parameters
    ----------
    pmid_list : list of str
        List of PMIDs to filter.
    source_type : str
        One of 'fulltext', 'oa_xml', 'oa_txt', or 'auth_xml'.

    Returns
    -------
    list of str
        PMIDs available in the specified source/format type.
    """
    global pmids_fulltext_dict
    # Check args
    if source_type not in ('fulltext', 'oa_xml', 'oa_txt', 'auth_xml'):
        raise ValueError("source_type must be one of: 'fulltext', 'oa_xml', "
                         "'oa_txt', or 'auth_xml'.")
    # Check if we've loaded this type, and lazily initialize
    if pmids_fulltext_dict.get(source_type) is None:
        fulltext_list_path = os.path.join(os.path.dirname(__file__),
                                          'pmids_%s.txt' % source_type)
        with open(fulltext_list_path, 'rb') as f:
            fulltext_list = set([line.strip().decode('utf-8')
                                 for line in f.readlines()])
        pmids_fulltext_dict[source_type] = fulltext_list
    return list(set(pmid_list).intersection(
        pmids_fulltext_dict.get(source_type)))
def _select_from_top_level(tree, tag):
    """Select direct children of the article element of a tree by tag.

    Different versions of NLM XML place the article element in different
    places. We cannot rely on a hard coded path to the article element. This
    helper function selects top level elements beneath article by their tag
    name. We use this to pull out the front, body, and sub-article elements
    of an article.

    An assumption is made that there is only one article element in the
    input XML tree. If this is not the case, only the first article will be
    processed.

    Parameters
    ----------
    tree : :py:class:`lxml.etree._Element`
        lxml element for entire tree of a valid NLM XML
    tag : str
        Tag of top level elements to return

    Returns
    -------
    list
        List containing lxml Element objects of selected top level elements.
        Typically there is only one front and one body that are direct
        children of the article element, but there can be multiple
        subarticles.
    """
    if tree.tag == 'article':
        article = tree
    else:
        article = tree.xpath('.//article')
        if not len(article):
            raise ValueError('Input XML contains no article element')
        # Assume there is only one article
        article = article[0]
    output = []
    xpath = './%s' % tag
    for element in article.xpath(xpath):
        output.append(element)
    return output
def _extract_from_front(front_element):
    """Return list of titles and paragraphs from front of NLM XML.

    Parameters
    ----------
    front_element : :py:class:`lxml.etree._Element`
        etree element for front of a valid NLM XML

    Returns
    -------
    list of str
        List of relevant plain text titles and paragraphs taken from the
        front section of an NLM XML. These include the article title, alt
        title, and paragraphs within abstracts. Unwanted paragraphs such as
        author statements are excluded.
    """
    output = []
    title_xpath = './article-meta/title-group/article-title'
    alt_title_xpath = './article-meta/title-group/alt-title'
    abstracts_xpath = './article-meta/abstract'
    for element in front_element.xpath(_xpath_union(title_xpath,
                                                    alt_title_xpath,
                                                    abstracts_xpath)):
        if element.tag == 'abstract':
            # Extract paragraphs from abstracts
            output.extend(_extract_paragraphs_from_tree(element))
        else:
            # No paragraphs in titles; just strip tags
            output.append(' '.join(element.itertext()))
    return output
def _extract_from_body(body_element):
    """Return list of paragraphs from main article body of NLM XML.

    See the docstring of extract_paragraphs for more info.
    """
    return _extract_paragraphs_from_tree(body_element)


def _extract_from_subarticle(subarticle_element):
    """Return list of relevant paragraphs from a subarticle.

    See the docstring of extract_paragraphs for more info.
    """
    # Get only body element
    body = subarticle_element.xpath('./body')
    if not body:
        return []
    body = body[0]
    # Remove float elements. From observation these do not appear to
    # contain any meaningful information within sub-articles.
    for element in body.xpath(".//*[@position='float']"):
        element.getparent().remove(element)
    return _extract_paragraphs_from_tree(body)
def _remove_elements_by_tag(tree, *tags):
    """Remove all elements with the given tags.

    Removes each matching element along with all of its content.
    Modifies the input tree in place.

    Parameters
    ----------
    tree : :py:class:`lxml.etree._Element`
        etree element for valid NLM XML
    """
    bad_xpath = _xpath_union(*['.//%s' % tag for tag in tags])
    for element in tree.xpath(bad_xpath):
        element.getparent().remove(element)
def _replace_unwanted_elements_with_their_captions(tree):
    """Replace all unwanted elements with their captions.

    Modifies the input tree in place.

    Parameters
    ----------
    tree : :py:class:`lxml.etree._Element`
        etree element for valid NLM XML
    """
    floats_xpath = "//*[@position='float']"
    figs_xpath = './/fig'
    tables_xpath = './/table-wrap'
    unwanted_xpath = _xpath_union(floats_xpath, figs_xpath, tables_xpath)
    unwanted = tree.xpath(unwanted_xpath)
    # Iterating through xpath nodes in reverse leads to processing these
    # nodes from the bottom up.
    for element in unwanted[::-1]:
        # Don't remove floats that are boxed-text elements. These often
        # contain useful information.
        if element.tag == 'boxed-text':
            continue
        captions = element.xpath('./caption')
        captions_element = etree.Element('captions')
        for caption in captions:
            captions_element.append(caption)
        element.getparent().replace(element, captions_element)
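# Hypothetical illustration: a fig with a caption is replaced by a bare
# <captions> element holding that caption:
#
#   frag = etree.fromstring(
#       '<body><fig><caption><p>Fig 1 caption.</p></caption><graphic/></fig></body>')
#   _replace_unwanted_elements_with_their_captions(frag)
#   etree.tostring(frag)
#   # -> b'<body><captions><caption><p>Fig 1 caption.</p></caption></captions></body>'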
def _retain_only_pars(tree):
    """Strip out all tags except title and p tags.

    This function also changes title tags into p tags. This is a helpful
    preprocessing step that makes it easier to extract paragraphs in
    the order of a pre-order traversal.

    Modifies the input tree in place.

    Parameters
    ----------
    tree : :py:class:`lxml.etree._Element`
        etree element for valid NLM XML
    """
    for element in tree.xpath('.//*'):
        if element.tag == 'title':
            element.tag = 'p'
    for element in tree.xpath('.//*'):
        parent = element.getparent()
        if parent is not None and element.tag != 'p':
            etree.strip_tags(parent, element.tag)
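# Hypothetical illustration of the stripping step on an invented fragment:
#
#   frag = etree.fromstring(
#       '<body><sec><title>Intro</title><p>Text <bold>here</bold>.</p></sec></body>')
#   _retain_only_pars(frag)
#   etree.tostring(frag)
#   # -> b'<body><p>Intro</p><p>Text here.</p></body>'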
def _pull_nested_paragraphs_to_top(tree):
    """Flatten nested paragraphs into pre-order traversal order.

    Requires _retain_only_pars to be run first.
    Modifies the input tree in place.

    Parameters
    ----------
    tree : :py:class:`lxml.etree._Element`
        etree element for valid NLM XML
    """
    # Since _retain_only_pars must be called first, the input will contain
    # only p tags except for possibly the outermost tag. p elements directly
    # beneath the root will be called depth 1, those beneath depth 1
    # elements will be called depth 2, and so on. Proceed iteratively. At
    # each step identify all p elements with depth 2. Cut all of the depth 2
    # p elements out of each parent and append them in order as siblings
    # following the parent (these depth 2 elements may themselves be the
    # parents of additional p elements). The algorithm terminates when there
    # are no depth 2 elements remaining.
    # Find depth 2 p elements
    nested_paragraphs = tree.xpath('./p/p')
    while nested_paragraphs:
        # This points to the location where the next depth 2 p element will
        # be appended
        last = None
        # Store parent of previously processed element to track when the
        # parent changes.
        old_parent = None
        for p in nested_paragraphs:
            parent = p.getparent()
            # When the parent changes, last must be set to the new parent
            # element. This ensures children will be appended in order
            # after their parents.
            if parent != old_parent:
                last = parent
            old_parent = parent
            # Remove child element from its parent
            parent.remove(p)
            # The parent's text occurring after the current child p but
            # before p's following sibling is stored in p.tail. Append this
            # text to the parent's text and then clear out p.tail.
            if not parent.text and p.tail:
                parent.text = p.tail
                p.tail = ''
            elif parent.text and p.tail:
                parent.text += ' ' + p.tail
                p.tail = ''
            # Place child in its new location
            last.addnext(p)
            last = p
        nested_paragraphs = tree.xpath('./p/p')
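# Hypothetical illustration of the flattening step on an invented fragment:
#
#   frag = etree.fromstring('<body><p>Lead.<p>Nested.</p></p></body>')
#   _pull_nested_paragraphs_to_top(frag)
#   [p.text for p in frag.xpath('./p')]
#   # -> ['Lead.', 'Nested.']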
def _extract_paragraphs_from_tree(tree):
    """Preprocess tree and return its paragraphs."""
    _retain_only_pars(tree)
    _pull_nested_paragraphs_to_top(tree)
    paragraphs = []
    for element in tree.xpath('./p'):
        paragraph = ''.join([x.strip() for x in element.itertext()])
        paragraphs.append(paragraph)
    return paragraphs


def _xpath_union(*xpath_list):
    """Form the union of xpath expressions."""
    return ' | '.join(xpath_list)
def get_title(pmcid):
    """Get the article title from a PMC article.

    Retrieves the XML content for the specified PMC ID and extracts
    the article title.

    Parameters
    ----------
    pmcid : str
        PubMed Central ID.

    Returns
    -------
    str
        The article title as a string, or None if retrieval fails.
    """
    xml_string = get_xml(pmcid)
    if not xml_string:
        return
    tree = etree.fromstring(xml_string.encode('utf-8'))
    # Remove namespaces if any exist
    if tree.tag.startswith('{'):
        for element in tree.iter():
            # The following code will throw a ValueError for some
            # exceptional tags such as comments and processing instructions.
            # It's safe to just leave these tag names unchanged.
            try:
                element.tag = etree.QName(element).localname
            except ValueError:
                continue
        etree.cleanup_namespaces(tree)
    # Strip out LaTeX
    _remove_elements_by_tag(tree, 'tex-math')
    # Strip out all content in unwanted elements except the captions
    _replace_unwanted_elements_with_their_captions(tree)
    # The title lives in the front element's article-meta section
    front_elements = _select_from_top_level(tree, 'front')
    title_xpath = './article-meta/title-group/article-title'
    for front_element in front_elements:
        for element in front_element.xpath(title_xpath):
            return ' '.join(element.itertext())
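# Example end-to-end usage of the helpers above (requires network access;
# PMC3898398 is the example ID used elsewhere in this gist):
if __name__ == "__main__":
    xml_string = get_xml('PMC3898398')
    if xml_string:
        print(get_title('PMC3898398'))
        text = extract_text(xml_string)
        if text:
            print(text[:500])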