Created
September 3, 2024 19:58
-
-
Save tsibley/d069064af42c6c1da670106a55666245 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
From 4b07dc59f847f08a51addf7a9826d572484accf2 Mon Sep 17 00:00:00 2001 | |
From: Thomas Sibley <[email protected]> | |
Date: Tue, 3 Sep 2024 12:55:14 -0700 | |
Subject: [PATCH] wip! merge: concurrent import | |
--- | |
augur/merge.py | 146 ++++++++++++++++++++++++++++++------------------- | |
1 file changed, 90 insertions(+), 56 deletions(-) | |
diff --git a/augur/merge.py b/augur/merge.py | |
index 134c344b..b43670a3 100644 | |
--- a/augur/merge.py | |
+++ b/augur/merge.py | |
@@ -33,6 +33,7 @@ future. The SQLite 3 CLI, sqlite3, must be available. If it's not on PATH (or | |
you want to use a version different from what's on PATH), set the SQLITE3 | |
environment variable to path of the desired sqlite3 executable. | |
""" | |
+import concurrent.futures as futures | |
import gettext | |
import os | |
import re | |
@@ -62,20 +63,70 @@ T = TypeVar('T') | |
_n = gettext.NullTranslations().ngettext | |
-class NamedMetadata(Metadata): | |
+# Locate how to re-invoke ourselves (_this_ specific Augur). | |
+if sys.executable: | |
+ AUGUR = f"{shquote(sys.executable)} -m augur" | |
+else: | |
+ # A bit unusual we don't know our own Python executable, but assume we | |
+ # can access ourselves as the ``augur`` command. | |
+ AUGUR = f"augur" | |
+ | |
+ | |
+class SQLiteMetadata(Metadata): | |
name: str | |
"""User-provided descriptive name for this metadata file.""" | |
+ db_path: str | |
+ """Temporary, on-disk SQLite database file path.""" | |
+ | |
table_name: str | |
"""Generated SQLite table name for this metadata file, based on *name*.""" | |
def __init__(self, name: str, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.name = name | |
+ | |
+ # Work with a temporary, on-disk SQLite database under a name we control so | |
+ # we can access it from multiple (serial) processes. | |
+ db_fd, db_path = mkstemp(prefix=f"augur-metadata-{self.name}-", suffix=".sqlite") | |
+ os.close(db_fd) | |
+ | |
+ self.db_path = db_path | |
self.table_name = f"metadata_{self.name}" | |
def __repr__(self): | |
- return f"<NamedMetadata {self.name}={self.path}>" | |
+ return f"<SQLiteMetadata {self.name}={self.path} db_path={self.db_path}>" | |
+ | |
+ def load_db(self): | |
+ """ | |
+ Import metadata file *path* into SQLite database at *db_path*. | |
+ """ | |
+ # All other metadata reading in Augur (i.e. via the csv module) | |
+ # uses Python's "universal newlines"¹ definition and accepts \n, | |
+ # \r\n, and \r as newlines interchangeably (even mixed within the | |
+ # same file!). We accomplish the same behaviour here with SQLite's | |
+ # less flexible newline handling by relying on the universal | |
+ # newline translation of `augur read-file`. | |
+ # -trs, 24 July 2024 | |
+ # | |
+ # ¹ <https://docs.python.org/3/glossary.html#term-universal-newlines> | |
+ newline = os.linesep | |
+ | |
+ sqlite3(self.db_path, | |
+ f'.mode csv', | |
+ f'.separator {sqlite_quote_dot(self.delimiter)} {sqlite_quote_dot(newline)}', | |
+ f'.import {sqlite_quote_dot(f"|{AUGUR} read-file {shquote(self.path)}")} {sqlite_quote_dot(self.table_name)}', | |
+ | |
+ f'create unique index {sqlite_quote_id(f"{self.table_name}_id")} on {sqlite_quote_id(self.table_name)}({sqlite_quote_id(self.id_column)});', | |
+ | |
+ # <https://sqlite.org/pragma.html#pragma_optimize> | |
+ f'pragma optimize;') | |
+ | |
+ # Ensure our self.columns matches what SQLite's .import created so | |
+ # callers can rely on self.columns. Cases where SQLite renames | |
+ # columns on import can be handled in the future. | |
+ assert self.columns == (table_columns := sqlite3_table_columns(self.db_path, self.table_name)), \ | |
+ f"{self.columns!r} == {table_columns!r}" | |
def register_parser(parent_subparsers): | |
@@ -151,26 +202,12 @@ def run(args): | |
# Infer delimiters and id columns | |
metadata = [ | |
- NamedMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS, | |
- [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS) | |
+ SQLiteMetadata(name, path, [delim for name_, delim in metadata_delimiters if not name_ or name_ == name] or DEFAULT_DELIMITERS, | |
+ [column for name_, column in metadata_id_columns if not name_ or name_ == name] or DEFAULT_ID_COLUMNS) | |
for name, path in metadata] | |
- # Locate how to re-invoke ourselves (_this_ specific Augur). | |
- if sys.executable: | |
- augur = f"{shquote(sys.executable)} -m augur" | |
- else: | |
- # A bit unusual we don't know our own Python executable, but assume we | |
- # can access ourselves as the ``augur`` command. | |
- augur = f"augur" | |
- | |
- | |
- # Work with a temporary, on-disk SQLite database under a name we control so | |
- # we can access it from multiple (serial) processes. | |
- db_fd, db_path = mkstemp(prefix="augur-merge-", suffix=".sqlite") | |
- os.close(db_fd) | |
- | |
- # Clean up database file by default | |
+ # Clean up database files by default | |
delete_db = True | |
# Track columns as we see them, in order. The first metadata's id column | |
@@ -197,36 +234,21 @@ def run(args): | |
try: | |
# Read all metadata files into a SQLite db | |
+ with futures.ThreadPoolExecutor() as executor: | |
+ def load(m): | |
+ # XXX FIXME: better UI here! use richer terminal interactions for async? | |
+ print_info(f"Reading {m.name!r} metadata from {m.path!r}…") | |
+ m.load_db() | |
+ | |
+ done, not_done = futures.wait([executor.submit(load, m) for m in metadata], return_when=futures.FIRST_EXCEPTION) | |
+ | |
+ for future in done: | |
+ future.result() # raise first exception, if any | |
+ | |
+ | |
+ # Track which columns appear in which metadata inputs, preserving | |
+ # the order of both. | |
for m in metadata: | |
- # All other metadata reading in Augur (i.e. via the csv module) | |
- # uses Python's "universal newlines"¹ definition and accepts \n, | |
- # \r\n, and \r as newlines interchangably (even mixed within the | |
- # same file!). We accomplish the same behaviour here with SQLite's | |
- # less flexible newline handling by relying on the universal | |
- # newline translation of `augur read-file`. | |
- # -trs, 24 July 2024 | |
- # | |
- # ¹ <https://docs.python.org/3/glossary.html#term-universal-newlines> | |
- newline = os.linesep | |
- | |
- print_info(f"Reading {m.name!r} metadata from {m.path!r}…") | |
- sqlite3(db_path, | |
- f'.mode csv', | |
- f'.separator {sqlite_quote_dot(m.delimiter)} {sqlite_quote_dot(newline)}', | |
- f'.import {sqlite_quote_dot(f"|{augur} read-file {shquote(m.path)}")} {sqlite_quote_dot(m.table_name)}', | |
- | |
- f'create unique index {sqlite_quote_id(f"{m.table_name}_id")} on {sqlite_quote_id(m.table_name)}({sqlite_quote_id(m.id_column)});', | |
- | |
- # <https://sqlite.org/pragma.html#pragma_optimize> | |
- f'pragma optimize;') | |
- | |
- # We're going to use Metadata.columns to generate the select | |
- # statement, so ensure it matches what SQLite's .import created. | |
- assert m.columns == (table_columns := sqlite3_table_columns(db_path, m.table_name)), \ | |
- f"{m.columns!r} == {table_columns!r}" | |
- | |
- # Track which columns appear in which metadata inputs, preserving | |
- # the order of both. | |
for column in m.columns: | |
# Match different id column names in different metadata files | |
# since they're logically equivalent. Any non-id columns that | |
@@ -236,7 +258,7 @@ def run(args): | |
output_column = output_id_column if column == m.id_column else column | |
output_columns.setdefault(output_column, []) | |
- output_columns[output_column] += [(m.table_name, column)] | |
+ output_columns[output_column] += [(m.name, m.table_name, column)] | |
# Construct query to produce merged metadata. | |
@@ -246,12 +268,12 @@ def run(args): | |
for output_column, input_columns in output_columns.items()), | |
# Source columns | |
- *(f"""{sqlite_quote_id(m.table_name, m.id_column)} is not null as {sqlite_quote_id(f'__source_metadata_{m.name}')}""" | |
+ *(f"""{sqlite_quote_id(m.name, m.table_name, m.id_column)} is not null as {sqlite_quote_id(f'__source_metadata_{m.name}')}""" | |
for m in metadata)] | |
from_list = [ | |
- sqlite_quote_id(metadata[0].table_name), | |
- *(f"full outer join {sqlite_quote_id(m.table_name)} on {sqlite_quote_id(m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.table_name, m.id_column) for m in reversed(preceding))})" | |
+ sqlite_quote_id(metadata[0].name, metadata[0].table_name), | |
+ *(f"full outer join {sqlite_quote_id(m.name, m.table_name)} on {sqlite_quote_id(m.name, m.table_name, m.id_column)} in ({', '.join(sqlite_quote_id(m.name, m.table_name, m.id_column) for m in reversed(preceding))})" | |
for m, preceding in [(m, metadata[:i]) for i, m in enumerate(metadata[1:], 1)])] | |
# Take some small pains to make the query readable since it makes | |
@@ -272,11 +294,13 @@ def run(args): | |
# Assume TSV like nearly all other extant --output-metadata options. | |
print_info(f"Merging metadata and writing to {args.output_metadata!r}…") | |
print_debug(query) | |
- sqlite3(db_path, | |
+ sqlite3("", | |
+ *[f'attach database {sqlite_quote_string(m.db_path)} as {sqlite_quote_id(m.name)};' | |
+ for m in metadata], | |
f'.mode csv', | |
f'.separator "\\t" "\\n"', | |
f'.headers on', | |
- f'.once {sqlite_quote_dot(f"|{augur} write-file {shquote(args.output_metadata)}")}', | |
+ f'.once {sqlite_quote_dot(f"|{AUGUR} write-file {shquote(args.output_metadata)}")}', | |
query) | |
except SQLiteError as err: | |
@@ -284,10 +308,20 @@ def run(args): | |
raise AugurError(str(err)) from err | |
finally: | |
+ db_paths = [m.db_path for m in metadata] | |
+ | |
if delete_db: | |
- os.unlink(db_path) | |
+ for db in db_paths: | |
+ os.unlink(db) | |
else: | |
- print_info(f"WARNING: Skipped deletion of {db_path} due to error, but you may want to clean it up yourself (e.g. if it's large).") | |
+ print_info(dedent(f"""\ | |
+ WARNING: Skipped deletion of temporary SQLite databases due to | |
+ error, but you may want to clean them up yourself with: | |
+ | |
+ rm -f {' '.join(map(shquote, db_paths))} | |
+ | |
+ as they may be large. | |
+ """)) | |
def sqlite3(*args, **kwargs): | |
-- | |
2.46.0 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment