#!/usr/bin/env -S python3 -u
import csv
import datetime
import glob
import gzip
import json
import os
import sys
import time
import hashlib
import concurrent.futures
import multiprocessing
import traceback
import subprocess
import getpass
import pathlib
valid_args = set()
def get_arg(argname, default=None):
assert default is not None
valid_args.add(argname)
if argname in sys.argv:
return sys.argv[sys.argv.index(argname)+1]
else:
return default
def has_arg(argname):
valid_args.add(argname)
return argname in sys.argv
def has_some_arg(args):
for arg in args:
valid_args.add(arg)
for arg in args:
if arg in sys.argv:
return True
return False
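# Illustrative example of the helpers above (comments only, not executed). Assuming
#   sys.argv == ['flatten-openalex-jsonl.py', '--db-name', 'openalex_test01', '--force-wipe']
# then get_arg('--db-name', 'openalex_testivar') returns 'openalex_test01',
# has_arg('--force-wipe') returns True, and
# has_some_arg(['--help', '-h', '--config']) returns False.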
# Location of aws cli
AWS_CLI = get_arg('--aws-cli', 'aws')
# Where to store gzipped flattened CSV files
CSV_DIR = get_arg('--csv-dir', '/database/openalex/csv-file')
# The database name to use
DB_NAME = get_arg('--db-name', 'openalex_testivar')
# File used to keep track of which files have been flattened, plus the DB hash
DONE_DIR = get_arg('--done-dir', '/database/openalex/done_files')
_DONE_FILE_DEFAULT = os.path.join(DONE_DIR, 'done_files_' + DB_NAME + '.txt')
DONE_FILE = get_arg('--done-file', _DONE_FILE_DEFAULT)
# If this flag is set, the DONE_FILE will be removed before running.
# This will re-populate the database from scratch. Useful if CSV-producing changes are made to this script.
FORCE_WIPE = has_arg('--force-wipe')
# Logs will be stored here
LOG_DIR = get_arg('--log-dir', '/database/openalex/logs')
_LOG_FILE_DEFAULT = os.path.join(LOG_DIR, 'flatten_openalex_jsonl_' + DB_NAME + '_' + datetime.datetime.today().strftime('%Y-%m-%d') + '.log')
LOG_FILE = get_arg('--log-file', _LOG_FILE_DEFAULT)
# Location of the psql executable
PSQL = get_arg('--psql', 'psql')
# Location of base schema file, defaults to same directory as this file
SCHEMA_FILE = get_arg('--schema-file', os.path.join(os.path.dirname(os.path.realpath(__file__)), 'openalex-pg-schema.sql'))
# Where to read jsonl snapshot files from
SNAPSHOT_DIR = get_arg('--snapshot-dir', '/database/openalex/openalex-snapshot')
# Only show config, check existence of AWS_CLI and PSQL and then exit
SHOW_CONFIG = has_some_arg(['--help', '-h', '--config'])
# Whether to sync SNAPSHOT_DIR by using the aws cli (pass --no-sync to skip syncing)
SYNC = not has_arg('--no-sync')
FILES_PER_ENTITY = int(os.environ.get('OPENALEX_DEMO_FILES_PER_ENTITY', '0'))
def default_worker_count():
cpu_count = os.cpu_count()
if cpu_count is None:
return 2
else:
return cpu_count
WORKER_COUNT = int(get_arg('--worker-count', default_worker_count()))
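# Illustrative overrides (not executed here; adjust to your setup):
#   OPENALEX_DEMO_FILES_PER_ENTITY=2 ./flatten-openalex-jsonl.py --db-name openalex_demo --worker-count 4
# caps the serially flattened entity types (topics, concepts, institutions, publishers,
# sources) at two snapshot files each and uses four worker processes.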
def makedirs_or_exit(dir, nam):
try:
os.makedirs(dir, exist_ok=True)
except Exception:
print(traceback.format_exc())
print(f'Failed to create directory {dir}')
print(f'Please set {nam} correctly')
print('Exiting')
sys.exit(1)
makedirs_or_exit(CSV_DIR, 'CSV_DIR (--csv-dir)')
makedirs_or_exit(SNAPSHOT_DIR, 'SNAPSHOT_DIR (--snapshot-dir)')
def make_dirs_for_file_or_exit(filename, nam):
try:
path = pathlib.Path(filename)
path.parent.mkdir(parents=True, exist_ok=True)
except Exception:
print(traceback.format_exc())
print('Failed to create directory for file', filename)
print(f'Please set {nam} correctly')
print('Exiting')
sys.exit(1)
make_dirs_for_file_or_exit(DONE_FILE, 'DONE_FILE (--done-file)')
make_dirs_for_file_or_exit(LOG_FILE, 'LOG_FILE (--log-file)')
# Running as a crontab entry as a specific user:
# https://serverfault.com/questions/352835/crontab-running-as-a-specific-user
# $ cat /etc/cron.d/flatten_openalex_jsonl
#PATH=/bin:/usr/bin:/sbin:/usr/sbin
# The script logs its output to LOG_FILE, e.g. /database/openalex/logs/flatten_openalex_jsonl_<DB_NAME>_<YYYY-MM-DD>.log
# Run flatten-openalex at 03:30 on day 15 of every month
# minute hour day-of-month month day-of-week user command
# 30 3 15 * * ire021 env PGHOST=127.0.0.1 /home/ire021/oalx3/flatten-openalex-jsonl.py --db-name openalex_test01 --aws-cli /usr/local/bin/aws
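# Example of an ad-hoc manual run (illustrative; adjust paths and arguments to your environment):
#   PGHOST=127.0.0.1 ./flatten-openalex-jsonl.py --db-name openalex_test01 --config
#   PGHOST=127.0.0.1 ./flatten-openalex-jsonl.py --db-name openalex_test01 --force-wipe
# --config only prints the configuration and checks PSQL, AWS_CLI and SCHEMA_FILE before exiting;
# --force-wipe removes DONE_FILE so that the CSV files and the database are rebuilt from scratch.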
csv_files = {
'authors': {
'authors': {
'name': os.path.join(CSV_DIR, 'authors.csv.gz'),
'columns': [
'id', 'orcid', 'display_name', 'display_name_alternatives',
'works_count', 'cited_by_count',
'last_known_institution', 'works_api_url', 'updated_date',
]
},
'ids': {
'name': os.path.join(CSV_DIR, 'authors_ids.csv.gz'),
'columns': [
'author_id', 'openalex', 'orcid', 'scopus', 'twitter',
'wikipedia', 'mag'
]
},
'counts_by_year': {
'name': os.path.join(CSV_DIR, 'authors_counts_by_year.csv.gz'),
'columns': [
'author_id', 'year', 'works_count', 'cited_by_count',
'oa_works_count'
]
}
},
'concepts': {
'concepts': {
'name': os.path.join(CSV_DIR, 'concepts.csv.gz'),
'columns': [
'id', 'wikidata', 'display_name', 'level', 'description',
'works_count', 'cited_by_count', 'image_url',
'image_thumbnail_url', 'works_api_url', 'updated_date'
]
},
'ancestors': {
'name': os.path.join(CSV_DIR, 'concepts_ancestors.csv.gz'),
'columns': ['concept_id', 'ancestor_id']
},
'counts_by_year': {
'name': os.path.join(CSV_DIR, 'concepts_counts_by_year.csv.gz'),
'columns': ['concept_id', 'year', 'works_count', 'cited_by_count',
'oa_works_count']
},
'ids': {
'name': os.path.join(CSV_DIR, 'concepts_ids.csv.gz'),
'columns': ['concept_id', 'openalex', 'wikidata', 'wikipedia',
'umls_aui', 'umls_cui', 'mag']
},
'related_concepts': {
'name': os.path.join(CSV_DIR, 'concepts_related_concepts.csv.gz'),
'columns': ['concept_id', 'related_concept_id', 'score']
}
},
'topics': {
'topics': {
'name': os.path.join(CSV_DIR, 'topics.csv.gz'),
'columns': ['id', 'display_name', 'subfield_id',
'subfield_display_name', 'field_id',
'field_display_name',
'domain_id', 'domain_display_name', 'description',
'keywords', 'works_api_url', 'wikipedia_id',
'works_count', 'cited_by_count', 'updated_date']
}
},
'institutions': {
'institutions': {
'name': os.path.join(CSV_DIR, 'institutions.csv.gz'),
'columns': [
'id', 'ror', 'display_name', 'country_code', 'type',
'homepage_url', 'image_url', 'image_thumbnail_url',
'display_name_acronyms', 'display_name_alternatives',
'works_count', 'cited_by_count', 'works_api_url',
'updated_date'
]
},
'ids': {
'name': os.path.join(CSV_DIR, 'institutions_ids.csv.gz'),
'columns': [
'institution_id', 'openalex', 'ror', 'grid', 'wikipedia',
'wikidata', 'mag'
]
},
'geo': {
'name': os.path.join(CSV_DIR, 'institutions_geo.csv.gz'),
'columns': [
'institution_id', 'city', 'geonames_city_id', 'region',
'country_code', 'country', 'latitude',
'longitude'
]
},
'associated_institutions': {
'name': os.path.join(CSV_DIR,
'institutions_associated_institutions.csv.gz'),
'columns': [
'institution_id', 'associated_institution_id', 'relationship'
]
},
'counts_by_year': {
'name': os.path.join(CSV_DIR, 'institutions_counts_by_year.csv.gz'),
'columns': [
'institution_id', 'year', 'works_count', 'cited_by_count',
'oa_works_count'
]
}
},
'publishers': {
'publishers': {
'name': os.path.join(CSV_DIR, 'publishers.csv.gz'),
'columns': [
'id', 'display_name', 'alternate_titles', 'country_codes',
'hierarchy_level', 'parent_publisher',
'works_count', 'cited_by_count', 'sources_api_url',
'updated_date'
]
},
'counts_by_year': {
'name': os.path.join(CSV_DIR, 'publishers_counts_by_year.csv.gz'),
'columns': ['publisher_id', 'year', 'works_count', 'cited_by_count',
'oa_works_count']
},
'ids': {
'name': os.path.join(CSV_DIR, 'publishers_ids.csv.gz'),
'columns': ['publisher_id', 'openalex', 'ror', 'wikidata']
},
},
'sources': {
'sources': {
'name': os.path.join(CSV_DIR, 'sources.csv.gz'),
'columns': [
'id', 'issn_l', 'issn', 'display_name', 'publisher',
'works_count', 'cited_by_count', 'is_oa',
'is_in_doaj', 'homepage_url', 'works_api_url', 'updated_date'
]
},
'ids': {
'name': os.path.join(CSV_DIR, 'sources_ids.csv.gz'),
'columns': ['source_id', 'openalex', 'issn_l', 'issn', 'mag',
'wikidata', 'fatcat']
},
'counts_by_year': {
'name': os.path.join(CSV_DIR, 'sources_counts_by_year.csv.gz'),
'columns': ['source_id', 'year', 'works_count', 'cited_by_count',
'oa_works_count']
},
},
'works': {
'works': {
'name': os.path.join(CSV_DIR, 'works.csv.gz'),
'columns': [
'id', 'doi', 'title', 'display_name', 'publication_year',
'publication_date', 'type', 'cited_by_count',
'is_retracted', 'is_paratext', 'cited_by_api_url',
'abstract_inverted_index', 'language'
]
},
'primary_locations': {
'name': os.path.join(CSV_DIR, 'works_primary_locations.csv.gz'),
'columns': [
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa',
'version', 'license'
]
},
'locations': {
'name': os.path.join(CSV_DIR, 'works_locations.csv.gz'),
'columns': [
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa',
'version', 'license'
]
},
'best_oa_locations': {
'name': os.path.join(CSV_DIR, 'works_best_oa_locations.csv.gz'),
'columns': [
'work_id', 'source_id', 'landing_page_url', 'pdf_url', 'is_oa',
'version', 'license'
]
},
'authorships': {
'name': os.path.join(CSV_DIR, 'works_authorships.csv.gz'),
'columns': [
'work_id', 'author_position', 'author_id', 'institution_id',
'raw_affiliation_string'
]
},
'biblio': {
'name': os.path.join(CSV_DIR, 'works_biblio.csv.gz'),
'columns': [
'work_id', 'volume', 'issue', 'first_page', 'last_page'
]
},
'topics': {
'name': os.path.join(CSV_DIR, 'works_topics.csv.gz'),
'columns': [
'work_id', 'topic_id', 'score'
]
},
'concepts': {
'name': os.path.join(CSV_DIR, 'works_concepts.csv.gz'),
'columns': [
'work_id', 'concept_id', 'score'
]
},
'ids': {
'name': os.path.join(CSV_DIR, 'works_ids.csv.gz'),
'columns': [
'work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid'
]
},
'mesh': {
'name': os.path.join(CSV_DIR, 'works_mesh.csv.gz'),
'columns': [
'work_id', 'descriptor_ui', 'descriptor_name', 'qualifier_ui',
'qualifier_name', 'is_major_topic'
]
},
'open_access': {
'name': os.path.join(CSV_DIR, 'works_open_access.csv.gz'),
'columns': [
'work_id', 'is_oa', 'oa_status', 'oa_url',
'any_repository_has_fulltext'
]
},
'referenced_works': {
'name': os.path.join(CSV_DIR, 'works_referenced_works.csv.gz'),
'columns': [
'work_id', 'referenced_work_id'
]
},
'related_works': {
'name': os.path.join(CSV_DIR, 'works_related_works.csv.gz'),
'columns': [
'work_id', 'related_work_id'
]
},
},
}
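# Illustrative lookups into the mapping above (comments only, not executed):
#   csv_files['works']['ids']['name']    -> os.path.join(CSV_DIR, 'works_ids.csv.gz')
#   csv_files['works']['ids']['columns'] -> ['work_id', 'openalex', 'doi', 'mag', 'pmid', 'pmcid']
# Each entry maps an entity (and its sub-tables) to the gzipped CSV file it is flattened into
# and to the column order used both for the CSV header and for the \copy column list below.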
data_types = ['authors', 'topics', 'concepts', 'institutions', 'publishers', 'sources', 'works']
shutdown_event = None
log_lock = None
def add_log(lin):
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(lin + '\n')
def log_info(m):
global log_lock
assert log_lock is not None
now_str = str(datetime.datetime.now())
now_str = now_str.split(".")[0]
s = f'{now_str} INFO {m}'
with log_lock:
print(s, flush=True)
add_log(s)
def log_warn(m):
global log_lock
assert log_lock is not None
now_str = str(datetime.datetime.now())
now_str = now_str.split(".")[0]
s = f'{now_str} WARN {m}'
with log_lock:
print(s, flush=True)
add_log(s)
def log_error(m):
global log_lock
assert log_lock is not None
now_str = str(datetime.datetime.now())
now_str = now_str.split(".")[0]
s = f'{now_str} ERROR {m}'
with log_lock:
print(s, flush=True)
add_log(s)
def log_exception():
traceback.print_exc()
s = traceback.format_exc()
for lin in s.splitlines():
log_error(lin)
def sha_filename(f, sha):
assert f.endswith('.csv.gz')
f = f[:-len('.csv.gz')]
return f'{f}.sha_{sha}.csv.gz'
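# Illustrative example (not executed), with CSV_DIR left at its default:
#   sha_filename('/database/openalex/csv-file/works.csv.gz', 'deadbeef')
#   -> '/database/openalex/csv-file/works.sha_deadbeef.csv.gz'
# This keeps the per-snapshot-file CSV output of parallel authors/works jobs from clobbering each other.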
def current_milli_time():
return time.time_ns() // 1_000_000
def current_seconds_time():
return time.time_ns() // 1_000_000_000
def ms_to_eta(milliseconds):
seconds = milliseconds / 1000
(days, seconds) = divmod(seconds, int(24*3600))
(hours, seconds) = divmod(seconds, 3600)
(minutes, seconds) = divmod(seconds, 60)
if days > 0:
return f"{int(days):} days, {hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}"
elif hours > 0:
return f"{hours:02.0f}:{minutes:02.0f}:{seconds:02.0f}"
else:
return f"{minutes:02.0f}:{seconds:02.0f}"
def file_list_to_filesize(file_list):
total = 0
for data_type, filename in file_list:
total += file_size(filename)
return total
def exec_verbose(args):
try:
res = subprocess.run(args, capture_output=True)
if res.returncode != 0:
log_error(f'Command {args}')
log_error(f'Failed with exit code {res.returncode}')
if 0 == len(res.stdout.splitlines()):
log_error('Stdout: <empty>')
else:
log_error('Stdout:')
log_error('='*80)
for lin in res.stdout.splitlines():
try:
lin_decoded = lin.decode('utf-8')
log_error(lin_decoded)
except UnicodeDecodeError:
log_error(lin)
if 0 == len(res.stderr.splitlines()):
log_error('Stderr: <empty>')
else:
log_error('Stderr:')
log_error('='*80)
for lin in res.stderr.splitlines():
try:
lin_decoded = lin.decode('utf-8')
log_error(lin_decoded)
except UnicodeDecodeError:
log_error(lin)
log_error('='*80)
log_error(f'Command {args} failed with exit code {res.returncode}')
raise ExecException()
else:
# info(f'CMD {str(args)} succeeded')
sout = ''
for lin in res.stdout.splitlines():
try:
sout += lin.decode('utf-8')
except UnicodeDecodeError:
sout += str(lin)
sout += '\n'
return sout
except FileNotFoundError:
        log_error(f'Executable "{args[0]}" was not found!')
log_error(f'Full command: {args}')
raise ExecException()
def exec_verbose_no_capture(args):
res = subprocess.run(args, capture_output=False, stdout=sys.stdout, stderr=sys.stderr)
if res.returncode != 0:
log_error(f'Command {args}')
log_error(f'Failed with return code {res.returncode}')
raise ExecException()
def exec_ignore_errors(args):
res = subprocess.run(args, capture_output=True)
if res.returncode == 0:
return True
else:
return False
class ExecException(Exception):
pass
def sha1sum(filename):
    # Hash the file contents; used to detect snapshot files that have changed between runs.
    with open(filename, 'rb', buffering=0) as f:
        return hashlib.file_digest(f, 'sha1').hexdigest()
def file_size(f):
assert os.path.isfile(f)
return os.path.getsize(f)
def abbr_filename(all_files, f):
maxlen = 0
for _typ, af in all_files:
lastpart = af.split(os.path.sep)
last = os.path.sep.join(lastpart[-3:])
maxlen = max(len(last), maxlen)
paths = f.split(os.path.sep)
# (SNAPSHOT_DIR, 'data', 'topics', '*', '*.gz')
v = os.path.sep.join(paths[-3:])
return v.ljust(maxlen)
def calc_sha_file_job(f):
global shutdown_event
assert shutdown_event is not None
try:
if shutdown_event.is_set():
return None
else:
            sha = sha1sum(f)
return f, sha
except KeyboardInterrupt:
shutdown_event.set()
raise
def get_file_to_sha_map(shutdown_evt, executor, all_files):
mapp = {}
futures = []
for f in all_files:
fut = executor.submit(calc_sha_file_job, f)
futures.append(fut)
for fut in concurrent.futures.as_completed(futures):
if shutdown_evt.is_set():
break
res = fut.result()
if res is None:
shutdown_evt.set_error()
else:
assert isinstance(res, tuple)
assert 2 == len(res)
f, sha = res
mapp[f] = sha
return mapp
def get_file_to_sha_map_verbose(shutdown_evt, executor, all_files):
log_info('Getting file to sha dict ...')
start_time_seconds_sha = current_seconds_time()
file_to_sha = get_file_to_sha_map(shutdown_evt, executor, [x[1] for x in all_files])
log_info(f'Getting file to sha dict ... Done in {current_seconds_time() - start_time_seconds_sha}s')
return file_to_sha
STAGE_CREATE_CSV = 'create_csv'
STAGE_IMPORT_DB = 'import_db'
def mark_file_done(stage, typ, filename, sha):
assert '\n' not in filename
assert '\r' not in filename
assert ':' not in filename
assert typ in data_types or 'db_hash' == typ
assert stage in [STAGE_CREATE_CSV, STAGE_IMPORT_DB]
with open(DONE_FILE, 'a', encoding='utf-8') as f:
f.write(f'{stage}:{typ}:{sha}:{filename}\n')
def is_done(stage, typ, filename, sha):
assert '\n' not in filename
assert ':' not in filename
assert '\r' not in filename
assert typ in data_types or 'db_hash' == typ
assert stage in [STAGE_CREATE_CSV, STAGE_IMPORT_DB]
signature = f'{stage}:{typ}:{sha}:{filename}'
if os.path.exists(DONE_FILE):
with open(DONE_FILE, 'r', encoding='utf-8') as f:
for lin in f.readlines():
lin = lin.strip()
if lin == signature:
return True
return False
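# Illustrative DONE_FILE entry (placeholders only, not real data):
#   create_csv:works:<sha>:<SNAPSHOT_DIR>/data/works/<partition>/<part>.gz
# is_done() simply looks for an identical line, so a snapshot file whose contents (and
# therefore sha) changed since the last run is flattened and imported again.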
def get_all_files():
all_files = []
for data_type in data_types:
assert os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type))
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')):
all_files.append((data_type, jsonl_file_name))
all_files.sort(key=lambda x: x[1])
return all_files
def get_all_files_allow_missing():
all_files = []
for data_type in data_types:
if os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type)):
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')):
all_files.append((data_type, jsonl_file_name))
all_files.sort(key=lambda x: x[1])
return all_files
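# The glob above mirrors the snapshot layout, i.e. (illustrative placeholder path):
#   <SNAPSHOT_DIR>/data/<entity>/<partition>/<part>.gz
# and both helpers return (data_type, path) tuples sorted by path.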
class Event2(object):
def __init__(self, *, ctx=None):
if ctx is None:
ctx = multiprocessing.get_context()
self._flag = ctx.Value('i', 0)
# Allocate a ctypes.c_ulonglong to hold the set_id:
# Represents the C unsigned long long datatype.
# The constructor accepts an optional integer initializer; no overflow checking is done.
# From https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# See multiprocessing/sharedctypes.py for typecode to ctypes definitions
self._set_id = ctx.Value('Q', 0)
def is_set(self):
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value :
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value.
with self._flag:
return self._flag.value == 1 or self._flag.value == 2
def is_error(self):
# From https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Value :
# If lock is True (the default) then a new recursive lock object is created to synchronize access to the value.
with self._flag:
return self._flag.value == 2
def set_error(self):
with self._flag:
with self._set_id:
if self._flag.value == 0 or self._flag.value == 1:
self._flag.value = 2
self._set_id.value += 1
def set(self):
# If `set` is called from a signal handler, this is fine as the lock is recursive (i.e. it won't deadlock).
# If the thread interrupted by the signal handler is wait()-ing and the signal handler calls set(),
# this is fine as wait() spins on the value. Thus, after the signal handler is done, the thread will
# return from wait()
# Fixes https://github.com/python/cpython/issues/85772
with self._flag:
with self._set_id:
if self._flag.value == 0:
# There is a theoretical chance of race here. It requires the following conditions:
# The interrupted thread must be wait()ing.
# Then set must be called reentrant for the maximum value of c_ulonglong times,
# and all interruptions must happen exactly after `if self._flag.value == 0:`.
# The _set_id value will then wrap around. Then clear() must be called
# before the original wait() code continues. The wait() code will then continue
# to (incorrectly) wait. I think this case is safe to ignore. The stack
# will grow too large before there is any chance of this actually happening.
self._flag.value = 1
self._set_id.value += 1
# There is no race here by reentrant set when reaching the maximum value for `self._set_id.value`.
# ctypes.c_ulonglong will overflow without any exception:
# https://docs.python.org/3/library/ctypes.html#ctypes.c_ulonglong
# > no overflow checking is done.
# This means that we do not need to check if some maximum value is reached:
# C will wrap around the value for us.
def clear(self):
with self._flag:
self._flag.value = 0
def wait(self, timeout=None):
start_time = time.monotonic()
set_id = self._set_id.value
while True:
if self._flag.value == 1 or self._flag.value == 2:
return True
elif set_id != self._set_id.value:
return True # flag is unset, but set_id changed, so there must have been a `set` followed by a `clear`
# during `time.sleep()`. Fixes https://github.com/python/cpython/issues/95826
elif timeout is not None and (time.monotonic() - start_time) > timeout:
return False
else:
# Fixes https://github.com/python/cpython/issues/85772 by spinning and sleeping.
time.sleep(0.010) # sleep 10 milliseconds
def __repr__(self) -> str:
set_status = 'set' if self.is_set() else 'unset'
return f"<{type(self).__qualname__} at {id(self):#x} {set_status}>"
def check_aws_cli(aws_cli):
log_info(f'Testing AWS_CLI {aws_cli} ...')
cmd = [aws_cli, '--version']
sout = exec_verbose(cmd).strip()
log_info(f'Testing AWS_CLI {aws_cli} ... OK. Version: {sout}')
def aws_sync(aws_cli, snapshot_dir):
old_files = get_all_files_allow_missing()
cmd = [aws_cli,
's3',
'sync',
's3://openalex',
snapshot_dir,
'--no-sign-request',
'--delete']
    # If you download the snapshot into an existing folder, you'll need to use the
    # aws s3 sync --delete flag to remove files from any previous downloads. You can
    # also remove the contents of the destination folder manually. If you don't, you
    # will see duplicate Entities that have moved from one file to another between
    # snapshot updates.
# https://docs.openalex.org/download-all-data/download-to-your-machine
log_info(f'Syncing s3://openalex by executing {cmd} ...')
exec_verbose_no_capture(cmd)
log_info(f'Syncing s3://openalex by executing {cmd} ... OK')
new_files = get_all_files()
file_count_diff = len(new_files) - len(old_files)
new_and_old_files = new_files.copy()
for fil in old_files:
if fil not in new_and_old_files:
new_and_old_files.append(fil)
if file_count_diff == 0:
log_info('No new files in s3://openalex')
else:
for fil in new_files:
if fil not in old_files:
filename = fil[1]
log_info(f'File {abbr_filename(new_and_old_files, filename)} was added')
for fil in old_files:
if fil not in new_files:
filename = fil[1]
log_info(f'File {abbr_filename(new_and_old_files, filename)} was removed')
def banner(txt, length=80):
banner_txt = ' ' + txt + ' '
pad_left = 25
pad_right = length - len(banner_txt) - pad_left
return '='* pad_left + banner_txt + '=' * pad_right
def report_create_csv_progress(all_files, total_size, processed_files, filename, data_type, spent_ms):
file_no = processed_files.index((data_type, filename)) + 1
report_progress(all_files, total_size, file_list_to_filesize(processed_files), 'Created CSV for', file_no, filename, spent_ms)
def flatten_author(filename, sha):
file_spec = csv_files['authors']
with gzip.open(sha_filename(file_spec['authors']['name'], sha), 'wt', encoding='utf-8') as authors_csv, \
gzip.open(sha_filename(file_spec['ids']['name'], sha), 'wt', encoding='utf-8') as ids_csv, \
gzip.open(sha_filename(file_spec['counts_by_year']['name'], sha), 'wt', encoding='utf-8') as counts_by_year_csv:
authors_writer = csv.DictWriter(authors_csv, fieldnames=file_spec['authors']['columns'], extrasaction='ignore')
authors_writer.writeheader()
ids_writer = csv.DictWriter(ids_csv, fieldnames=file_spec['ids']['columns'])
ids_writer.writeheader()
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=file_spec['counts_by_year']['columns'])
counts_by_year_writer.writeheader()
files_done = 0
jsonl_file_name = filename
with gzip.open(jsonl_file_name, 'r') as authors_jsonl:
for author_json in authors_jsonl:
if not author_json.strip():
continue
author = json.loads(author_json)
if not (author_id := author.get('id')):
continue
# authors
author['display_name_alternatives'] = json.dumps(
author.get('display_name_alternatives'),
ensure_ascii=False)
author['last_known_institution'] = (
author.get('last_known_institution') or {}).get(
'id')
authors_writer.writerow(author)
# ids
if author_ids := author.get('ids'):
author_ids['author_id'] = author_id
ids_writer.writerow(author_ids)
# counts_by_year
if counts_by_year := author.get('counts_by_year'):
for count_by_year in counts_by_year:
count_by_year['author_id'] = author_id
counts_by_year_writer.writerow(count_by_year)
files_done += 1
def flatten_topics(already_processed_files, all_files):
total_size = file_list_to_filesize(all_files)
data_type = 'topics'
with gzip.open(csv_files['topics']['topics']['name'], 'wt', encoding='utf-8') as topics_csv:
topics_writer = csv.DictWriter(topics_csv, fieldnames=csv_files['topics']['topics']['columns'])
topics_writer.writeheader()
seen_topic_ids = set()
files_done = 0
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'topics', '*', '*.gz')):
start_time_ms = current_milli_time()
with gzip.open(jsonl_file_name, 'r') as topics_jsonl:
for line in topics_jsonl:
if not line.strip():
continue
topic = json.loads(line)
topic['keywords'] = '; '.join(topic.get('keywords', ''))
if not (
topic_id := topic.get('id')) or topic_id in seen_topic_ids:
continue
seen_topic_ids.add(topic_id)
for key in ('subfield', 'field', 'domain'):
topic[f'{key}_id'] = topic[key]['id']
topic[f'{key}_display_name'] = topic[key]['display_name']
del topic[key]
topic['updated_date'] = topic['updated']
del topic['updated']
topic['wikipedia_id'] = topic['ids'].get('wikipedia')
del topic['ids']
del topic['created_date']
del topic['siblings']
# info(str(topic['siblings']))
topics_writer.writerow(topic)
spent_ms = current_milli_time() - start_time_ms
already_processed_files.append((data_type, jsonl_file_name))
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms)
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
def flatten_concepts(already_processed_files, all_files):
total_size = file_list_to_filesize(all_files)
    data_type = 'concepts'
with gzip.open(csv_files['concepts']['concepts']['name'], 'wt', encoding='utf-8') as concepts_csv, \
gzip.open(csv_files['concepts']['ancestors']['name'], 'wt', encoding='utf-8') as ancestors_csv, \
gzip.open(csv_files['concepts']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv, \
gzip.open(csv_files['concepts']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \
gzip.open(csv_files['concepts']['related_concepts']['name'], 'wt', encoding='utf-8') as related_concepts_csv:
concepts_writer = csv.DictWriter(concepts_csv, fieldnames=csv_files['concepts']['concepts']['columns'],extrasaction='ignore')
concepts_writer.writeheader()
ancestors_writer = csv.DictWriter(ancestors_csv, fieldnames=
csv_files['concepts']['ancestors']['columns'])
ancestors_writer.writeheader()
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=
csv_files['concepts']['counts_by_year']['columns'])
counts_by_year_writer.writeheader()
ids_writer = csv.DictWriter(ids_csv,
fieldnames=csv_files['concepts']['ids'][
'columns'])
ids_writer.writeheader()
related_concepts_writer = csv.DictWriter(related_concepts_csv,
fieldnames=
csv_files['concepts'][
'related_concepts'][
'columns'])
related_concepts_writer.writeheader()
seen_concept_ids = set()
files_done = 0
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'concepts', '*', '*.gz')):
start_time_ms = current_milli_time()
with gzip.open(jsonl_file_name, 'r') as concepts_jsonl:
for concept_json in concepts_jsonl:
if not concept_json.strip():
continue
concept = json.loads(concept_json)
if not (concept_id := concept.get(
'id')) or concept_id in seen_concept_ids:
continue
seen_concept_ids.add(concept_id)
concepts_writer.writerow(concept)
if concept_ids := concept.get('ids'):
concept_ids['concept_id'] = concept_id
concept_ids['umls_aui'] = json.dumps(
concept_ids.get('umls_aui'), ensure_ascii=False)
concept_ids['umls_cui'] = json.dumps(
concept_ids.get('umls_cui'), ensure_ascii=False)
ids_writer.writerow(concept_ids)
if ancestors := concept.get('ancestors'):
for ancestor in ancestors:
if ancestor_id := ancestor.get('id'):
ancestors_writer.writerow({
'concept_id': concept_id,
'ancestor_id': ancestor_id
})
if counts_by_year := concept.get('counts_by_year'):
for count_by_year in counts_by_year:
count_by_year['concept_id'] = concept_id
counts_by_year_writer.writerow(count_by_year)
if related_concepts := concept.get('related_concepts'):
for related_concept in related_concepts:
if related_concept_id := related_concept.get('id'):
related_concepts_writer.writerow({
'concept_id': concept_id,
'related_concept_id': related_concept_id,
'score': related_concept.get('score')
})
spent_ms = current_milli_time() - start_time_ms
already_processed_files.append((data_type, jsonl_file_name))
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms)
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
def flatten_institutions(already_processed_files, all_files):
file_spec = csv_files['institutions']
total_size = file_list_to_filesize(all_files)
with gzip.open(file_spec['institutions']['name'], 'wt', encoding='utf-8') as institutions_csv, \
gzip.open(file_spec['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \
gzip.open(file_spec['geo']['name'], 'wt', encoding='utf-8') as geo_csv, \
gzip.open(file_spec['associated_institutions']['name'], 'wt', encoding='utf-8') as associated_institutions_csv, \
gzip.open(file_spec['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv:
institutions_writer = csv.DictWriter(
institutions_csv, fieldnames=file_spec['institutions']['columns'],
extrasaction='ignore'
)
institutions_writer.writeheader()
ids_writer = csv.DictWriter(ids_csv,
fieldnames=file_spec['ids']['columns'])
ids_writer.writeheader()
geo_writer = csv.DictWriter(geo_csv,
fieldnames=file_spec['geo']['columns'])
geo_writer.writeheader()
associated_institutions_writer = csv.DictWriter(
associated_institutions_csv,
fieldnames=file_spec['associated_institutions']['columns']
)
associated_institutions_writer.writeheader()
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=
file_spec['counts_by_year']['columns'])
counts_by_year_writer.writeheader()
seen_institution_ids = set()
files_done = 0
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'institutions', '*', '*.gz')):
start_time_ms = current_milli_time()
with gzip.open(jsonl_file_name, 'r') as institutions_jsonl:
for institution_json in institutions_jsonl:
if not institution_json.strip():
continue
institution = json.loads(institution_json)
if not (institution_id := institution.get(
'id')) or institution_id in seen_institution_ids:
continue
seen_institution_ids.add(institution_id)
# institutions
institution['display_name_acronyms'] = json.dumps(
institution.get('display_name_acronyms'),
ensure_ascii=False)
institution['display_name_alternatives'] = json.dumps(
institution.get('display_name_alternatives'),
ensure_ascii=False)
institutions_writer.writerow(institution)
# ids
if institution_ids := institution.get('ids'):
institution_ids['institution_id'] = institution_id
ids_writer.writerow(institution_ids)
# geo
if institution_geo := institution.get('geo'):
institution_geo['institution_id'] = institution_id
geo_writer.writerow(institution_geo)
# associated_institutions
if associated_institutions := institution.get(
'associated_institutions',
institution.get('associated_insitutions')
# typo in api
):
for associated_institution in associated_institutions:
if associated_institution_id := associated_institution.get(
'id'):
associated_institutions_writer.writerow({
'institution_id': institution_id,
'associated_institution_id': associated_institution_id,
'relationship': associated_institution.get(
'relationship')
})
# counts_by_year
if counts_by_year := institution.get('counts_by_year'):
for count_by_year in counts_by_year:
count_by_year['institution_id'] = institution_id
counts_by_year_writer.writerow(count_by_year)
spent_ms = current_milli_time() - start_time_ms
data_type = 'institutions'
already_processed_files.append((data_type, jsonl_file_name))
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms)
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
def flatten_publishers(already_processed_files, all_files):
total_size = file_list_to_filesize(all_files)
with gzip.open(csv_files['publishers']['publishers']['name'], 'wt', encoding='utf-8') as publishers_csv, \
gzip.open(csv_files['publishers']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv, \
gzip.open(csv_files['publishers']['ids']['name'], 'wt', encoding='utf-8') as ids_csv:
publishers_writer = csv.DictWriter(
publishers_csv,
fieldnames=csv_files['publishers']['publishers']['columns'],
extrasaction='ignore'
)
publishers_writer.writeheader()
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=
csv_files['publishers']['counts_by_year']['columns'])
counts_by_year_writer.writeheader()
ids_writer = csv.DictWriter(ids_csv,
fieldnames=csv_files['publishers']['ids'][
'columns'])
ids_writer.writeheader()
seen_publisher_ids = set()
files_done = 0
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'publishers', '*', '*.gz')):
start_time_ms = current_milli_time()
with gzip.open(jsonl_file_name, 'r') as concepts_jsonl:
for publisher_json in concepts_jsonl:
if not publisher_json.strip():
continue
publisher = json.loads(publisher_json)
if not (publisher_id := publisher.get(
'id')) or publisher_id in seen_publisher_ids:
continue
seen_publisher_ids.add(publisher_id)
# publishers
publisher['alternate_titles'] = json.dumps(
publisher.get('alternate_titles'), ensure_ascii=False)
publisher['country_codes'] = json.dumps(
publisher.get('country_codes'), ensure_ascii=False)
publishers_writer.writerow(publisher)
if publisher_ids := publisher.get('ids'):
publisher_ids['publisher_id'] = publisher_id
ids_writer.writerow(publisher_ids)
if counts_by_year := publisher.get('counts_by_year'):
for count_by_year in counts_by_year:
count_by_year['publisher_id'] = publisher_id
counts_by_year_writer.writerow(count_by_year)
data_type = 'publishers'
spent_ms = current_milli_time() - start_time_ms
already_processed_files.append((data_type, jsonl_file_name))
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms)
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
def flatten_sources(already_processed_files, all_files):
total_size = file_list_to_filesize(all_files)
with gzip.open(csv_files['sources']['sources']['name'], 'wt', encoding='utf-8') as sources_csv, \
gzip.open(csv_files['sources']['ids']['name'], 'wt', encoding='utf-8') as ids_csv, \
gzip.open(csv_files['sources']['counts_by_year']['name'], 'wt', encoding='utf-8') as counts_by_year_csv:
sources_writer = csv.DictWriter(
sources_csv, fieldnames=csv_files['sources']['sources']['columns'],
extrasaction='ignore'
)
sources_writer.writeheader()
ids_writer = csv.DictWriter(ids_csv,
fieldnames=csv_files['sources']['ids'][
'columns'])
ids_writer.writeheader()
counts_by_year_writer = csv.DictWriter(counts_by_year_csv, fieldnames=
csv_files['sources']['counts_by_year']['columns'])
counts_by_year_writer.writeheader()
seen_source_ids = set()
files_done = 0
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', 'sources', '*', '*.gz')):
start_time_ms = current_milli_time()
with gzip.open(jsonl_file_name, 'r') as sources_jsonl:
for source_json in sources_jsonl:
if not source_json.strip():
continue
source = json.loads(source_json)
if not (source_id := source.get(
'id')) or source_id in seen_source_ids:
continue
seen_source_ids.add(source_id)
source['issn'] = json.dumps(source.get('issn'))
sources_writer.writerow(source)
if source_ids := source.get('ids'):
source_ids['source_id'] = source_id
source_ids['issn'] = json.dumps(source_ids.get('issn'))
ids_writer.writerow(source_ids)
if counts_by_year := source.get('counts_by_year'):
for count_by_year in counts_by_year:
count_by_year['source_id'] = source_id
counts_by_year_writer.writerow(count_by_year)
data_type = 'sources'
spent_ms = current_milli_time() - start_time_ms
already_processed_files.append((data_type, jsonl_file_name))
report_create_csv_progress(all_files, total_size, already_processed_files, jsonl_file_name, data_type, spent_ms)
files_done += 1
if FILES_PER_ENTITY and files_done >= FILES_PER_ENTITY:
break
def flatten_author_job(filename, sha):
global shutdown_event
global log_lock
assert log_lock is not None
assert shutdown_event is not None
start_time_ms = current_milli_time()
try:
assert shutdown_event is not None
if shutdown_event.is_set():
return None
assert os.path.exists(filename)
flatten_author(filename, sha)
return os.getpid(), start_time_ms, 'authors', filename
except Exception:
log_info('Error occurred')
log_exception()
traceback.print_exc()
shutdown_event.set_error()
return None
def flatten_work_job(filename, sha):
global shutdown_event
global log_lock
assert log_lock is not None
assert shutdown_event is not None
try:
start_time_ms = current_milli_time()
assert shutdown_event is not None
if shutdown_event.is_set():
return None
assert os.path.exists(filename)
flatten_work(filename, sha)
return os.getpid(), start_time_ms, 'works', filename
except Exception:
log_error('Error occurred')
log_exception()
traceback.print_exc()
shutdown_event.set_error()
return None
def flatten_work(filename, sha):
file_spec = csv_files['works']
with gzip.open(sha_filename(file_spec['works']['name'], sha), 'wt', encoding='utf-8') as works_csv, \
gzip.open(sha_filename(file_spec['primary_locations']['name'], sha), 'wt', encoding='utf-8') as primary_locations_csv, \
gzip.open(sha_filename(file_spec['locations']['name'], sha), 'wt', encoding='utf-8') as locations, \
gzip.open(sha_filename(file_spec['best_oa_locations']['name'], sha), 'wt', encoding='utf-8') as best_oa_locations, \
gzip.open(sha_filename(file_spec['authorships']['name'], sha), 'wt', encoding='utf-8') as authorships_csv, \
gzip.open(sha_filename(file_spec['biblio']['name'], sha), 'wt', encoding='utf-8') as biblio_csv, \
gzip.open(sha_filename(file_spec['topics']['name'], sha), 'wt', encoding='utf-8') as topics_csv, \
gzip.open(sha_filename(file_spec['concepts']['name'], sha), 'wt', encoding='utf-8') as concepts_csv, \
gzip.open(sha_filename(file_spec['ids']['name'], sha), 'wt', encoding='utf-8') as ids_csv, \
gzip.open(sha_filename(file_spec['mesh']['name'], sha), 'wt', encoding='utf-8') as mesh_csv, \
gzip.open(sha_filename(file_spec['open_access']['name'], sha), 'wt', encoding='utf-8') as open_access_csv, \
gzip.open(sha_filename(file_spec['referenced_works']['name'], sha), 'wt', encoding='utf-8') as referenced_works_csv, \
gzip.open(sha_filename(file_spec['related_works']['name'], sha), 'wt', encoding='utf-8') as related_works_csv:
works_writer = init_dict_writer(works_csv, file_spec['works'],
extrasaction='ignore')
primary_locations_writer = init_dict_writer(primary_locations_csv,
file_spec[
'primary_locations'])
locations_writer = init_dict_writer(locations, file_spec['locations'])
best_oa_locations_writer = init_dict_writer(best_oa_locations,
file_spec[
'best_oa_locations'])
authorships_writer = init_dict_writer(authorships_csv,
file_spec['authorships'])
biblio_writer = init_dict_writer(biblio_csv, file_spec['biblio'])
topics_writer = init_dict_writer(topics_csv, file_spec['topics'])
concepts_writer = init_dict_writer(concepts_csv, file_spec['concepts'])
ids_writer = init_dict_writer(ids_csv, file_spec['ids'],
extrasaction='ignore')
mesh_writer = init_dict_writer(mesh_csv, file_spec['mesh'])
open_access_writer = init_dict_writer(open_access_csv,
file_spec['open_access'])
referenced_works_writer = init_dict_writer(referenced_works_csv,
file_spec[
'referenced_works'])
related_works_writer = init_dict_writer(related_works_csv,
file_spec['related_works'])
jsonl_file_name = filename
with gzip.open(jsonl_file_name, 'r') as works_jsonl:
for work_json in works_jsonl:
if not work_json.strip():
continue
work = json.loads(work_json)
if not (work_id := work.get('id')):
continue
# works
if (abstract := work.get(
'abstract_inverted_index')) is not None:
work['abstract_inverted_index'] = json.dumps(abstract,
ensure_ascii=False)
works_writer.writerow(work)
# primary_locations
if primary_location := (work.get('primary_location') or {}):
if primary_location.get(
'source') and primary_location.get(
'source').get('id'):
primary_locations_writer.writerow({
'work_id': work_id,
'source_id': primary_location['source']['id'],
'landing_page_url': primary_location.get(
'landing_page_url'),
'pdf_url': primary_location.get('pdf_url'),
'is_oa': primary_location.get('is_oa'),
'version': primary_location.get('version'),
'license': primary_location.get('license'),
})
# locations
if locations := work.get('locations'):
for location in locations:
if location.get('source') and location.get(
'source').get('id'):
locations_writer.writerow({
'work_id': work_id,
'source_id': location['source']['id'],
'landing_page_url': location.get(
'landing_page_url'),
'pdf_url': location.get('pdf_url'),
'is_oa': location.get('is_oa'),
'version': location.get('version'),
'license': location.get('license'),
})
# best_oa_locations
if best_oa_location := (work.get('best_oa_location') or {}):
if best_oa_location.get(
'source') and best_oa_location.get(
'source').get('id'):
best_oa_locations_writer.writerow({
'work_id': work_id,
'source_id': best_oa_location['source']['id'],
'landing_page_url': best_oa_location.get(
'landing_page_url'),
'pdf_url': best_oa_location.get('pdf_url'),
'is_oa': best_oa_location.get('is_oa'),
'version': best_oa_location.get('version'),
'license': best_oa_location.get('license'),
})
# authorships
if authorships := work.get('authorships'):
for authorship in authorships:
if author_id := authorship.get('author', {}).get(
'id'):
                            institutions = authorship.get('institutions') or []
institution_ids = [i.get('id') for i in
institutions]
institution_ids = [i for i in institution_ids if
i]
institution_ids = institution_ids or [None]
for institution_id in institution_ids:
authorships_writer.writerow({
'work_id': work_id,
'author_position': authorship.get(
'author_position'),
'author_id': author_id,
'institution_id': institution_id,
'raw_affiliation_string': authorship.get(
'raw_affiliation_string'),
})
# biblio
if biblio := work.get('biblio'):
biblio['work_id'] = work_id
biblio_writer.writerow(biblio)
# topics
for topic in work.get('topics', []):
if topic_id := topic.get('id'):
topics_writer.writerow({
'work_id': work_id,
'topic_id': topic_id,
'score': topic.get('score')
})
# concepts
                for concept in work.get('concepts') or []:
if concept_id := concept.get('id'):
concepts_writer.writerow({
'work_id': work_id,
'concept_id': concept_id,
'score': concept.get('score'),
})
# ids
if ids := work.get('ids'):
ids['work_id'] = work_id
ids_writer.writerow(ids)
# mesh
                for mesh in work.get('mesh') or []:
mesh['work_id'] = work_id
mesh_writer.writerow(mesh)
# open_access
if open_access := work.get('open_access'):
open_access['work_id'] = work_id
open_access_writer.writerow(open_access)
# referenced_works
                for referenced_work in work.get('referenced_works') or []:
if referenced_work:
referenced_works_writer.writerow({
'work_id': work_id,
'referenced_work_id': referenced_work
})
# related_works
                for related_work in work.get('related_works') or []:
if related_work:
related_works_writer.writerow({
'work_id': work_id,
'related_work_id': related_work
})
return filename
def init_dict_writer(csv_file, file_spec, **kwargs):
writer = csv.DictWriter(
csv_file, fieldnames=file_spec['columns'], **kwargs
)
writer.writeheader()
return writer
def init_worker_process(shutdown_evt, l_lock):
assert shutdown_evt is not None
assert l_lock is not None
global shutdown_event
global log_lock
assert shutdown_event is None
assert log_lock is None
shutdown_event = shutdown_evt
log_lock = l_lock
def launch_create_authors_works_csv_jobs(executor, file_to_sha):
processed_files = []
do_process_files = []
known_datatypes = ['authors', 'works']
for data_type in known_datatypes:
assert os.path.exists(os.path.join(SNAPSHOT_DIR, 'data', data_type))
for jsonl_file_name in glob.glob(os.path.join(SNAPSHOT_DIR, 'data', data_type, '*', '*.gz')):
if jsonl_file_name in file_to_sha:
if is_done(STAGE_CREATE_CSV, data_type, jsonl_file_name, file_to_sha[jsonl_file_name]):
processed_files.append((data_type, jsonl_file_name))
else:
do_process_files.append((data_type, jsonl_file_name))
else:
log_error(f'File {jsonl_file_name} not found in file_to_sha dictionary.')
log_error('Is there some background sync job running?')
raise RuntimeError('File not found in file_to_sha dictionary')
log_info(f'Already processed {len(processed_files):_} authors and works SNAPSHOT files, {file_list_to_filesize(processed_files) // 1024 // 1024:_} MB')
log_info(f'Processing remaining {len(do_process_files):_} authors and works SNAPSHOT files, {file_list_to_filesize(do_process_files) // 1024 // 1024:_} MB')
futures = []
for data_type, f in do_process_files:
assert data_type in data_types
sha = file_to_sha[f]
fut = None
if data_type == 'works':
fut = executor.submit(flatten_work_job, f, sha)
elif data_type == 'authors':
fut = executor.submit(flatten_author_job, f, sha)
else:
raise RuntimeError(f'Unhandled data type {data_type}')
assert fut is not None
futures.append(fut)
return processed_files, do_process_files, futures
def flatten_wait_authors_works_jobs(shutdown_evt, file_to_sha, all_files, already_processed_files, futures):
worker_pids = []
total_size = file_list_to_filesize(all_files)
for fut in concurrent.futures.as_completed(futures):
if shutdown_evt.is_set():
break
res = fut.result()
if res is None:
shutdown_evt.set_error()
else:
pid, start_time_ms, data_type, filename = res
assert data_type in data_types
spent_ms = current_milli_time() - start_time_ms
sha = file_to_sha[filename]
mark_file_done(STAGE_CREATE_CSV, data_type, filename, sha)
if pid not in worker_pids:
worker_pids.append(pid)
already_processed_files.append((data_type, filename))
report_create_csv_progress(all_files, total_size, already_processed_files, filename, data_type, spent_ms)
def flatten_author_works(shutdown_evt, executor, file_to_sha, all_files):
log_info('Creating CSV files for authors and works ...')
already_processed_files, do_process_files, futures = launch_create_authors_works_csv_jobs(executor, file_to_sha)
flatten_wait_authors_works_jobs(shutdown_evt, file_to_sha, all_files, already_processed_files, futures)
return already_processed_files
def create_csv_files(shutdown_evt, executor, file_to_sha):
log_info('Creating CSV files ...')
all_files = get_all_files()
start_time_seconds = current_seconds_time()
processed_files = flatten_author_works(shutdown_evt, executor, file_to_sha, all_files)
done_time_seconds = current_seconds_time() - start_time_seconds
log_info(f'Created CSV files for authors and works ... Done! Spent {ms_to_eta(done_time_seconds*1000)}')
remaining_bytes = file_list_to_filesize(all_files) - file_list_to_filesize(processed_files)
log_info(f'Remaining MB to create CSV files for: {remaining_bytes//1024//1024:_}')
flatten_topics(processed_files, all_files)
flatten_concepts(processed_files, all_files)
flatten_institutions(processed_files, all_files)
flatten_publishers(processed_files, all_files)
flatten_sources(processed_files, all_files)
done_time_seconds = current_seconds_time() - start_time_seconds
log_info(f'Creating CSV files done! Spent {ms_to_eta(done_time_seconds * 1000)}')
def check_psql():
log_info(f'Testing PSQL {PSQL} --version ...')
v = exec_verbose([PSQL, '--version'])
v = v.strip()
log_info(f'Testing PSQL {PSQL} --version ... OK. Version: {v}')
def wipe_init_db():
# Kill connections: https://stackoverflow.com/questions/5408156/how-to-drop-a-postgresql-database-if-there-are-active-connections-to-it
log_info('Terminating previous sessions ...')
ok_kill = exec_ignore_errors([PSQL, '-d', 'postgres', '-c', f'''
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = '{DB_NAME}'
AND pid <> pg_backend_pid();
'''])
if ok_kill:
log_info('Terminating previous sessions ... OK')
else:
log_warn('Terminating previous sessions ... Failed! Continuing anyway')
log_info(f'Testing database {DB_NAME} is present ...')
ok_db_present = exec_ignore_errors([PSQL, '-d', DB_NAME, '-c', 'select 1'])
if ok_db_present:
log_info(f'Testing database {DB_NAME} is present ... OK')
else:
log_warn(f'Testing database {DB_NAME} is present ... Failed! Trying to create database ...')
log_info(f'Creating database {DB_NAME} ...')
exec_verbose([PSQL, '-d', 'postgres', '-c', f'CREATE DATABASE {DB_NAME}'])
log_info(f'Creating database {DB_NAME} ... OK')
log_info(f'Testing database {DB_NAME} is present again ...')
exec_verbose([PSQL, '-d', DB_NAME, '-c', 'select 1'])
log_info(f'Testing database {DB_NAME} is present again ... OK')
log_info('Dropping schema ...')
exec_verbose([PSQL, '-d', DB_NAME, '-c', 'DROP SCHEMA IF EXISTS openalex CASCADE'])
log_info('Dropping schema ... OK')
log_info('Creating base schema ...')
with open(SCHEMA_FILE, encoding='utf-8') as f:
exec_verbose([PSQL, '-d', DB_NAME, '-c', f.read()])
log_info('Creating base schema ... OK')
def populate_subtable(prefix, subtable):
start_seconds = current_seconds_time()
field_names = ', '.join(csv_files[prefix][subtable]['columns'])
filename = csv_files[prefix][subtable]['name']
sqlcmd = f"\\copy openalex.{prefix}_{subtable} ({field_names}) from program 'gunzip -c {filename}' csv header"
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd])
spent_seconds = current_seconds_time() - start_seconds
log_info(f'Populated openalex.{prefix}_{subtable} in {spent_seconds}s')
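# Illustrative psql command built by populate_subtable above (actual paths depend on CSV_DIR and DB_NAME):
#   \copy openalex.concepts_ancestors (concept_id, ancestor_id) from program 'gunzip -c /database/openalex/csv-file/concepts_ancestors.csv.gz' csv header
# psql streams the decompressed CSV straight into the table, so the gzipped files are never unpacked on disk.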
def populate_maintable(prefix):
start_seconds = current_seconds_time()
field_names = ', '.join(csv_files[prefix][prefix]['columns'])
filename = csv_files[prefix][prefix]['name']
sqlcmd = f"\\copy openalex.{prefix} ({field_names}) from program 'gunzip -c {filename}' csv header"
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd])
spent_seconds = current_seconds_time() - start_seconds
log_info(f'Populated openalex.{prefix} in {spent_seconds}s')
def insert_small_tables(shutdown_evt, executor):
populate_maintable('concepts')
populate_subtable('concepts', 'ancestors')
populate_subtable('concepts', 'counts_by_year')
populate_subtable('concepts', 'ids')
populate_subtable('concepts', 'related_concepts')
populate_maintable('topics')
populate_maintable('institutions')
populate_subtable('institutions', 'ids')
populate_subtable('institutions', 'geo')
populate_subtable('institutions', 'associated_institutions')
populate_subtable('institutions', 'counts_by_year')
populate_maintable('publishers')
populate_subtable('publishers', 'counts_by_year')
populate_subtable('publishers', 'ids')
populate_maintable('sources')
populate_subtable('sources', 'ids')
populate_subtable('sources', 'counts_by_year')
def populate_table(table_name, columns, filename, sha):
assert isinstance(table_name, str)
assert isinstance(columns, list)
assert isinstance(filename, str)
filename = sha_filename(filename, sha)
assert os.path.exists(filename)
assert os.path.isfile(filename)
field_names = ', '.join(columns)
sqlcmd = f"\\copy openalex.{table_name} ({field_names}) from program 'gunzip -c {filename}' csv header"
exec_verbose([PSQL, '-d', DB_NAME, '-c', sqlcmd])
def insert_authors_job(f, sha):
global shutdown_event
global log_lock
assert shutdown_event is not None
assert log_lock is not None
if shutdown_event.is_set():
return None
start_ms = current_milli_time()
populate_table('authors', csv_files['authors']['authors']['columns'], csv_files['authors']['authors']['name'], sha)
for k, v in csv_files['authors'].items():
if k == 'authors':
continue
populate_table(f'authors_{k}', v['columns'], v['name'], sha)
spent_ms = current_milli_time() - start_ms
    return os.getpid(), 'authors', f, spent_ms
def insert_works_job(f, sha):
global shutdown_event
global log_lock
assert shutdown_event is not None
assert log_lock is not None
if shutdown_event.is_set():
return None
start_ms = current_milli_time()
populate_table('works', csv_files['works']['works']['columns'], csv_files['works']['works']['name'], sha)
for k, v in csv_files['works'].items():
if k == 'works':
continue
populate_table(f'works_{k}', v['columns'], v['name'], sha)
spent_ms = current_milli_time() - start_ms
return os.getpid(), 'works', f, spent_ms
def report_progress(all_files, total_size, processed_size, prefix, file_no, filename, spent_ms):
fsize = file_size(filename)
if spent_ms == 0:
spent_ms = 1
mb_per_second = (fsize / 1024 / 1024) / (spent_ms / 1000)
done_percent = processed_size*100 / total_size
fileno_str = f'{file_no:_}'.rjust(len(f'{len(all_files):_}'), ' ')
mb = f'{fsize // 1024 // 1024:_}'.rjust(len('1_234'), ' ')
mb_s = f'{mb_per_second:.1f}'.rjust(len('100.0'), ' ')
total_size_str = f'{total_size // 1024 // 1024:_}'
processed_mb = f'{processed_size // 1024 // 1024:_}'.rjust(len(total_size_str), ' ')
done_percent = f'{done_percent:.2f}'.rjust(len('100.00'), ' ')
abbr_filename_str = abbr_filename(all_files, filename)
log_info(f'{prefix} {fileno_str}/{len(all_files):_} {abbr_filename_str} {mb} MB, {mb_s} MB/s, total {processed_mb}/{total_size_str} MB. {done_percent}% done.')
def insert_wait_author_works_jobs(shutdown_evt, all_files, futures):
total_size = file_list_to_filesize(all_files)
log_info('Waiting for insert authors and works jobs to finish ...')
processed_size = 0
for idx, fut in enumerate(concurrent.futures.as_completed(futures)):
if shutdown_evt.is_set():
break
res = fut.result()
if res is None:
shutdown_evt.set_error()
else:
assert isinstance(res, tuple)
assert 4 == len(res)
            pid, typ, filename, spent_ms = res
            processed_size += file_size(filename)
            report_progress(all_files, total_size, processed_size, 'Inserted', idx + 1, filename, spent_ms)
def insert_author_works(shutdown_evt, executor, file_to_sha):
aw_files = [(typ, f) for typ, f in get_all_files() if typ in ['authors', 'works']]
log_info(f'Found {len(aw_files)} authors and works files')
filename_to_type = {}
for typ, f in aw_files:
filename_to_type[f] = typ
if typ == 'authors':
pass
elif typ == 'works':
pass
else:
raise RuntimeError(f'Unknown type: {typ}')
futures = []
for typ, f in aw_files:
sha = file_to_sha[f]
if typ == 'authors':
fut = executor.submit(insert_authors_job, f, sha)
elif typ == 'works':
fut = executor.submit(insert_works_job, f, sha)
else:
raise RuntimeError(f'Unknown type {typ}')
assert fut is not None
futures.append(fut)
insert_wait_author_works_jobs(shutdown_evt, aw_files, futures)
def check_schema_file():
if os.path.exists(SCHEMA_FILE):
if os.path.isfile(SCHEMA_FILE):
pass
else:
log_error(f'SCHEMA_FILE {SCHEMA_FILE} is not a file')
log_error(f'Exiting')
raise ExecException()
else:
log_error(f'SCHEMA_FILE {SCHEMA_FILE} does not exist')
log_error(f'Exiting')
raise ExecException()
def show_config():
log_info(banner('Configuration and environment'))
log_info(f'Given command line arguments: {sys.argv[1:]}')
env_variables = ['PATH', 'PGHOST', 'PGUSER', 'PGPASSWORD', 'PGPASSFILE', 'PGPORT']
env_variables.sort()
for env_var in env_variables:
if env_var in os.environ:
if env_var != 'PGPASSWORD' and env_var != 'PGPASSFILE':
log_info(f'Environment variable {env_var} is set to {os.environ[env_var]}')
else:
log_info(f'Environment variable {env_var} is set to ***')
else:
if env_var == 'PGUSER':
log_info(f'Environment variable {env_var} is not set. Will default to {getpass.getuser()}')
else:
log_info(f'Environment variable {env_var} is not set')
log_info(f'Using AWS_CLI {AWS_CLI}')
log_info(f'Using CSV_DIR {CSV_DIR}')
log_info(f'Using DB_NAME {DB_NAME}')
log_info(f'Using DONE_FILE {DONE_FILE}')
log_info(f'Using FORCE_WIPE {FORCE_WIPE}')
log_info(f'Using LOG_FILE {LOG_FILE}')
log_info(f'Using PSQL {PSQL}')
log_info(f'Using SCHEMA_FILE {SCHEMA_FILE}')
log_info(f'Using SNAPSHOT_DIR {SNAPSHOT_DIR}')
log_info(f'Using SYNC {SYNC}')
log_info(f'Using WORKER_COUNT {WORKER_COUNT}')
log_info(f'Running as USER {getpass.getuser()}')
log_info(f'CWD is {os.getcwd()}')
check_schema_file()
check_psql()
check_aws_cli(AWS_CLI)
log_info(banner('End of configuration and environment'))
def run_main(shutdown_evt, l_lock):
show_config()
if SHOW_CONFIG:
return
if SYNC:
aws_sync(AWS_CLI, SNAPSHOT_DIR)
else:
log_warn('Not syncing s3://openalex ...')
with concurrent.futures.ProcessPoolExecutor(max_workers=WORKER_COUNT, initargs=(shutdown_evt, l_lock), initializer=init_worker_process) as executor:
try:
all_files = get_all_files()
file_to_sha = get_file_to_sha_map_verbose(shutdown_evt, executor, all_files)
sha_str = ''
for _, filename in all_files:
sha_str += filename + ':' + file_to_sha[filename] + '\n'
hs = hashlib.sha256(sha_str.encode('utf-8')).hexdigest()
log_info(f'DB hash is {hs}')
if FORCE_WIPE:
log_warn('Forcing wipe of database and re-import')
if os.path.exists(DONE_FILE):
os.remove(DONE_FILE)
if is_done(STAGE_IMPORT_DB, 'db_hash', 'db_hash', hs):
log_info('DB is up to date, doing nothing')
else:
log_info('DB is out of date, creating CSV files and importing')
wipe_init_db()
create_csv_files(shutdown_evt, executor, file_to_sha)
insert_small_tables(shutdown_evt, executor)
insert_author_works(shutdown_evt, executor, file_to_sha)
mark_file_done(STAGE_IMPORT_DB, 'db_hash', 'db_hash', hs)
except KeyboardInterrupt as e:
log_info('CTRL-C received, shutting down')
shutdown_evt.set()
raise e
except ExecException as e:
shutdown_evt.set_error()
log_error('Executing command failed!')
raise e
except Exception:
log_exception()
log_error('Main program crashed!')
shutdown_evt.set_error()
finally:
shutdown_evt.set()
def run_main_outer():
global log_lock
multiprocessing.set_start_method('spawn')
start_time = current_milli_time()
shutdown_evt = Event2()
log_lock = multiprocessing.Lock()
exit_code = 0
for arg in sys.argv[1:]:
if arg.startswith('-'):
if arg not in valid_args:
log_error(f'Invalid argument given: {arg}')
log_error('Exiting')
return 1
try:
run_main(shutdown_evt, log_lock)
if shutdown_evt.is_error():
exit_code = 1
else:
exit_code = 0
return exit_code
except ExecException:
exit_code = 1
return exit_code
except Exception:
log_exception()
log_error('Main program crashed!')
shutdown_evt.set_error()
exit_code = 1
return exit_code
except KeyboardInterrupt:
shutdown_evt.set()
exit_code = 130
return exit_code
finally:
log_info(f'Main exiting with exit code {exit_code}. Spent {ms_to_eta(current_milli_time() - start_time)}.')
return exit_code
if __name__ == '__main__':
sys.exit(run_main_outer())