#!/usr/bin/env -S python3 -u | |
import csv | |
import datetime | |
import glob | |
import gzip | |
import json | |
import os | |
import sys | |
import time | |
import hashlib | |
import concurrent.futures | |
import multiprocessing | |
import traceback | |
import subprocess | |
import getpass | |
import pathlib | |
valid_args = set() | |
def get_arg(argname, default=None): | |
assert default is not None | |
valid_args.add(argname) | |
if argname in sys.argv: | |
return sys.argv[sys.argv.index(argname)+1] | |
else: | |
return default | |
def has_arg(argname): | |
valid_args.add(argname) | |
return argname in sys.argv | |
def has_some_arg(args): | |
for arg in args: | |
valid_args.add(arg) | |
for arg in args: | |
if arg in sys.argv: | |
return True | |
return False | |
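# The helpers above form a minimal ad-hoc argument parser: get_arg('--flag', default)
# returns the value following '--flag' on the command line (or the default),
# has_arg('--flag') tests for the presence of a boolean flag, and has_some_arg()
# tests for any of several aliases. Example invocation (using flags defined below):
#   ./flatten-openalex-jsonl.py --db-name openalex_test01 --worker-count 8 --force-wipe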
# Location of aws cli | |
AWS_CLI = get_arg('--aws-cli', 'aws') | |
# Where to store gzipped flattened CSV files | |
CSV_DIR = get_arg('--csv-dir', '/database/openalex/csv-file') | |
# The database name to use | |
DB_NAME = get_arg('--db-name', 'openalex_testivar') | |
# File to keep track of which files have been flattened and DB hash | |
DONE_DIR = get_arg('--done-dir', '/database/openalex/done_files') | |
_DONE_FILE_DEFAULT = os.path.join(DONE_DIR, 'done_files_' + DB_NAME + '.txt')
DONE_FILE = get_arg('--done-file', _DONE_FILE_DEFAULT) | |
# If this flag is set, the DONE_FILE will be removed before running. | |
# This will re-populate the database from scratch. Useful if CSV-producing changes are made to this script.
FORCE_WIPE = has_arg('--force-wipe') | |
# Logs will be stored here | |
LOG_DIR = get_arg('--log-dir', os.path.join('/database/openalex/logs')) | |
_LOG_FILE_DEFAULT = os.path.join(LOG_DIR, 'flatten_openalex_jsonl_' + DB_NAME + '_' + datetime.datetime.today().strftime('%Y-%m-%d') + '.log') | |
LOG_FILE = get_arg('--log-file', _LOG_FILE_DEFAULT) | |
# Location of psql executable
PSQL = get_arg('--psql', 'psql') | |
# Location of base schema file, defaults to same directory as this file | |
SCHEMA_FILE = get_arg('--schema-file', os.path.join(os.path.dirname(os.path.realpath(__file__)), 'openalex-pg-schema.sql')) | |
# Where to read jsonl snapshot files from | |
SNAPSHOT_DIR = get_arg('--snapshot-dir', '/database/openalex/openalex-snapshot') | |
# Only show config, check existence of AWS_CLI and PSQL and then exit | |
SHOW_CONFIG = has_some_arg(['--help', '-h', '--config']) | |
# Whether to sync SNAPSHOT_DIR by using the aws cli | |
SYNC = True | |
FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0')) | |
def default_worker_count(): | |
cpu_count = os.cpu_count() | |
if cpu_count is None: | |
return 2 | |
else: | |
return cpu_count | |
WORKER_COUNT = int(get_arg('--worker-count', default_worker_count())) | |
def makedirs_or_exit(dir, nam): | |
try: | |
os.makedirs(dir, exist_ok=True) | |
except Exception: | |
print(traceback.format_exc()) | |
print(f'Failed to create directory {dir}') | |
print(f'Please set {nam} correctly') | |
print('Exiting') | |
sys.exit(1) | |
makedirs_or_exit(CSV_DIR, 'CSV_DIR (--csv-dir)') | |
makedirs_or_exit(SNAPSHOT_DIR, 'SNAPSHOT_DIR (--snapshot-dir)') | |
def make_dirs_for_file_or_exit(filename, nam): | |
try: | |
path = pathlib.Path(filename) | |
path.parent.mkdir(parents=True, exist_ok=True) | |
except Exception: | |
print(traceback.format_exc()) | |
print('Failed to create directory for file', filename) | |
print(f'Please set {nam} correctly') | |
print('Exiting') | |
sys.exit(1) | |
make_dirs_for_file_or_exit(DONE_FILE, 'DONE_FILE (--done-file)') | |
make_dirs_for_file_or_exit(LOG_FILE, 'LOG_FILE (--log-file)') | |
# Running as a crontab entry as a specific user: | |
# https://serverfault.com/questions/352835/crontab-running-as-a-specific-user | |
# $ cat /etc/cron.d/flatten_openalex_jsonl | |
#PATH=/bin:/usr/bin:/sbin:/usr/sbin | |
# The output of the command will be logged to LOG_FILE /database/openalex/logs/flatten_openalex_jsonl_DB_NAME_YYYY-MM-DD.log
# Run flatten-openalex at 03:30 on the 15th of every month
# minute hour day-of-month month day-of-week user command | |
# 30 3 15 * * ire021 env PGHOST=127.0.0.1 /home/ire021/oalx3/flatten-openalex-jsonl.py --db-name openalex_test01 --aws-cli /usr/local/bin/aws | |
csv_files = { | |
'authors': { | |
'authors': { | |
'name': os.path.join(CSV_DIR, 'authors.csv.gz'), | |
'columns': [ | |
'id', 'orcid', 'display_name', 'display_name_alternatives', | |
'works_count', 'cited_by_count', | |
'last_known_institution', 'works_api_url', 'updated_date', | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'authors_ids.csv.gz'), | |
'columns': [ | |
'author_id', 'openalex', 'orcid', 'scopus', 'twitter', | |
'wikipedia', 'mag' | |
] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'authors_counts_by_year.csv.gz'), | |
'columns': [ | |
'author_id', 'year', 'works_count', 'cited_by_count', | |
'oa_works_count' | |
] | |
} | |
}, | |
'concepts': { | |
'concepts': { | |
'name': os.path.join(CSV_DIR, 'concepts.csv.gz'), | |
'columns': [ | |
'id', 'wikidata', 'display_name', 'level', 'description', | |
'works_count', 'cited_by_count', 'image_url', | |
'image_thumbnail_url', 'works_api_url', 'updated_date' | |
] | |
}, | |
'ancestors': { | |
'name': os.path.join(CSV_DIR, 'concepts_ancestors.csv.gz'), | |
'columns': ['concept_id', 'ancestor_id'] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'concepts_counts_by_year.csv.gz'), | |
'columns': ['concept_id', 'year', 'works_count', 'cited_by_count', | |
'oa_works_count'] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'concepts_ids.csv.gz'), | |
'columns': ['concept_id', 'openalex', 'wikidata', 'wikipedia', | |
'umls_aui', 'umls_cui', 'mag'] | |
}, | |
'related_concepts': { | |
'name': os.path.join(CSV_DIR, 'concepts_related_concepts.csv.gz'), | |
'columns': ['concept_id', 'related_concept_id', 'score'] | |
} | |
}, | |
'topics': { | |
'topics': { | |
'name': os.path.join(CSV_DIR, 'topics.csv.gz'), | |
'columns': ['id', 'display_name', 'subfield_id', | |
'subfield_display_name', 'field_id', | |
'field_display_name', | |
'domain_id', 'domain_display_name', 'description', | |
'keywords', 'works_api_url', 'wikipedia_id', | |
'works_count', 'cited_by_count', 'updated_date'] | |
} | |
}, | |
'institutions': { | |
'institutions': { | |
'name': os.path.join(CSV_DIR, 'institutions.csv.gz'), | |
'columns': [ | |
'id', 'ror', 'display_name', 'country_code', 'type', | |
'homepage_url', 'image_url', 'image_thumbnail_url', | |
'display_name_acronyms', 'display_name_alternatives', | |
'works_count', 'cited_by_count', 'works_api_url', | |
'updated_date' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'institutions_ids.csv.gz'), | |
'columns': [ | |
'institution_id', 'openalex', 'ror', 'grid', 'wikipedia', | |
'wikidata', 'mag' | |
] | |
}, | |
'geo': { | |
'name': os.path.join(CSV_DIR, 'institutions_geo.csv.gz'), | |
'columns': [ | |
'institution_id', 'city', 'geonames_city_id', 'region', | |
'country_code', 'country', 'latitude', | |
'longitude' | |
] | |
}, | |
'associated_institutions': { | |
'name': os.path.join(CSV_DIR, | |
'institutions_associated_institutions.csv.gz'), | |
'columns': [ | |
'institution_id', 'associated_institution_id', 'relationship' | |
] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'institutions_counts_by_year.csv.gz'), | |
'columns': [ | |
'institution_id', 'year', 'works_count', 'cited_by_count', | |
'oa_works_count' | |
] | |
} | |
}, | |
'publishers': { | |
'publishers': { | |
'name': os.path.join(CSV_DIR, 'publishers.csv.gz'), | |
'columns': [ | |
'id', 'display_name', 'alternate_titles', 'country_codes', | |
'hierarchy_level', 'parent_publisher', | |
'works_count', 'cited_by_count', 'sources_api_url', | |
'updated_date' | |
] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'publishers_counts_by_year.csv.gz'), | |
'columns': ['publisher_id', 'year', 'works_count', 'cited_by_count', | |
'oa_works_count'] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'publishers_ids.csv.gz'), | |
'columns': ['publisher_id', 'openalex', 'ror', 'wikidata'] | |
}, | |
}, | |
'sources': { | |
'sources': { | |
'name': os.path.join(CSV_DIR, 'sources.csv.gz'), | |
'columns': [ | |
'id', 'issn_l', 'issn', 'display_name', 'publisher', | |
'works_count', 'cited_by_count', 'is_oa', | |
'is_in_doaj', 'homepage_url', 'works_api_url', 'updated_date' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'sources_ids.csv.gz'), | |
'columns': ['source_id', 'openalex', 'issn_l', 'issn', 'mag', | |
'wikidata', 'fatcat'] | |
}, | |
'counts_by_year': { | |
'name': os.path.join(CSV_DIR, 'sources_counts_by_year.csv.gz'), | |
'columns': ['source_id', 'year', 'works_count', 'cited_by_count', | |
'oa_works_count'] | |
}, | |
}, | |
'works': { | |
'works': { | |
'name': os.path.join(CSV_DIR, 'works.csv.gz'), | |
'columns': [ | |
'id', 'doi', 'title', 'display_name', 'publication_year', | |
'publication_date', 'type', 'cited_by_count', | |
'is_retracted', 'is_paratext', 'cited_by_api_url', | |
'abstract_inverted_index', 'language' | |
] | |
}, | |
'primary_locations': { | |
'name': os.path.join(CSV_DIR, 'works_primary_locations.csv.gz'), | |
'columns': [ | |
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', | |
'version', 'license' | |
] | |
}, | |
'locations': { | |
'name': os.path.join(CSV_DIR, 'works_locations.csv.gz'), | |
'columns': [ | |
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', | |
'version', 'license' | |
] | |
}, | |
'best_oa_locations': { | |
'name': os.path.join(CSV_DIR, 'works_best_oa_locations.csv.gz'), | |
'columns': [ | |
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa', | |
'version', 'license' | |
] | |
}, | |
'authorships': { | |
'name': os.path.join(CSV_DIR, 'works_authorships.csv.gz'), | |
'columns': [ | |
'work_id', 'author_position', 'author_id', 'institution_id', | |
'raw_affiliation_string' | |
] | |
}, | |
'biblio': { | |
'name': os.path.join(CSV_DIR, 'works_biblio.csv.gz'), | |
'columns': [ | |
'work_id', 'volume', 'issue', 'first_page', 'last_page' | |
] | |
}, | |
'topics': { | |
'name': os.path.join(CSV_DIR, 'works_topics.csv.gz'), | |
'columns': [ | |
'work_id', 'topic_id', 'score' | |
] | |
}, | |
'concepts': { | |
'name': os.path.join(CSV_DIR, 'works_concepts.csv.gz'), | |
'columns': [ | |
'work_id', 'concept_id', 'score' | |
] | |
}, | |
'ids': { | |
'name': os.path.join(CSV_DIR, 'works_ids.csv.gz'), | |
'columns': [ | |
'work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid' | |
] | |
}, | |
'mesh': { | |
'name': os.path.join(CSV_DIR, 'works_mesh.csv.gz'), | |
'columns': [ | |
'work_id', 'descriptor_ui', 'descriptor_name', 'qualifier_ui', | |
'qualifier_name', 'is_major_topic' | |
] | |
}, | |
'open_access': { | |
'name': os.path.join(CSV_DIR, 'works_open_access.csv.gz'), | |
'columns': [ | |
'work_id', 'is_oa', 'oa_status', 'oa_url', | |
'any_repository_has_fulltext' | |
] | |
}, | |
'referenced_works': { | |
'name': os.path.join(CSV_DIR, 'works_referenced_works.csv.gz'), | |
'columns': [ | |
'work_id', 'referenced_work_id' | |
] | |
}, | |
'related_works': { | |
'name': os.path.join(CSV_DIR, 'works_related_works.csv.gz'), | |
'columns': [ | |
'work_id', 'related_work_id' | |
] | |
}, | |
}, | |
} | |
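# csv_files maps each entity type to the CSV files it produces: every entry has a
# 'name' (gzipped CSV path under CSV_DIR) and 'columns' (the header row, which must
# match the column order used by the \copy commands further down). Example lookup:
#   csv_files['works']['authorships']['name']    -> <CSV_DIR>/works_authorships.csv.gz
#   csv_files['works']['authorships']['columns'] -> ['work_id', 'author_position', ...]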
data_types = ['authors', 'topics', 'concepts', 'institutions', 'publishers', 'sources', 'works'] | |
shutdown_event = None | |
log_lock = None | |
def add_log(lin): | |
with open(LOG_FILE, 'a', encoding='utf-8') as f: | |
f.write(lin + '\n') | |
def log_info(m): | |
global log_lock | |
assert log_lock is not None | |
now_str = str(datetime.datetime.now()) | |
now_str = now_str.split(".")[0] | |
s = f'{now_str} INFO {m}' | |
with log_lock: | |
print(s, flush=True) | |
add_log(s) | |
def log_warn(m): | |
global log_lock | |
assert log_lock is not None | |
now_str = str(datetime.datetime.now()) | |
now_str = now_str.split(".")[0] | |
s = f'{now_str} WARN {m}' | |
with log_lock: | |
print(s, flush=True) | |
add_log(s) | |
def log_error(m): | |
global log_lock | |
assert log_lock is not None | |
now_str = str(datetime.datetime.now()) | |
now_str = now_str.split(".")[0] | |
s = f'{now_str} ERROR {m}' | |
with log_lock: | |
print(s, flush=True) | |
add_log(s) | |
def log_exception(): | |
traceback.print_exc() | |
s = traceback.format_exc() | |
for lin in s.splitlines(): | |
log_error(lin) | |
def sha_filename(f, sha): | |
assert f.endswith('.csv.gz') | |
f = f[:-len('.csv.gz')] | |
return f'{f}.sha_{sha}.csv.gz' | |
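# Example: sha_filename('/database/openalex/csv-file/works.csv.gz', 'abc123')
#          -> '/database/openalex/csv-file/works.sha_abc123.csv.gz'
# This gives each flattened snapshot file its own set of CSV outputs, keyed by the
# content hash of the source file.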
def current_milli_time(): | |
return time.time_ns() // 1_000_000 | |
def current_seconds_time(): | |
return time.time_ns() // 1_000_000_000 | |
def ms_to_eta(milliseconds): | |
seconds = milliseconds / 1000 | |
(days, seconds) = divmod(seconds, int(24*3600)) | |
(hours, seconds) = divmod(seconds, 3600) | |
(minutes, seconds) = divmod(seconds, 60) | |
if days > 0: | |
return f"{int(days):} days, {hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}" | |
elif hours > 0: | |
return f"{hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}" | |
else: | |
return f"{minutes:02.0f}:{seconds:02.0f}" | |
def file_list_to_filesize(file_list): | |
total = 0 | |
for data_type, filename in file_list: | |
total += file_size(filename) | |
return total | |
def exec_verbose(args): | |
try: | |
res = subprocess.run(args, capture_output=True) | |
if res.returncode != 0: | |
log_error(f'Command {args}') | |
log_error(f'Failed with exit code {res.returncode}') | |
if 0 == len(res.stdout.splitlines()): | |
log_error('Stdout: <empty>') | |
else: | |
log_error('Stdout:') | |
log_error('='*80) | |
for lin in res.stdout.splitlines(): | |
try: | |
lin_decoded = lin.decode('utf-8') | |
log_error(lin_decoded) | |
except UnicodeDecodeError: | |
log_error(lin) | |
if 0 == len(res.stderr.splitlines()): | |
log_error('Stderr: <empty>') | |
else: | |
log_error('Stderr:') | |
log_error('='*80) | |
for lin in res.stderr.splitlines(): | |
try: | |
lin_decoded = lin.decode('utf-8') | |
log_error(lin_decoded) | |
except UnicodeDecodeError: | |
log_error(lin) | |
log_error('='*80) | |
log_error(f'Command {args} failed with exit code {res.returncode}') | |
raise ExecException() | |
else: | |
# info(f'CMD {str(args)} succeeded') | |
sout = '' | |
for lin in res.stdout.splitlines(): | |
try: | |
sout += lin.decode('utf-8') | |
except UnicodeDecodeError: | |
sout += str(lin) | |
sout += '\n' | |
return sout | |
except FileNotFoundError: | |
log_error(f'Executable "{args[0]}" was not found!')
log_error(f'Full command: {args}') | |
raise ExecException() | |
def exec_verbose_no_capture(args): | |
res = subprocess.run(args, capture_output=False, stdout=sys.stdout, stderr=sys.stderr) | |
if res.returncode != 0: | |
log_error(f'Command {args}') | |
log_error(f'Failed with return code {res.returncode}') | |
raise ExecException() | |
def exec_ignore_errors(args): | |
res = subprocess.run(args, capture_output=True) | |
if res.returncode == 0: | |
return True | |
else: | |
return False | |
class ExecException(Exception): | |
pass | |
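# The exec_* helpers wrap subprocess.run: exec_verbose captures and logs output and
# raises ExecException on a non-zero exit code, exec_verbose_no_capture streams
# output straight to stdout/stderr (used for the long-running `aws s3 sync`), and
# exec_ignore_errors just reports whether the command succeeded.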
def sha1sum(filename):  # note: hashlib.file_digest requires Python 3.11+
with open(filename, 'rb', buffering=0) as f: | |
return hashlib.file_digest(f, 'sha1').hexdigest() | |
def file_size(f): | |
assert os.path.isfile(f) | |
return os.path.getsize(f) | |
def abbr_filename(all_files, f): | |
maxlen = 0 | |
for _typ, af in all_files: | |
lastpart = af.split(os.path.sep) | |
last = os.path.sep.join(lastpart[-3:]) | |
maxlen = max(len(last), maxlen) | |
paths = f.split(os.path.sep) | |
# (SNAPSHOT_DIR, 'data', 'topics', '*', '*.gz') | |
v = os.path.sep.join(paths[-3:]) | |
return v.ljust(maxlen) | |
def calc_sha_file_job(f): | |
global shutdown_event | |
assert shutdown_event is not None | |
try: | |
if shutdown_event.is_set(): | |
return None | |
else: | |
sha = sha1sum(f)
return f, sha | |
except KeyboardInterrupt: | |
shutdown_event.set() | |
raise | |
def get_file_to_sha_map(shutdown_evt, executor, all_files): | |
mapp = {} | |
futures = [] | |
for f in all_files: | |
fut = executor.submit(calc_sha_file_job, f) | |
futures.append(fut) | |
for fut in concurrent.futures.as_completed(futures): | |
if shutdown_evt.is_set(): | |
break | |
res = fut.result() | |
if res is None: | |
shutdown_evt.set_error() | |
else: | |
assert isinstance(res, tuple) | |
assert 2 == len(res) | |
f, sha = res | |
mapp[f] = sha | |
return mapp | |
def get_file_to_sha_map_verbose(shutdown_evt, executor, all_files): | |
log_info('Getting file to sha dict ...') | |
start_time_seconds_sha = current_seconds_time() | |
file_to_sha = get_file_to_sha_map(shutdown_evt, executor, [x[1] for x in all_files]) | |
log_info(f'Getting file to sha dict ... Done in {current_seconds_time() - start_time_seconds_sha}s') | |
return file_to_sha | |
STAGE_CREATE_CSV = 'create_csv' | |
STAGE_IMPORT_DB = 'import_db' | |
def mark_file_done(stage, typ, filename, sha): | |
assert '\n' not in filename | |
assert '\r' not in filename | |
assert ':' not in filename | |
assert typ in data_types or 'db_hash' == typ | |
assert stage in [STAGE_CREATE_CSV, STAGE_IMPORT_DB] | |
with open(DONE_FILE, 'a', encoding='utf-8') as f: | |
f.write(f'{stage}:{typ}:{sha}:{filename}\n') | |
def is_done(stage, typ, filename, sha): | |
assert '\n' not in filename | |
assert ':' not in filename | |
assert '\r' not in filename | |
assert typ in data_types or 'db_hash' == typ | |
assert stage in [STAGE_CREATE_CSV, STAGE_IMPORT_DB] | |
signature = f'{stage}:{typ}:{sha}:{filename}' | |
if os.path.exists(DONE_FILE): | |
with open(DONE_FILE, 'r', encoding='utf-8') as f: | |
for lin in f.readlines(): | |
lin = lin.strip() | |
if lin == signature: | |
return True | |
return False | |
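# DONE_FILE is an append-only log with one completed step per line, formatted as
#   <stage>:<type>:<sha>:<filename>
# e.g. (illustrative): create_csv:works:3f2a9c...:/database/openalex/openalex-snapshot/data/works/updated_date=2024-01-01/part_000.gz
# Because the source file's content hash is part of the signature, a file is
# automatically reprocessed when a new snapshot changes its contents.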
def get_all_files(): | |
all_files = [] | |
for data_type in data_types: | |
assert os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type)) | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')): | |
all_files.append((data_type, jsonl_file_name)) | |
all_files.sort(key=lambda x: x[1]) | |
return all_files | |
def get_all_files_allow_missing(): | |
all_files = [] | |
for data_type in data_types: | |
if os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type)): | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')): | |
all_files.append((data_type, jsonl_file_name)) | |
all_files.sort(key=lambda x: x[1]) | |
return all_files | |
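# get_all_files asserts that every entity directory exists (used after a sync),
# while get_all_files_allow_missing tolerates missing directories (used to take a
# "before" inventory in aws_sync, when SNAPSHOT_DIR may not be populated yet).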
class Event2(object): | |
def __init__(self, *, ctx=None): | |
if ctx is None: | |
ctx = multiprocessing.get_context() | |
self._flag = ctx.Value('i', 0) | |
# Allocate a ctypes.c_ulonglong to hold the set_id: | |
# Represents the C unsigned long long datatype. | |
# The constructor accepts an optional integer initializer; no overflow checking is done. | |
# From https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong | |
# See multiprocessing/sharedctypes.py for typecode to ctypes definitions | |
self._set_id = ctx.Value('Q', 0) | |
def is_set(self): | |
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value : | |
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. | |
with self._flag: | |
return self._flag.value == 1 or self._flag.value == 2 | |
def is_error(self): | |
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value : | |
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value. | |
with self._flag: | |
return self._flag.value == 2 | |
def set_error(self): | |
with self._flag: | |
with self._set_id: | |
if self._flag.value == 0 or self._flag.value == 1: | |
self._flag.value = 2 | |
self._set_id.value += 1 | |
def set(self): | |
# If `set` is called from a signal handler, this is fine as the lock is recursive (i.e. it won't deadlock). | |
# If the thread interrupted by the signal handler is wait()-ing and the signal handler calls set(), | |
# this is fine as wait() spins on the value. Thus, after the signal handler is done, the thread will | |
# return from wait() | |
# Fixes https://github.com/python/cpython/issues/85772 | |
with self._flag: | |
with self._set_id: | |
if self._flag.value == 0: | |
# There is a theoretical chance of race here. It requires the following conditions: | |
# The interrupted thread must be wait()ing. | |
# Then set must be called reentrant for the maximum value of c_ulonglong times, | |
# and all interruptions must happen exactly after `if self._flag.value == 0:`. | |
# The _set_id value will then wrap around. Then clear() must be called | |
# before the original wait() code continues. The wait() code will then continue | |
# to (incorrectly) wait. I think this case is safe to ignore. The stack | |
# will grow too large before there is any chance of this actually happening. | |
self._flag.value = 1 | |
self._set_id.value += 1 | |
# There is no race here by reentrant set when reaching the maximum value for `self._set_id.value`. | |
# ctypes.c_ulonglong will overflow without any exception: | |
# https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong | |
# > no overflow checking is done. | |
# This means that we do not need to check if some maximum value is reached: | |
# C will wrap around the value for us. | |
def clear(self): | |
with self._flag: | |
self._flag.value = 0 | |
def wait(self, timeout=None): | |
start_time = time.monotonic() | |
set_id = self._set_id.value | |
while True: | |
if self._flag.value == 1 or self._flag.value == 2: | |
return True | |
elif set_id != self._set_id.value: | |
return True # flag is unset, but set_id changed, so there must have been a `set` followed by a `clear` | |
# during `time.sleep()`. Fixes https://github.com/python/cpython/issues/95826 | |
elif timeout is not None and (time.monotonic() - start_time) > timeout: | |
return False | |
else: | |
# Fixes https://github.com/python/cpython/issues/85772 by spinning and sleeping. | |
time.sleep(0.010) # sleep 10 milliseconds | |
def __repr__(self) -> str: | |
set_status = 'set' if self.is_set() else 'unset' | |
return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>" | |
def check_aws_cli(aws_cli): | |
log_info(f'Testing AWS_CLI {aws_cli} ...') | |
cmd = [aws_cli, '--version'] | |
sout = exec_verbose(cmd).strip() | |
log_info(f'Testing AWS_CLI {aws_cli} ... OK. Version: {sout}') | |
def aws_sync(aws_cli, snapshot_dir): | |
old_files = get_all_files_allow_missing() | |
cmd = [aws_cli, | |
's3', | |
'sync', | |
's3://openalex', | |
snapshot_dir, | |
'--no-sign-request', | |
'--delete'] | |
# If you download the snapshot into an existing folder, you'll need to use the aws s3 sync --delete flag to remove files from any previous downloads. You can also remove the contents of destination folder manually. If you don't, you will see duplicate Entities that have moved from one file to another between snapshot updates. | |
# https://docs.openalex.org/download-all-data/download-to-your-machine | |
log_info(f'Syncing s3://openalex by executing {cmd} ...') | |
exec_verbose_no_capture(cmd) | |
log_info(f'Syncing s3://openalex by executing {cmd} ... OK') | |
new_files = get_all_files() | |
file_count_diff = len(new_files) - len(old_files) | |
new_and_old_files = new_files.copy() | |
for fil in old_files: | |
if fil not in new_and_old_files: | |
new_and_old_files.append(fil) | |
if file_count_diff == 0: | |
log_info('No new files in s3://openalex') | |
else: | |
for fil in new_files: | |
if fil not in old_files: | |
filename = fil[1] | |
log_info(f'File {abbr_filename(new_and_old_files, filename)} was added') | |
for fil in old_files: | |
if fil not in new_files: | |
filename = fil[1] | |
log_info(f'File {abbr_filename(new_and_old_files, filename)} was removed') | |
def banner(txt, length=80): | |
banner_txt = ' ' + txt + ' ' | |
pad_left = 25 | |
pad_right = length - len(banner_txt) - pad_left | |
return '='* pad_left + banner_txt + '=' * pad_right | |
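# Example: banner('Status') -> 25 '=' characters, then ' Status ', then enough '='
# characters to pad the line out to `length` (80 by default).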
def report_create_csv_progress(all_files, total_size, processed_files, filename, data_type, spent_ms): | |
file_no = processed_files.index((data_type, filename)) + 1 | |
report_progress(all_files, total_size, file_list_to_filesize(processed_files), 'Created CSV for', file_no, filename, spent_ms) | |
def flatten_author(filename, sha): | |
file_spec = csv_files['authors'] | |
with gzip.open(sha_filename(file_spec['authors']['name'], sha), 'wt', encoding='utf-8') as authors_csv, \ | |
gzip.open(sha_filename(file_spec['ids']['name'], sha), 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(sha_filename(file_spec['counts_by_year']['name'], sha), 'wt', encoding='utf-8') as counts_by_year_csv: | |
authors_writer = csv.DictWriter(authors_csv, fieldnames=file_spec['authors']['columns'], extrasaction='ignore') | |
authors_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns']) | |
ids_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
files_done = 0 | |
jsonl_file_name = filename | |
with gzip.open(jsonl_file_name, 'r') as authors_jsonl: | |
for author_json in authors_jsonl: | |
if not author_json.strip(): | |
continue | |
author = json.loads(author_json) | |
if not (author_id := author.get('id')): | |
continue | |
# authors | |
author['display_name_alternatives'] = json.dumps( | |
author.get('display_name_alternatives'), | |
ensure_ascii=False) | |
author['last_known_institution'] = ( | |
author.get('last_known_institution') or {}).get( | |
'id') | |
authors_writer.writerow(author) | |
# ids | |
if author_ids := author.get('ids'): | |
author_ids['author_id'] = author_id | |
ids_writer.writerow(author_ids) | |
# counts_by_year | |
if counts_by_year := author.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['author_id'] = author_id | |
counts_by_year_writer.writerow(count_by_year) | |
files_done += 1 | |
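# flatten_author (above) and flatten_work (further down) each flatten a single
# snapshot file into sha-suffixed CSVs (see sha_filename), so many worker processes
# can flatten authors/works files in parallel without clobbering each other's output.
# The smaller entity types (topics, concepts, institutions, publishers, sources) are
# flattened single-threaded, each into one shared CSV per table.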
def flatten_topics(already_processed_files, all_files): | |
total_size = file_list_to_filesize(all_files) | |
data_type = 'topics' | |
with gzip.open(csv_files['topics']['topics']['name'], 'wt', encoding='utf-8') as topics_csv: | |
topics_writer = csv.DictWriter(topics_csv, fieldnames=csv_files['topics']['topics']['columns']) | |
topics_writer.writeheader() | |
seen_topic_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'topics', '*', '*.gz')): | |
start_time_ms = current_milli_time() | |
with gzip.open(jsonl_file_name, 'r') as topics_jsonl: | |
for line in topics_jsonl: | |
if not line.strip(): | |
continue | |
topic = json.loads(line) | |
topic['keywords'] = '; '.join(topic.get('keywords') or [])
if not ( | |
topic_id := topic.get('id')) or topic_id in seen_topic_ids: | |
continue | |
seen_topic_ids.add(topic_id) | |
for key in ('subfield', 'field', 'domain'): | |
topic[f'{key}_id'] = topic[key]['id'] | |
topic[f'{key}_display_name'] = topic[key]['display_name'] | |
del topic[key] | |
topic['updated_date'] = topic['updated'] | |
del topic['updated'] | |
topic['wikipedia_id'] = topic['ids'].get('wikipedia') | |
del topic['ids'] | |
del topic['created_date'] | |
del topic['siblings'] | |
# info(str(topic['siblings'])) | |
topics_writer.writerow(topic) | |
spent_ms = current_milli_time() - start_time_ms | |
already_processed_files.append((data_type, jsonl_file_name)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_concepts(already_processed_files, all_files): | |
total_size = file_list_to_filesize(all_files) | |
data_type = 'concepts'
with gzip.open(csv_files['concepts']['concepts']['name'], 'wt', encoding='utf-8') as concepts_csv, \ | |
gzip.open(csv_files['concepts']['ancestors']['name'], 'wt', encoding='utf-8') as ancestors_csv, \ | |
gzip.open(csv_files['concepts']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv, \ | |
gzip.open(csv_files['concepts']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(csv_files['concepts']['related_concepts']['name'], 'wt', encoding='utf-8') as related_concepts_csv: | |
concepts_writer = csv.DictWriter(concepts_csv, fieldnames=csv_files['concepts']['concepts']['columns'],extrasaction='ignore') | |
concepts_writer.writeheader() | |
ancestors_writer = csv.DictWriter(ancestors_csv, fieldnames= | |
csv_files['concepts']['ancestors']['columns']) | |
ancestors_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames= | |
csv_files['concepts']['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, | |
fieldnames=csv_files['concepts']['ids'][ | |
'columns']) | |
ids_writer.writeheader() | |
related_concepts_writer = csv.DictWriter(related_concepts_csv, | |
fieldnames= | |
csv_files['concepts'][ | |
'related_concepts'][ | |
'columns']) | |
related_concepts_writer.writeheader() | |
seen_concept_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'concepts', '*', '*.gz')): | |
start_time_ms = current_milli_time() | |
with gzip.open(jsonl_file_name, 'r') as concepts_jsonl: | |
for concept_json in concepts_jsonl: | |
if not concept_json.strip(): | |
continue | |
concept = json.loads(concept_json) | |
if not (concept_id := concept.get( | |
'id')) or concept_id in seen_concept_ids: | |
continue | |
seen_concept_ids.add(concept_id) | |
concepts_writer.writerow(concept) | |
if concept_ids := concept.get('ids'): | |
concept_ids['concept_id'] = concept_id | |
concept_ids['umls_aui'] = json.dumps( | |
concept_ids.get('umls_aui'), ensure_ascii=False) | |
concept_ids['umls_cui'] = json.dumps( | |
concept_ids.get('umls_cui'), ensure_ascii=False) | |
ids_writer.writerow(concept_ids) | |
if ancestors := concept.get('ancestors'): | |
for ancestor in ancestors: | |
if ancestor_id := ancestor.get('id'): | |
ancestors_writer.writerow({ | |
'concept_id': concept_id, | |
'ancestor_id': ancestor_id | |
}) | |
if counts_by_year := concept.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['concept_id'] = concept_id | |
counts_by_year_writer.writerow(count_by_year) | |
if related_concepts := concept.get('related_concepts'): | |
for related_concept in related_concepts: | |
if related_concept_id := related_concept.get('id'): | |
related_concepts_writer.writerow({ | |
'concept_id': concept_id, | |
'related_concept_id': related_concept_id, | |
'score': related_concept.get('score') | |
}) | |
spent_ms = current_milli_time() - start_time_ms | |
already_processed_files.append((data_type, jsonl_file_name)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_institutions(already_processed_files, all_files): | |
file_spec = csv_files['institutions'] | |
total_size = file_list_to_filesize(all_files) | |
with gzip.open(file_spec['institutions']['name'], 'wt', encoding='utf-8') as institutions_csv, \ | |
gzip.open(file_spec['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(file_spec['geo']['name'], 'wt', encoding='utf-8') as geo_csv, \ | |
gzip.open(file_spec['associated_institutions']['name'], 'wt', encoding='utf-8') as associated_institutions_csv, \ | |
gzip.open(file_spec['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv: | |
institutions_writer = csv.DictWriter( | |
institutions_csv, fieldnames=file_spec['institutions']['columns'], | |
extrasaction='ignore' | |
) | |
institutions_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, | |
fieldnames=file_spec['ids']['columns']) | |
ids_writer.writeheader() | |
geo_writer = csv.DictWriter(geo_csv, | |
fieldnames=file_spec['geo']['columns']) | |
geo_writer.writeheader() | |
associated_institutions_writer = csv.DictWriter( | |
associated_institutions_csv, | |
fieldnames=file_spec['associated_institutions']['columns'] | |
) | |
associated_institutions_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames= | |
file_spec['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
seen_institution_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'institutions', '*', '*.gz')): | |
start_time_ms = current_milli_time() | |
with gzip.open(jsonl_file_name, 'r') as institutions_jsonl: | |
for institution_json in institutions_jsonl: | |
if not institution_json.strip(): | |
continue | |
institution = json.loads(institution_json) | |
if not (institution_id := institution.get( | |
'id')) or institution_id in seen_institution_ids: | |
continue | |
seen_institution_ids.add(institution_id) | |
# institutions | |
institution['display_name_acronyms'] = json.dumps( | |
institution.get('display_name_acronyms'), | |
ensure_ascii=False) | |
institution['display_name_alternatives'] = json.dumps( | |
institution.get('display_name_alternatives'), | |
ensure_ascii=False) | |
institutions_writer.writerow(institution) | |
# ids | |
if institution_ids := institution.get('ids'): | |
institution_ids['institution_id'] = institution_id | |
ids_writer.writerow(institution_ids) | |
# geo | |
if institution_geo := institution.get('geo'): | |
institution_geo['institution_id'] = institution_id | |
geo_writer.writerow(institution_geo) | |
# associated_institutions | |
if associated_institutions := institution.get( | |
'associated_institutions', | |
institution.get('associated_insitutions') | |
# typo in api | |
): | |
for associated_institution in associated_institutions: | |
if associated_institution_id := associated_institution.get( | |
'id'): | |
associated_institutions_writer.writerow({ | |
'institution_id': institution_id, | |
'associated_institution_id': associated_institution_id, | |
'relationship': associated_institution.get( | |
'relationship') | |
}) | |
# counts_by_year | |
if counts_by_year := institution.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['institution_id'] = institution_id | |
counts_by_year_writer.writerow(count_by_year) | |
spent_ms = current_milli_time() - start_time_ms | |
data_type = 'institutions' | |
already_processed_files.append((data_type, jsonl_file_name)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_publishers(already_processed_files, all_files): | |
total_size = file_list_to_filesize(all_files) | |
with gzip.open(csv_files['publishers']['publishers']['name'], 'wt', encoding='utf-8') as publishers_csv, \ | |
gzip.open(csv_files['publishers']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv, \ | |
gzip.open(csv_files['publishers']['ids']['name'], 'wt', encoding='utf-8') as ids_csv: | |
publishers_writer = csv.DictWriter( | |
publishers_csv, | |
fieldnames=csv_files['publishers']['publishers']['columns'], | |
extrasaction='ignore' | |
) | |
publishers_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames= | |
csv_files['publishers']['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, | |
fieldnames=csv_files['publishers']['ids'][ | |
'columns']) | |
ids_writer.writeheader() | |
seen_publisher_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'publishers', '*', '*.gz')): | |
start_time_ms = current_milli_time() | |
with gzip.open(jsonl_file_name, 'r') as concepts_jsonl: | |
for publisher_json in concepts_jsonl: | |
if not publisher_json.strip(): | |
continue | |
publisher = json.loads(publisher_json) | |
if not (publisher_id := publisher.get( | |
'id')) or publisher_id in seen_publisher_ids: | |
continue | |
seen_publisher_ids.add(publisher_id) | |
# publishers | |
publisher['alternate_titles'] = json.dumps( | |
publisher.get('alternate_titles'), ensure_ascii=False) | |
publisher['country_codes'] = json.dumps( | |
publisher.get('country_codes'), ensure_ascii=False) | |
publishers_writer.writerow(publisher) | |
if publisher_ids := publisher.get('ids'): | |
publisher_ids['publisher_id'] = publisher_id | |
ids_writer.writerow(publisher_ids) | |
if counts_by_year := publisher.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['publisher_id'] = publisher_id | |
counts_by_year_writer.writerow(count_by_year) | |
data_type = 'publishers' | |
spent_ms = current_milli_time() - start_time_ms | |
already_processed_files.append((data_type, jsonl_file_name)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_sources(already_processed_files, all_files): | |
total_size = file_list_to_filesize(all_files) | |
with gzip.open(csv_files['sources']['sources']['name'], 'wt', encoding='utf-8') as sources_csv, \ | |
gzip.open(csv_files['sources']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(csv_files['sources']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv: | |
sources_writer = csv.DictWriter( | |
sources_csv, fieldnames=csv_files['sources']['sources']['columns'], | |
extrasaction='ignore' | |
) | |
sources_writer.writeheader() | |
ids_writer = csv.DictWriter(ids_csv, | |
fieldnames=csv_files['sources']['ids'][ | |
'columns']) | |
ids_writer.writeheader() | |
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames= | |
csv_files['sources']['counts_by_year']['columns']) | |
counts_by_year_writer.writeheader() | |
seen_source_ids = set() | |
files_done = 0 | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'sources', '*', '*.gz')): | |
start_time_ms = current_milli_time() | |
with gzip.open(jsonl_file_name, 'r') as sources_jsonl: | |
for source_json in sources_jsonl: | |
if not source_json.strip(): | |
continue | |
source = json.loads(source_json) | |
if not (source_id := source.get( | |
'id')) or source_id in seen_source_ids: | |
continue | |
seen_source_ids.add(source_id) | |
source['issn'] = json.dumps(source.get('issn')) | |
sources_writer.writerow(source) | |
if source_ids := source.get('ids'): | |
source_ids['source_id'] = source_id | |
source_ids['issn'] = json.dumps(source_ids.get('issn')) | |
ids_writer.writerow(source_ids) | |
if counts_by_year := source.get('counts_by_year'): | |
for count_by_year in counts_by_year: | |
count_by_year['source_id'] = source_id | |
counts_by_year_writer.writerow(count_by_year) | |
data_type = 'sources' | |
spent_ms = current_milli_time() - start_time_ms | |
already_processed_files.append((data_type, jsonl_file_name)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms) | |
files_done += 1 | |
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY: | |
break | |
def flatten_author_job(filename, sha): | |
global shutdown_event | |
global log_lock | |
assert log_lock is not None | |
assert shutdown_event is not None | |
start_time_ms = current_milli_time() | |
try: | |
assert shutdown_event is not None | |
if shutdown_event.is_set(): | |
return None | |
assert os.path.exists(filename) | |
flatten_author(filename, sha) | |
return os.getpid(), start_time_ms, 'authors', filename | |
except Exception: | |
log_info('Error occurred') | |
log_exception() | |
traceback.print_exc() | |
shutdown_event.set_error() | |
return None | |
def flatten_work_job(filename, sha): | |
global shutdown_event | |
global log_lock | |
assert log_lock is not None | |
assert shutdown_event is not None | |
try: | |
start_time_ms = current_milli_time() | |
assert shutdown_event is not None | |
if shutdown_event.is_set(): | |
return None | |
assert os.path.exists(filename) | |
flatten_work(filename, sha) | |
return os.getpid(), start_time_ms, 'works', filename | |
except Exception: | |
log_error('Error occurred') | |
log_exception() | |
traceback.print_exc() | |
shutdown_event.set_error() | |
return None | |
def flatten_work(filename, sha): | |
file_spec = csv_files['works'] | |
with gzip.open(sha_filename(file_spec['works']['name'], sha), 'wt', encoding='utf-8') as works_csv, \ | |
gzip.open(sha_filename(file_spec['primary_locations']['name'], sha), 'wt', encoding='utf-8') as primary_locations_csv, \ | |
gzip.open(sha_filename(file_spec['locations']['name'], sha), 'wt', encoding='utf-8') as locations, \ | |
gzip.open(sha_filename(file_spec['best_oa_locations']['name'], sha), 'wt', encoding='utf-8') as best_oa_locations, \ | |
gzip.open(sha_filename(file_spec['authorships']['name'], sha), 'wt', encoding='utf-8') as authorships_csv, \ | |
gzip.open(sha_filename(file_spec['biblio']['name'], sha), 'wt', encoding='utf-8') as biblio_csv, \ | |
gzip.open(sha_filename(file_spec['topics']['name'], sha), 'wt', encoding='utf-8') as topics_csv, \ | |
gzip.open(sha_filename(file_spec['concepts']['name'], sha), 'wt', encoding='utf-8') as concepts_csv, \ | |
gzip.open(sha_filename(file_spec['ids']['name'], sha), 'wt', encoding='utf-8') as ids_csv, \ | |
gzip.open(sha_filename(file_spec['mesh']['name'], sha), 'wt', encoding='utf-8') as mesh_csv, \ | |
gzip.open(sha_filename(file_spec['open_access']['name'], sha), 'wt', encoding='utf-8') as open_access_csv, \ | |
gzip.open(sha_filename(file_spec['referenced_works']['name'], sha), 'wt', encoding='utf-8') as referenced_works_csv, \ | |
gzip.open(sha_filename(file_spec['related_works']['name'], sha), 'wt', encoding='utf-8') as related_works_csv: | |
works_writer = init_dict_writer(works_csv, file_spec['works'], | |
extrasaction='ignore') | |
primary_locations_writer = init_dict_writer(primary_locations_csv, | |
file_spec[ | |
'primary_locations']) | |
locations_writer = init_dict_writer(locations, file_spec['locations']) | |
best_oa_locations_writer = init_dict_writer(best_oa_locations, | |
file_spec[ | |
'best_oa_locations']) | |
authorships_writer = init_dict_writer(authorships_csv, | |
file_spec['authorships']) | |
biblio_writer = init_dict_writer(biblio_csv, file_spec['biblio']) | |
topics_writer = init_dict_writer(topics_csv, file_spec['topics']) | |
concepts_writer = init_dict_writer(concepts_csv, file_spec['concepts']) | |
ids_writer = init_dict_writer(ids_csv, file_spec['ids'], | |
extrasaction='ignore') | |
mesh_writer = init_dict_writer(mesh_csv, file_spec['mesh']) | |
open_access_writer = init_dict_writer(open_access_csv, | |
file_spec['open_access']) | |
referenced_works_writer = init_dict_writer(referenced_works_csv, | |
file_spec[ | |
'referenced_works']) | |
related_works_writer = init_dict_writer(related_works_csv, | |
file_spec['related_works']) | |
jsonl_file_name = filename | |
with gzip.open(jsonl_file_name, 'r') as works_jsonl: | |
for work_json in works_jsonl: | |
if not work_json.strip(): | |
continue | |
work = json.loads(work_json) | |
if not (work_id := work.get('id')): | |
continue | |
# works | |
if (abstract := work.get( | |
'abstract_inverted_index')) is not None: | |
work['abstract_inverted_index'] = json.dumps(abstract, | |
ensure_ascii=False) | |
works_writer.writerow(work) | |
# primary_locations | |
if primary_location := (work.get('primary_location') or {}): | |
if primary_location.get( | |
'source') and primary_location.get( | |
'source').get('id'): | |
primary_locations_writer.writerow({ | |
'work_id': work_id, | |
'source_id': primary_location['source']['id'], | |
'landing_page_url': primary_location.get( | |
'landing_page_url'), | |
'pdf_url': primary_location.get('pdf_url'), | |
'is_oa': primary_location.get('is_oa'), | |
'version': primary_location.get('version'), | |
'license': primary_location.get('license'), | |
}) | |
# locations | |
if locations := work.get('locations'): | |
for location in locations: | |
if location.get('source') and location.get( | |
'source').get('id'): | |
locations_writer.writerow({ | |
'work_id': work_id, | |
'source_id': location['source']['id'], | |
'landing_page_url': location.get( | |
'landing_page_url'), | |
'pdf_url': location.get('pdf_url'), | |
'is_oa': location.get('is_oa'), | |
'version': location.get('version'), | |
'license': location.get('license'), | |
}) | |
# best_oa_locations | |
if best_oa_location := (work.get('best_oa_location') or {}): | |
if best_oa_location.get( | |
'source') and best_oa_location.get( | |
'source').get('id'): | |
best_oa_locations_writer.writerow({ | |
'work_id': work_id, | |
'source_id': best_oa_location['source']['id'], | |
'landing_page_url': best_oa_location.get( | |
'landing_page_url'), | |
'pdf_url': best_oa_location.get('pdf_url'), | |
'is_oa': best_oa_location.get('is_oa'), | |
'version': best_oa_location.get('version'), | |
'license': best_oa_location.get('license'), | |
}) | |
# authorships | |
if authorships := work.get('authorships'): | |
for authorship in authorships: | |
if author_id := authorship.get('author', {}).get( | |
'id'): | |
institutions = authorship.get('institutions') | |
institution_ids = [i.get('id') for i in | |
institutions] | |
institution_ids = [i for i in institution_ids if | |
i] | |
institution_ids = institution_ids or [None] | |
for institution_id in institution_ids: | |
authorships_writer.writerow({ | |
'work_id': work_id, | |
'author_position': authorship.get( | |
'author_position'), | |
'author_id': author_id, | |
'institution_id': institution_id, | |
'raw_affiliation_string': authorship.get( | |
'raw_affiliation_string'), | |
}) | |
# biblio | |
if biblio := work.get('biblio'): | |
biblio['work_id'] = work_id | |
biblio_writer.writerow(biblio) | |
# topics | |
for topic in work.get('topics', []): | |
if topic_id := topic.get('id'): | |
topics_writer.writerow({ | |
'work_id': work_id, | |
'topic_id': topic_id, | |
'score': topic.get('score') | |
}) | |
# concepts | |
for concept in work.get('concepts') or []:
if concept_id := concept.get('id'): | |
concepts_writer.writerow({ | |
'work_id': work_id, | |
'concept_id': concept_id, | |
'score': concept.get('score'), | |
}) | |
# ids | |
if ids := work.get('ids'): | |
ids['work_id'] = work_id | |
ids_writer.writerow(ids) | |
# mesh | |
for mesh in work.get('mesh') or []:
mesh['work_id'] = work_id | |
mesh_writer.writerow(mesh) | |
# open_access | |
if open_access := work.get('open_access'): | |
open_access['work_id'] = work_id | |
open_access_writer.writerow(open_access) | |
# referenced_works | |
for referenced_work in work.get('referenced_works') or []:
if referenced_work: | |
referenced_works_writer.writerow({ | |
'work_id': work_id, | |
'referenced_work_id': referenced_work | |
}) | |
# related_works | |
for related_work in work.get('related_works') or []:
if related_work: | |
related_works_writer.writerow({ | |
'work_id': work_id, | |
'related_work_id': related_work | |
}) | |
return filename | |
def init_dict_writer(csv_file, file_spec, **kwargs): | |
writer = csv.DictWriter( | |
csv_file, fieldnames=file_spec['columns'], **kwargs | |
) | |
writer.writeheader() | |
return writer | |
def init_worker_process(shutdown_evt, l_lock): | |
assert shutdown_evt is not None | |
assert l_lock is not None | |
global shutdown_event | |
global log_lock | |
assert shutdown_event is None | |
assert log_lock is None | |
shutdown_event = shutdown_evt | |
log_lock = l_lock | |
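# init_worker_process is intended as the worker initializer for the process pool:
# each worker stores the shared shutdown Event2 and the logging lock in its own
# module globals, which the *_job functions assert on before doing any work.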
def launch_create_authors_works_csv_jobs(executor, file_to_sha): | |
processed_files = [] | |
do_process_files = [] | |
known_datatypes = ['authors', 'works'] | |
for data_type in known_datatypes: | |
assert os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type)) | |
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')): | |
if jsonl_file_name in file_to_sha: | |
if is_done(STAGE_CREATE_CSV, data_type, jsonl_file_name, file_to_sha[jsonl_file_name]): | |
processed_files.append((data_type, jsonl_file_name)) | |
else: | |
do_process_files.append((data_type, jsonl_file_name)) | |
else: | |
log_error(f'File {jsonl_file_name} not found in file_to_sha dictionary.') | |
log_error('Is there some background sync job running?') | |
raise RuntimeError('File not found in file_to_sha dictionary') | |
log_info(f'Already processed {len(processed_files):_} authors and works SNAPSHOT files, {file_list_to_filesize(processed_files) // 1024 // 1024:_} MB') | |
log_info(f'Processing remaining {len(do_process_files):_} authors and works SNAPSHOT files, {file_list_to_filesize(do_process_files) // 1024 // 1024:_} MB') | |
futures = [] | |
for data_type, f in do_process_files: | |
assert data_type in data_types | |
sha = file_to_sha[f] | |
fut = None | |
if data_type == 'works': | |
fut = executor.submit(flatten_work_job, f, sha) | |
elif data_type == 'authors': | |
fut = executor.submit(flatten_author_job, f, sha) | |
else: | |
raise RuntimeError(f'Unhandled data type {data_type}') | |
assert fut is not None | |
futures.append(fut) | |
return processed_files, do_process_files, futures | |
def flatten_wait_authors_works_jobs(shutdown_evt, file_to_sha, all_files, already_processed_files, futures): | |
worker_pids = [] | |
total_size = file_list_to_filesize(all_files) | |
for fut in concurrent.futures.as_completed(futures): | |
if shutdown_evt.is_set(): | |
break | |
res = fut.result() | |
if res is None: | |
shutdown_evt.set_error() | |
else: | |
pid, start_time_ms, data_type, filename = res | |
assert data_type in data_types | |
spent_ms = current_milli_time() - start_time_ms | |
sha = file_to_sha[filename] | |
mark_file_done(STAGE_CREATE_CSV, data_type, filename, sha) | |
if pid not in worker_pids: | |
worker_pids.append(pid) | |
already_processed_files.append((data_type, filename)) | |
report_create_csv_progress(all_files, total_size, already_processed_files, filename, data_type, spent_ms) | |
def flatten_author_works(shutdown_evt, executor, file_to_sha, all_files): | |
log_info('Creating CSV files for authors and works ...') | |
already_processed_files, do_process_files, futures = launch_create_authors_works_csv_jobs(executor, file_to_sha) | |
flatten_wait_authors_works_jobs(shutdown_evt, file_to_sha, all_files, already_processed_files, futures) | |
return already_processed_files | |
def create_csv_files(shutdown_evt, executor, file_to_sha): | |
log_info('Creating CSV files ...') | |
all_files = get_all_files() | |
start_time_seconds = current_seconds_time() | |
processed_files = flatten_author_works(shutdown_evt, executor, file_to_sha, all_files) | |
done_time_seconds = current_seconds_time() - start_time_seconds | |
log_info(f'Created CSV files for authors and works ... Done! Spent {ms_to_eta(done_time_seconds*1000)}') | |
remaining_bytes = file_list_to_filesize(all_files) - file_list_to_filesize(processed_files) | |
log_info(f'Remaining MB to create CSV files for: {remaining_bytes//1024//1024:_}') | |
flatten_topics(processed_files, all_files) | |
flatten_concepts(processed_files, all_files) | |
flatten_institutions(processed_files, all_files) | |
flatten_publishers(processed_files, all_files) | |
flatten_sources(processed_files, all_files) | |
done_time_seconds = current_seconds_time() - start_time_seconds | |
log_info(f'Creating CSV files done! Spent {ms_to_eta(done_time_seconds * 1000)}') | |
def check_psql(): | |
log_info(f'Testing PSQL {PSQL} --version ...') | |
v = exec_verbose([PSQL, '--version']) | |
v = v.strip() | |
log_info(f'Testing PSQL {PSQL} --version ... OK. Version: {v}') | |
def wipe_init_db(): | |
# Kill connections: https://stackoverflow.com/questions/5408156/how-to-drop-a-postgresql-database-if-there-are-active-connections-to-it | |
log_info('Terminating previous sessions ...') | |
ok_kill = exec_ignore_errors([PSQL, '-d', 'postgres', '-c', f''' | |
SELECT pg_terminate_backend(pg_stat_activity.pid) | |
FROM pg_stat_activity | |
WHERE pg_stat_activity.datname = '{DB_NAME}' | |
AND pid <> pg_backend_pid(); | |
''']) | |
if ok_kill: | |
log_info('Terminating previous sessions ... OK') | |
else: | |
log_warn('Terminating previous sessions ... Failed! Continuing anyway') | |
log_info(f'Testing database {DB_NAME} is present ...') | |
ok_db_present = exec_ignore_errors([PSQL, '-d', DB_NAME, '-c', 'select 1']) | |
if ok_db_present: | |
log_info(f'Testing database {DB_NAME} is present ... OK') | |
else: | |
log_warn(f'Testing database {DB_NAME} is present ... Failed! Trying to create database ...') | |
log_info(f'Creating database {DB_NAME} ...') | |
exec_verbose([PSQL, '-d', 'postgres', '-c', f'CREATE DATABASE {DB_NAME}']) | |
log_info(f'Creating database {DB_NAME} ... OK') | |
log_info(f'Testing database {DB_NAME} is present again ...') | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', 'select 1']) | |
log_info(f'Testing database {DB_NAME} is present again ... OK') | |
log_info('Dropping schema ...') | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', 'DROP SCHEMA IF EXISTS openalex CASCADE']) | |
log_info('Dropping schema ... OK') | |
log_info('Creating base schema ...') | |
with open(SCHEMA_FILE, encoding='utf-8') as f: | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', f.read()]) | |
log_info('Creating base schema ... OK') | |
def populate_subtable(prefix, subtable): | |
start_seconds = current_seconds_time() | |
field_names = ', '.join(csv_files[prefix][subtable]['columns']) | |
filename = csv_files[prefix][subtable]['name'] | |
sqlcmd = f"\\copy openalex.{prefix}_{subtable} ({field_names}) from program 'gunzip -c {filename}' csv header" | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd]) | |
spent_seconds = current_seconds_time() - start_seconds | |
log_info(f'Populated openalex.{prefix}_{subtable} in {spent_seconds}s') | |
def populate_maintable(prefix): | |
start_seconds = current_seconds_time() | |
field_names = ', '.join(csv_files[prefix][prefix]['columns']) | |
filename = csv_files[prefix][prefix]['name'] | |
sqlcmd = f"\\copy openalex.{prefix} ({field_names}) from program 'gunzip -c {filename}' csv header" | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd]) | |
spent_seconds = current_seconds_time() - start_seconds | |
log_info(f'Populated openalex.{prefix} in {spent_seconds}s') | |
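# Both populate_* helpers stream the gzipped CSV straight into Postgres with \copy,
# e.g. (illustrative, for the default CSV_DIR and DB_NAME):
#   psql -d openalex_testivar -c "\copy openalex.concepts_ancestors (concept_id, ancestor_id) from program 'gunzip -c /database/openalex/csv-file/concepts_ancestors.csv.gz' csv header"
# Using "from program 'gunzip -c ...'" avoids writing an uncompressed temporary file.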
def insert_small_tables(shutdown_evt, executor): | |
populate_maintable('concepts') | |
populate_subtable('concepts', 'ancestors') | |
populate_subtable('concepts', 'counts_by_year') | |
populate_subtable('concepts', 'ids') | |
populate_subtable('concepts', 'related_concepts') | |
populate_maintable('topics') | |
populate_maintable('institutions') | |
populate_subtable('institutions', 'ids') | |
populate_subtable('institutions', 'geo') | |
populate_subtable('institutions', 'associated_institutions') | |
populate_subtable('institutions', 'counts_by_year') | |
populate_maintable('publishers') | |
populate_subtable('publishers', 'counts_by_year') | |
populate_subtable('publishers', 'ids') | |
populate_maintable('sources') | |
populate_subtable('sources', 'ids') | |
populate_subtable('sources', 'counts_by_year') | |
def populate_table(table_name, columns, filename, sha): | |
assert isinstance(table_name, str) | |
assert isinstance(columns, list) | |
assert isinstance(filename, str) | |
filename = sha_filename(filename, sha) | |
assert os.path.exists(filename) | |
assert os.path.isfile(filename) | |
field_names = ', '.join(columns) | |
sqlcmd = f"\\copy openalex.{table_name} ({field_names}) from program 'gunzip -c {filename}' csv header" | |
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd]) | |
def insert_authors_job(f, sha): | |
global shutdown_event | |
global log_lock | |
assert shutdown_event is not None | |
assert log_lock is not None | |
if shutdown_event.is_set(): | |
return None | |
start_ms = current_milli_time() | |
populate_table('authors', csv_files['authors']['authors']['columns'], csv_files['authors']['authors']['name'], sha) | |
for k, v in csv_files['authors'].items(): | |
if k == 'authors': | |
continue | |
populate_table(f'authors_{k}', v['columns'], v['name'], sha) | |
spent_ms = current_milli_time() - start_ms | |
return os.getpid(), 'authors', f, spent_ms
def insert_works_job(f, sha): | |
global shutdown_event | |
global log_lock | |
assert shutdown_event is not None | |
assert log_lock is not None | |
if shutdown_event.is_set(): | |
return None | |
start_ms = current_milli_time() | |
populate_table('works', csv_files['works']['works']['columns'], csv_files['works']['works']['name'], sha) | |
for k, v in csv_files['works'].items(): | |
if k == 'works': | |
continue | |
populate_table(f'works_{k}', v['columns'], v['name'], sha) | |
spent_ms = current_milli_time() - start_ms | |
return os.getpid(), 'works', f, spent_ms | |
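# Log a single progress line: file counter, file size, throughput in MB/s and the
# overall percentage of bytes processed so far.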
def report_progress(all_files, total_size, processed_size, prefix, file_no, filename, spent_ms): | |
fsize = file_size(filename) | |
if spent_ms == 0: | |
spent_ms = 1 | |
mb_per_second = (fsize / 1024 / 1024) / (spent_ms / 1000) | |
done_percent = processed_size*100 / total_size | |
fileno_str = f'{file_no:_}'.rjust(len(f'{len(all_files):_}'), ' ') | |
mb = f'{fsize // 1024 // 1024:_}'.rjust(len('1_234'), ' ') | |
mb_s = f'{mb_per_second:.1f}'.rjust(len('100.0'), ' ') | |
total_size_str = f'{total_size // 1024 // 1024:_}' | |
processed_mb = f'{processed_size // 1024 // 1024:_}'.rjust(len(total_size_str), ' ') | |
done_percent = f'{done_percent:.2f}'.rjust(len('100.00'), ' ') | |
abbr_filename_str = abbr_filename(all_files, filename) | |
log_info(f'{prefix} {fileno_str}/{len(all_files):_} {abbr_filename_str} {mb} MB, {mb_s} MB/s, total {processed_mb}/{total_size_str} MB. {done_percent}% done.') | |
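# Wait for the submitted author/works futures, logging progress as each one
# completes and stopping early if shutdown is requested.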
def insert_wait_author_works_jobs(shutdown_evt, all_files, futures): | |
total_size = file_list_to_filesize(all_files) | |
log_info('Waiting for insert authors and works jobs to finish ...') | |
processed_size = 0 | |
for idx, fut in enumerate(concurrent.futures.as_completed(futures)): | |
if shutdown_evt.is_set(): | |
break | |
        res = fut.result()
        if res is None:
            # A worker returns None when it sees the shutdown event before doing any work.
            shutdown_evt.set_error()
        else:
            assert isinstance(res, tuple)
            assert 4 == len(res)
            pid, typ, filename, spent_ms = res
            # Accumulate the bytes loaded so far so the progress percentage actually advances.
            processed_size += file_size(filename)
            report_progress(all_files, total_size, processed_size, 'Inserted', idx + 1, filename, spent_ms)
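# Submit one load job per authors/works snapshot file to the process pool and
# wait for all of them to finish.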
def insert_author_works(shutdown_evt, executor, file_to_sha): | |
aw_files = [(typ, f) for typ, f in get_all_files() if typ in ['authors', 'works']] | |
log_info(f'Found {len(aw_files)} authors and works files') | |
    # Sanity-check the file types before submitting any jobs.
    for typ, f in aw_files:
        if typ not in ('authors', 'works'):
            raise RuntimeError(f'Unknown type: {typ}')
futures = [] | |
for typ, f in aw_files: | |
sha = file_to_sha[f] | |
if typ == 'authors': | |
fut = executor.submit(insert_authors_job, f, sha) | |
elif typ == 'works': | |
fut = executor.submit(insert_works_job, f, sha) | |
else: | |
raise RuntimeError(f'Unknown type {typ}') | |
assert fut is not None | |
futures.append(fut) | |
insert_wait_author_works_jobs(shutdown_evt, aw_files, futures) | |
def check_schema_file():
    if not os.path.exists(SCHEMA_FILE):
        log_error(f'SCHEMA_FILE {SCHEMA_FILE} does not exist')
        log_error('Exiting')
        raise ExecException()
    if not os.path.isfile(SCHEMA_FILE):
        log_error(f'SCHEMA_FILE {SCHEMA_FILE} is not a file')
        log_error('Exiting')
        raise ExecException()
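# Log the effective configuration plus the relevant libpq environment variables
# (masking PGPASSWORD and PGPASSFILE), then sanity-check the schema file, psql
# and the AWS CLI.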
def show_config(): | |
log_info(banner('Configuration and environment')) | |
log_info(f'Given command line arguments: {sys.argv[1:]}') | |
env_variables = ['PATH', 'PGHOST', 'PGUSER', 'PGPASSWORD', 'PGPASSFILE', 'PGPORT'] | |
env_variables.sort() | |
for env_var in env_variables: | |
if env_var in os.environ: | |
if env_var != 'PGPASSWORD' and env_var != 'PGPASSFILE': | |
log_info(f'Environment variable {env_var} is set to {os.environ[env_var]}') | |
else: | |
log_info(f'Environment variable {env_var} is set to ***') | |
else: | |
if env_var == 'PGUSER': | |
log_info(f'Environment variable {env_var} is not set. Will default to {getpass.getuser()}') | |
else: | |
log_info(f'Environment variable {env_var} is not set') | |
log_info(f'Using AWS_CLI {AWS_CLI}') | |
log_info(f'Using CSV_DIR {CSV_DIR}') | |
log_info(f'Using DB_NAME {DB_NAME}') | |
log_info(f'Using DONE_FILE {DONE_FILE}') | |
log_info(f'Using FORCE_WIPE {FORCE_WIPE}') | |
log_info(f'Using LOG_FILE {LOG_FILE}') | |
log_info(f'Using PSQL {PSQL}') | |
log_info(f'Using SCHEMA_FILE {SCHEMA_FILE}') | |
log_info(f'Using SNAPSHOT_DIR {SNAPSHOT_DIR}') | |
log_info(f'Using SYNC {SYNC}') | |
log_info(f'Using WORKER_COUNT {WORKER_COUNT}') | |
log_info(f'Running as USER {getpass.getuser()}') | |
log_info(f'CWD is {os.getcwd()}') | |
check_schema_file() | |
check_psql() | |
check_aws_cli(AWS_CLI) | |
log_info(banner('End of configuration and environment')) | |
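# Main pipeline: show the configuration, optionally sync the snapshot from S3,
# hash all snapshot files, and only wipe, recreate and re-import the database when
# that hash is not already recorded as done in DONE_FILE.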
def run_main(shutdown_evt, l_lock): | |
show_config() | |
if SHOW_CONFIG: | |
return | |
if SYNC: | |
aws_sync(AWS_CLI, SNAPSHOT_DIR) | |
else: | |
log_warn('Not syncing s3://openalex ...') | |
with concurrent.futures.ProcessPoolExecutor(max_workers=WORKER_COUNT, initargs=(shutdown_evt, l_lock), initializer=init_worker_process) as executor: | |
try: | |
all_files = get_all_files() | |
file_to_sha = get_file_to_sha_map_verbose(shutdown_evt, executor, all_files) | |
sha_str = '' | |
for _, filename in all_files: | |
sha_str += filename + ':' + file_to_sha[filename] + '\n' | |
hs = hashlib.sha256(sha_str.encode('utf-8')).hexdigest() | |
log_info(f'DB hash is {hs}') | |
if FORCE_WIPE: | |
log_warn('Forcing wipe of database and re-import') | |
if os.path.exists(DONE_FILE): | |
os.remove(DONE_FILE) | |
if is_done(STAGE_IMPORT_DB, 'db_hash', 'db_hash', hs): | |
log_info('DB is up to date, doing nothing') | |
else: | |
log_info('DB is out of date, creating CSV files and importing') | |
wipe_init_db() | |
create_csv_files(shutdown_evt, executor, file_to_sha) | |
insert_small_tables(shutdown_evt, executor) | |
insert_author_works(shutdown_evt, executor, file_to_sha) | |
mark_file_done(STAGE_IMPORT_DB, 'db_hash', 'db_hash', hs) | |
except KeyboardInterrupt as e: | |
log_info('CTRL-C received, shutting down') | |
shutdown_evt.set() | |
raise e | |
except ExecException as e: | |
shutdown_evt.set_error() | |
log_error('Executing command failed!') | |
raise e | |
except Exception: | |
log_exception() | |
log_error('Main program crashed!') | |
shutdown_evt.set_error() | |
finally: | |
shutdown_evt.set() | |
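# Process entry point: validate command-line flags, set up the shared shutdown
# event and log lock for the worker processes, run the pipeline and translate the
# outcome into an exit code (130 on CTRL-C).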
def run_main_outer(): | |
global log_lock | |
multiprocessing.set_start_method('spawn') | |
start_time = current_milli_time() | |
shutdown_evt = Event2() | |
log_lock = multiprocessing.Lock() | |
exit_code = 0 | |
for arg in sys.argv[1:]: | |
if arg.startswith('-'): | |
if arg not in valid_args: | |
log_error(f'Invalid argument given: {arg}') | |
log_error('Exiting') | |
return 1 | |
try: | |
run_main(shutdown_evt, log_lock) | |
if shutdown_evt.is_error(): | |
exit_code = 1 | |
else: | |
exit_code = 0 | |
return exit_code | |
except ExecException: | |
exit_code = 1 | |
return exit_code | |
except Exception: | |
log_exception() | |
log_error('Main program crashed!') | |
shutdown_evt.set_error() | |
exit_code = 1 | |
return exit_code | |
except KeyboardInterrupt: | |
shutdown_evt.set() | |
exit_code = 130 | |
return exit_code | |
finally: | |
log_info(f'Main exiting with exit code {exit_code}. Spent {ms_to_eta(current_milli_time() - start_time)}.') | |
return exit_code | |
if __name__ == '__main__': | |
sys.exit(run_main_outer()) |