MLFlow migration script from filesystem to database tracking data. Adapted to support MLFlow 1.18+ and SQLite.
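Once the migration has produced a populated database, the MLflow tracking server can be pointed at it instead of the mlruns directory. A minimal sketch (the sqlite:///mlflow.db URI and the artifact root are assumptions, adjust them to your deployment):

    mlflow server \
        --backend-store-uri sqlite:///mlflow.db \
        --default-artifact-root /opt/mlflow/mlruns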
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Migration from the file-based store of MLflow 0.8 up to 1.8.0
Credits:
 * Based on work of bnekolny https://gist.github.com/bnekolny/8ec96faa0d57913eea6bb7848c06b912
 * Latest version https://gist.github.com/weldpua2008/7f0c4644d247bd0fc7ba9a83c2d337d5
Requirements:
    pyyaml >= 5.1:
    pip3.6 install -U pyyaml
Usage:
* Import all experiments:
    migrate_data.py \
        --wipe-db \
        --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_full.sql
    mysql -D mlflow_storage < /tmp/migration_inserts_full.sql 2>> /var/log/mlflow-migration-full.log
* Periodic import (e.g. every 10 minutes via cron) of only the latest runs:
    migrate_data.py \
        --partial-update --partial-last-seconds 1000 \
        --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_partial.sql
    mysql -D mlflow_storage < /tmp/migration_inserts_partial.sql 2>> /var/log/mlflow-migration-partial.log
Important:
* To fix "Cannot import data: ERROR 1406 (22001): Data too long" (https://github.com/mlflow/mlflow/issues/2814)
  you can change the schema (uncomment ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;).
"""
import os
from pathlib import Path
import sys
import yaml
import codecs
import argparse
from functools import partial
from datetime import datetime

error = partial(print, file=sys.stderr)
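
# For a SQLite target (instead of MySQL), the generated dump can be piped into the
# sqlite3 CLI. A minimal sketch, assuming the database file is named mlflow.db:
#   migrate_data.py --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_full.sql
#   sqlite3 mlflow.db < /tmp/migration_inserts_full.sql
# The bash wrapper at the bottom of this gist automates that flow end to end.
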
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mlruns-dir",
        type=Path,
        default=Path("/opt/mlflow/mlruns/"),
        help="Path to the MLflow runs data directory",
    )
    parser.add_argument(
        "--experiment-name",
        type=str,
        default=None,
        help="Name of the experiment to be migrated. All are selected by default.")
    parser.add_argument(
        '--wipe-db',
        action="store_true",
        default=False,
        help="Emit SQL statements that wipe all existing data in the database")
    parser.add_argument(
        '--partial-update',
        action="store_true",
        default=False,
        help="Prepare a partial SQL dump instead of a full one")
    parser.add_argument(
        "--partial-last-seconds",
        type=int,
        default=900,
        help="Only dump MLflow runs modified within the last N seconds",
    )
    parser.add_argument(
        "--partial-since-seconds",
        type=int,
        default=120,
        help="Skip MLflow runs modified less than N seconds ago",
    )
    parser.add_argument(
        "--only-last-metric",
        action="store_true",
        default=False,
        help="Only include the last value of each metric.",
    )
    return parser.parse_args()

def progress(_cur, _max):
    p = round(100 * _cur / _max)
    b = f"Progress: {p}% - [" + "." * int(p / 5) + " " * (20 - int(p / 5)) + "]"
    error(b, end="\r")

def load_metadata_file(fpath):
    if os.path.exists(fpath):
        with open(fpath) as fp:
            try:
                return yaml.full_load(fp)
            except AttributeError:
                # Fallback for pyyaml versions without full_load()
                return yaml.load(fp)

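# Expected FileStore layout that process_experiment() below walks (derived from the
# reads the code performs; exact contents vary with the MLflow version):
#   <mlruns-dir>/
#       <experiment_id>/
#           meta.yaml                  # experiment metadata
#           <run_uuid>/
#               meta.yaml              # run metadata
#               metrics/<key>          # one "<timestamp> <value> <step>" line per logged value
#               params/<key>           # single value per file
#               tags/<key>             # single value per file (keys may be nested, e.g. mlflow.runName)
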
def process_experiment(rootDir, experiment_id, experiment_names, partial_update,
                       partial_last_seconds, partial_since_seconds, filter_name=None,
                       only_last_metric=False):
    status = {1: 'RUNNING', 2: 'SCHEDULED', 3: 'FINISHED', 4: 'FAILED', 5: 'KILLED'}
    sourceType = {1: 'NOTEBOOK', 2: 'JOB', 3: 'PROJECT', 4: 'LOCAL', 5: 'UNKNOWN'}
    EPATH = "{root}/{experiment}".format(root=rootDir, experiment=experiment_id)
    NOW = datetime.now()
    experiment = load_metadata_file("{experiment}/meta.yaml".format(experiment=EPATH))
    if experiment is None:
        return True
    experiment['experiment_id'] = experiment.get('experiment_id', experiment_id)
    if experiment_id == 0 or experiment_id == '0':
        print("--SET sql_mode='NO_AUTO_VALUE_ON_ZERO';")
    if len(experiment['name']) < 1:
        error("experiment name is empty at {experiment}/meta.yaml".format(experiment=EPATH))
        return True
    if experiment['name'] in experiment_names:
        error("experiment {name} exists, appending _".format(name=experiment['name']))
        experiment['name'] = "{}_".format(experiment['name'])
    if filter_name is not None and experiment['name'] not in filter_name:
        return False
    experiment_names.add(experiment['name'])
    experiment_insert = "INSERT OR IGNORE INTO `experiments` (`experiment_id`, `name`, `artifact_location`, `lifecycle_stage`) VALUES ({0}, '{1}', '{2}', '{3}');".format(
        experiment['experiment_id'],
        experiment['name'],
        experiment['artifact_location'],
        experiment.get('lifecycle_stage', 'active'))
    print("-- {root}/{experiment}".format(root=rootDir, experiment=experiment['experiment_id']))
    print(experiment_insert)
    for run_uuid in os.listdir("{experiment}".format(experiment=EPATH)):
        if run_uuid in ['meta.yaml']:
            continue
        RPATH = "{experiment}/{run}".format(experiment=EPATH, run=run_uuid)
        if partial_update is True:
            diff = int(NOW.timestamp()) - int(min(os.path.getctime(RPATH), os.path.getmtime(RPATH)))
            if diff > int(partial_last_seconds):
                continue
            if diff < int(partial_since_seconds):
                continue
        run = load_metadata_file("{run}/meta.yaml".format(run=RPATH))
        if run is None:
            continue
        run['run_uuid'] = run.get('run_uuid', run_uuid)
        if run.get('name', '') == '':
            run_name_fname = Path("{experiment}/{run}/tags/mlflow.runName".format(experiment=EPATH, run=run_uuid))
            if os.path.isfile(run_name_fname):
                with codecs.open(run_name_fname, mode='r', encoding="utf-8") as f:
                    line = f.read()
                    run['name'] = line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line
        run_insert = "INSERT OR IGNORE INTO `runs` (" \
                     "`run_uuid`, `name`, `source_type`, `source_name`, `entry_point_name`, `user_id`, `status`, `start_time`, `end_time`, `source_version`, `lifecycle_stage`, `artifact_uri`, `experiment_id`" \
                     ") VALUES ( '{0}', '{1}', '{2}', '{3}', '{4}', '{5}', '{6}', {7}, {8}, '{9}', '{10}', '{11}', {12});".format(
            run['run_uuid'],
            run['name'],
            sourceType[int(run['source_type'])],
            run['source_name'],
            run['entry_point_name'],
            run['user_id'],
            status[int(run['status'])],
            run['start_time'],
            "NULL" if run['end_time'] is None else run['end_time'],
            run['source_version'],
            run.get('lifecycle_stage', 'active'),
            run['artifact_uri'],
            experiment_id)
        print(run_insert)
        # Tags
        tag_path = "{experiment}/{run}/tags".format(experiment=EPATH, run=run_uuid)
        for tag_fname in Path(tag_path).rglob("*"):
            if os.path.isdir(tag_fname):
                continue
            tag = str(tag_fname.relative_to(tag_path))
            with codecs.open(tag_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
                tag_insert = "INSERT OR IGNORE INTO `tags` (" \
                             "`key`, `value`, `run_uuid`" \
                             ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    tag,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line,
                    run['run_uuid'])
                print(tag_insert)
        # Metrics
        metrics_path = "{experiment}/{run}/metrics".format(experiment=EPATH, run=run_uuid)
        for metrics_fname in Path(metrics_path).rglob("*"):
            if os.path.isdir(metrics_fname):
                continue
            with open(metrics_fname, 'r') as f:
                lines = list(f.readlines())
            metric = str(metrics_fname.relative_to(metrics_path))
            if only_last_metric:
                lines = [lines[-1]]
            for line in lines:
                # Each line of a FileStore metric file is "<timestamp> <value> <step>"
                timestamp, val, step = line.split()
                metric_insert = "INSERT OR IGNORE INTO `metrics` (" \
                                "`key`, `value`, `timestamp`, `step`, `run_uuid`" \
                                ") VALUES ( '{0}', '{1}', {2}, {3}, '{4}' );".format(
                    metric,
                    val,
                    int(timestamp), int(step),
                    run_uuid)
                print(metric_insert)
        # Params
        param_path = "{experiment}/{run}/params".format(experiment=EPATH, run=run_uuid)
        for param_fname in Path(param_path).rglob("*"):
            if os.path.isdir(param_fname):
                continue
            param = str(param_fname.relative_to(param_path))
            with codecs.open(param_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
                param_insert = "INSERT OR IGNORE INTO `params` (" \
                               "`key`, `value`, `run_uuid`" \
                               ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    param.replace("'", "\\'") if "'" in param else param,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line,
                    run_uuid)
                print(param_insert)
    return True

def main():
    """
    Execution for me was:
        `python migrate_data.py > ./migration_inserts.sql`
        `mysql < ./migration_inserts.sql`
    NOTE: A few things to know about the script here:
    - Artifacts were stored remotely, so no artifact migration
    - experiment source_version is never set
    - experiment lifecycle_stage is always active for us, I avoided the mapping from int -> str
    - metric timestamp is made up, since it was tracked as an array in the filesystem and as an epoch in the DB
    """
    args = parse_args()
    selected_experiments: str = args.experiment_name
    if selected_experiments is None:
        num_experiments = len(os.listdir(args.mlruns_dir)) - 1
        error(f"Migration of {num_experiments} experiments")
    else:
        selected_experiments = set(selected_experiments.split(","))
        num_experiments = len(selected_experiments)
        error(
            f"Migration of {num_experiments} experiments: {args.experiment_name}")
    print("-- MLFlow SQL Dump %s" % datetime.now())
    if args.wipe_db is True:
        # NOTE: these statements are MySQL-specific; the SQLite wrapper script below
        # recreates the database file from the schema instead of wiping tables.
        print("""
SET FOREIGN_KEY_CHECKS=0;
TRUNCATE `runs`;
TRUNCATE `experiments`;
TRUNCATE `metrics`;
TRUNCATE `params`;
TRUNCATE `tags`;
SET FOREIGN_KEY_CHECKS=1;
-- ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;
""")
    experiment_names = set()
    for _step, experiment_id in enumerate(os.listdir(args.mlruns_dir)):
        if experiment_id in ['.trash']:
            continue
        considered = process_experiment(rootDir=args.mlruns_dir, experiment_id=experiment_id,
                                        experiment_names=experiment_names,
                                        partial_update=args.partial_update,
                                        partial_last_seconds=args.partial_last_seconds,
                                        partial_since_seconds=args.partial_since_seconds,
                                        filter_name=selected_experiments,
                                        only_last_metric=args.only_last_metric)
        if considered:
            progress(_step, num_experiments)
    progress(num_experiments, num_experiments)
    error("..." * 5, end="\r")
    if args.experiment_name is not None and args.experiment_name not in experiment_names:
        error("Could not find selected experiment.")
    error("DONE")


if __name__ == '__main__':
    main()
-- alembic_version definition
CREATE TABLE alembic_version (
    version_num VARCHAR(32) NOT NULL,
    CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
);

-- experiments definition
CREATE TABLE experiments (
    experiment_id INTEGER NOT NULL,
    name VARCHAR(256) NOT NULL,
    artifact_location VARCHAR(256),
    lifecycle_stage VARCHAR(32),
    CONSTRAINT experiment_pk PRIMARY KEY (experiment_id),
    CONSTRAINT experiments_lifecycle_stage CHECK (lifecycle_stage IN ('active', 'deleted')),
    UNIQUE (name)
);

-- registered_models definition
CREATE TABLE registered_models (
    name VARCHAR(256) NOT NULL,
    creation_time BIGINT,
    last_updated_time BIGINT,
    description VARCHAR(5000),
    CONSTRAINT registered_model_pk PRIMARY KEY (name),
    UNIQUE (name)
);

-- experiment_tags definition
CREATE TABLE experiment_tags (
    "key" VARCHAR(250) NOT NULL,
    value VARCHAR(5000),
    experiment_id INTEGER NOT NULL,
    CONSTRAINT experiment_tag_pk PRIMARY KEY ("key", experiment_id),
    FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
);
-- model_versions definition
CREATE TABLE "model_versions" (
    name VARCHAR(256) NOT NULL,
    version INTEGER NOT NULL,
    creation_time BIGINT,
    last_updated_time BIGINT,
    description VARCHAR(5000),
    user_id VARCHAR(256),
    current_stage VARCHAR(20),
    source VARCHAR(500),
    run_id VARCHAR(32),
    status VARCHAR(20),
    status_message VARCHAR(500),
    run_link VARCHAR(500),
    CONSTRAINT model_version_pk PRIMARY KEY (name, version),
    FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
);

-- registered_model_tags definition
CREATE TABLE registered_model_tags (
    "key" VARCHAR(250) NOT NULL,
    value VARCHAR(5000),
    name VARCHAR(256) NOT NULL,
    CONSTRAINT registered_model_tag_pk PRIMARY KEY ("key", name),
    FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
);

-- runs definition
CREATE TABLE "runs" (
    run_uuid VARCHAR(32) NOT NULL,
    name VARCHAR(250),
    source_type VARCHAR(20),
    source_name VARCHAR(500),
    entry_point_name VARCHAR(50),
    user_id VARCHAR(256),
    status VARCHAR(9),
    start_time BIGINT,
    end_time BIGINT,
    source_version VARCHAR(50),
    lifecycle_stage VARCHAR(20),
    artifact_uri VARCHAR(200),
    experiment_id INTEGER,
    CONSTRAINT run_pk PRIMARY KEY (run_uuid),
    CONSTRAINT source_type CHECK (source_type IN ('NOTEBOOK', 'JOB', 'LOCAL', 'UNKNOWN', 'PROJECT')),
    CONSTRAINT runs_lifecycle_stage CHECK (lifecycle_stage IN ('active', 'deleted')),
    CHECK (status IN ('SCHEDULED', 'FAILED', 'FINISHED', 'RUNNING', 'KILLED')),
    FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
);
-- tags definition
CREATE TABLE "tags" (
    "key" VARCHAR(250) NOT NULL,
    value VARCHAR(5000),
    run_uuid VARCHAR(32) NOT NULL,
    CONSTRAINT tag_pk PRIMARY KEY ("key", run_uuid),
    FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);

-- latest_metrics definition
CREATE TABLE "latest_metrics" (
    "key" VARCHAR(250) NOT NULL,
    value FLOAT NOT NULL,
    timestamp BIGINT,
    step BIGINT NOT NULL,
    is_nan BOOLEAN NOT NULL,
    run_uuid VARCHAR(32) NOT NULL,
    CONSTRAINT latest_metric_pk PRIMARY KEY ("key", run_uuid),
    CHECK (is_nan IN (0, 1)),
    FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);

-- metrics definition
CREATE TABLE "metrics" (
    "key" VARCHAR(250) NOT NULL,
    value FLOAT NOT NULL,
    timestamp BIGINT NOT NULL,
    run_uuid VARCHAR(32) NOT NULL,
    step BIGINT DEFAULT '0' NOT NULL,
    is_nan BOOLEAN DEFAULT '0' NOT NULL,
    CONSTRAINT metric_pk PRIMARY KEY ("key", timestamp, step, run_uuid, value, is_nan),
    CHECK (is_nan IN (0, 1)),
    FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);

-- model_version_tags definition
CREATE TABLE model_version_tags (
    "key" VARCHAR(250) NOT NULL,
    value VARCHAR(5000),
    name VARCHAR(256) NOT NULL,
    version INTEGER NOT NULL,
    CONSTRAINT model_version_tag_pk PRIMARY KEY ("key", name, version),
    FOREIGN KEY(name, version) REFERENCES model_versions (name, version) ON UPDATE CASCADE
);

-- params definition
CREATE TABLE params (
    "key" VARCHAR(250) NOT NULL,
    value VARCHAR(250) NOT NULL,
    run_uuid VARCHAR(32) NOT NULL,
    CONSTRAINT param_pk PRIMARY KEY ("key", run_uuid),
    FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);
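An alternative to applying the DDL above by hand is to let MLflow create the schema itself, which also populates the alembic_version table that the raw DDL leaves empty. A minimal sketch, assuming the target database URI is sqlite:///mlflow.db:

    pip install 'mlflow>=1.18'
    mlflow db upgrade sqlite:///mlflow.db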
#!/usr/bin/env bash
CMD_DIR="${BASH_SOURCE%/*}"
SCRIPT=$(basename $0)
SOURCE=${1:-"mlruns"}
TARGET=${2:-"mlflow.db"}
EXP_NAME=${3:-$EXPERIMENT_NAME}
EXIT=0

if [ -z "$SOURCE" ]; then
    echo "Source directory missing."
    EXIT=1
fi
if [ -z "$TARGET" ]; then
    echo "Target DB path missing."
    EXIT=1
fi
if [ ! -z "$EXP_NAME" ]; then
    EXP_NAME="--experiment-name $EXP_NAME"
fi
if [ ! -z "$ONLY_LAST_METRIC" ]; then
    OLM="--only-last-metric"
else
    OLM=""
fi
if [ "$EXIT" == "1" ]; then
    echo "Usage: $SCRIPT [source dir] [target db] [experiment name]"
    exit 1
fi
# Create DB:
echo "Initializing database at $TARGET..."
[ -f "$TARGET" ] && rm "$TARGET"
if cat "$CMD_DIR/mlflow_schema.sql" | sqlite3 "$TARGET"; then
    echo "Created DB schema."
else
    echo "Schema import failed."
    exit 1
fi

# Import MLFlow data:
echo "Importing MLFlow data from $SOURCE..."
PRAGMA="PRAGMA journal_mode = WAL; PRAGMA synchronous = OFF;"
# The PRAGMA statements must reach sqlite3 before the INSERTs, so run both
# commands sequentially (";") rather than in the background ("&").
{ echo "$PRAGMA"; "$CMD_DIR/migrate_data.py" --mlruns-dir "$SOURCE" $EXP_NAME $OLM; } \
    | sqlite3 "$TARGET"
if [ $? -eq 0 ]; then
    echo "Successfully imported data."
else
    echo "Data import failed."
    exit 1
fi
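A typical invocation of the wrapper above; the script name migrate.sh is an assumption, and the companion files migrate_data.py and mlflow_schema.sql must sit next to it:

    # migrate everything under ./mlruns into a fresh mlflow.db
    ./migrate.sh mlruns mlflow.db

    # restrict the import to one experiment and keep only the last value of each metric
    ONLY_LAST_METRIC=1 ./migrate.sh /opt/mlflow/mlruns mlflow.db my-experiment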