Created
September 5, 2019 19:31
-
-
Save romanegloo/60b428098fea8b86ac697b57964709b6 to your computer and use it in GitHub Desktop.
build_ES_pubtator_index: Building ElasticSearch Index of PubTator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Parse PubTator raw data file, and creates ElasticSearch index, and populate the | |
corpus (index: 'pubtator') | |
You can download PubTator dataset from the following link: | |
ftp://ftp.ncbi.nlm.nih.gov/pub/lu/PubTator/bioconcepts2pubtator_offsets.gz | |
Place the file in 'data/' and run this code. Don't need to decompress the file. | |
We assume that ElasticSearch is properly installed and accessible via its API | |
channel. We use python elasticsearch client library. | |
""" | |
import sys | |
import re | |
import logging | |
import gzip | |
from pathlib import Path | |
import coloredlogs | |
from elasticsearch import Elasticsearch as ES | |
from elasticsearch.helpers import bulk | |
# Module-level logger; coloredlogs attaches its colored handler in main().
logger = logging.getLogger(__name__)

# Runtime configuration: ES connection details, the target index name, and
# how many documents are sent per bulk request.
conf = {
    'es_index': 'pubtator',
    'es_host': 'localhost',
    'es_port': 9200,
    'batch_size': 10000,
}
# Patterns for the PubTator "offsets" record format:
#   <PMID>|t|<title>      -- title line
#   <PMID>|a|<abstract>   -- abstract line
# Any other line in a record is a tab-separated annotation line.
# Compiled once at import time: create_action runs once per document over the
# whole corpus, so recompiling inside the function was pure waste.
_P_TITLE = re.compile(r'^((\d+)\|t\|)(.*)')
_P_BODY = re.compile(r'^((\d+)\|a\|)(.*)')


def create_action(actions, doc, index=None):
    """Build a single ES bulk 'index' action from one PubTator record.

    Args:
        actions: list the new action dict is appended to (mutated in place).
        doc: raw lines of one PubTator record — a title line, an abstract
            line, then zero or more annotation lines.
        index: target index name; defaults to conf['es_index'] when None,
            which keeps the original call sites working unchanged.

    An empty *doc* (produced e.g. by consecutive blank lines in the input)
    is ignored instead of emitting a bogus action with ``_id=None``.
    """
    if not doc:
        return
    act = {
        '_op_type': 'index',
        '_index': conf['es_index'] if index is None else index,
        '_type': '_doc',
        '_id': None,
        '_source': {
            '_title': None,
            '_body': None,
            '_annotations': None
        }
    }
    annotations = []
    for line in doc:
        m = _P_TITLE.match(line)
        if m:  # Title line: the PMID doubles as the document id
            act['_id'] = m.group(2)
            act['_source']['_title'] = m.group(3)
            continue
        m = _P_BODY.match(line)
        if m:  # Abstract line
            act['_source']['_body'] = m.group(3)
            continue
        annotations.append(line)  # Annotation line
    act['_source']['_annotations'] = '\n'.join(annotations)
    actions.append(act)
def run_indexing(es, actions):
    """Send one batch of actions to ES via the bulk helper.

    Args:
        es: connected Elasticsearch client.
        actions: list of bulk action dicts (as built by create_action).

    Returns:
        The number of successfully indexed documents, or 0 when the bulk
        request fails. (The original returned None on failure, which made
        the caller's ``counter[1] += succ`` raise TypeError.)
    """
    try:
        resp = bulk(es, actions)
    except Exception as ex:
        logger.error('Error in run_indexing: %s', ex)  # lazy %-formatting
        return 0
    return resp[0]  # bulk() returns (success_count, errors)
def _connect():
    """Create an ES client; exit the process if the server is unreachable."""
    es = ES([{'host': conf['es_host'], 'port': conf['es_port']}])
    if es.ping():
        logger.info("ES Client connected")
        logger.debug(es.info())
    else:
        logger.error("Couldn't connect to the ES server. Terminating...")
        sys.exit(1)  # non-zero status: this is an error exit, not success
    return es


def _ensure_index(es):
    """Create the target index (1 shard, 0 replicas) if it does not exist."""
    index_name = conf['es_index']
    index_settings = {
        'settings': {
            'number_of_shards': 1,
            'number_of_replicas': 0
        },
        'mappings': {  # https://is.gd/wVaD8I
            'properties': {
                'title': {'type': 'text'},
                'abstract': {'type': 'text'},
                'annotations': {'type': 'text'},
            }
        }
    }
    try:
        if not es.indices.exists(index_name):
            es.indices.create(index=index_name, body=index_settings)
            logger.info('ES index {} created'.format(index_name))
        else:
            logger.info('ES index {} already exists. '
                        'Skip creating an index...'.format(index_name))
    except Exception as ex:
        logger.error('failed to create an ES index: {}'.format(str(ex)))


def _index_corpus(es):
    """Stream the gzipped PubTator offsets file and bulk-index it in batches.

    Records in the file are separated by blank lines; each record becomes
    one bulk action. Batches of conf['batch_size'] actions are flushed to
    ES as they fill up.
    """
    file_pubtator = Path(__file__).resolve().parents[1] / \
        'data/bioconcepts2pubtator_offsets.gz'
    num_read = 0     # documents turned into bulk actions
    num_indexed = 0  # documents ES reported as successfully indexed
    with gzip.open(file_pubtator, 'rt') as f:
        doc = []
        actions = []
        for line in f:
            line = line.rstrip()
            if line:
                doc.append(line)
                continue
            # Blank line: the current record is complete.
            create_action(actions, doc)
            doc = []
            if len(actions) >= conf['batch_size']:
                num_read += len(actions)
                # NOTE: the original overwrote this with len(actions)
                # (a debug leftover), so "docs indexed" never reflected
                # what ES actually accepted.
                num_indexed += run_indexing(es, actions)
                print('docs read: {}, docs indexed: {}'
                      ''.format(num_read, num_indexed), end='\r', flush=True)
                actions = []
        if doc:  # file may not end with a blank line; flush the last record
            create_action(actions, doc)
        if actions:  # index the remaining partial batch
            num_read += len(actions)
            num_indexed += run_indexing(es, actions)
    # Print out the statistics
    logger.info(f"Number of docs read: {num_read}")
    logger.info(f"Number of docs indexed: {num_indexed}")


def main():
    """Entry point: connect to ES, ensure the index exists, populate it."""
    coloredlogs.install(level='INFO', logger=logger)
    es = _connect()
    _ensure_index(es)
    _index_corpus(es)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment