Migrates MLflow tracking data from the file-based store to a database-backed store. Adapted to support MLflow 1.18+ and SQLite.
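For a one-shot SQLite migration, the end-to-end flow looks roughly like this (a minimal sketch assuming the default paths used in this gist; mlflow_schema.sql and migrate_data.py are the files below, and the bundled shell script at the end automates these steps):

    # create an empty database with the MLflow tracking schema
    sqlite3 mlflow.db < mlflow_schema.sql
    # dump the filesystem store as SQL and import it
    ./migrate_data.py --mlruns-dir /opt/mlflow/mlruns | sqlite3 mlflow.db
    # point the tracking server at the migrated database
    mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root /opt/mlflow/mlruns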
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Migration from the file-based store of MLflow 0.8 up to 1.8.0 (adapted here for MLflow 1.18+ and SQLite).
Credits:
* Based on the work of bnekolny https://gist.github.com/bnekolny/8ec96faa0d57913eea6bb7848c06b912
* Latest version https://gist.github.com/weldpua2008/7f0c4644d247bd0fc7ba9a83c2d337d5
Requirements:
    pyyaml >= 5.1:
        pip3 install -U pyyaml
Usage:
* Import all experiments:
    migrate_data.py \
        --wipe-db \
        --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_full.sql
    mysql -D mlflow_storage < /tmp/migration_inserts_full.sql 2>> /var/log/mlflow-migration-full.log
* Periodic import (e.g. every 10 minutes via cron) of the latest runs:
    migrate_data.py \
        --mlruns-dir /opt/mlflow/mlruns --partial-update --partial-last-seconds 1000 \
        > /tmp/migration_inserts_partial.sql
    mysql -D mlflow_storage < /tmp/migration_inserts_partial.sql 2>> /var/log/mlflow-migration-partial.log
Important:
* To fix "Cannot import data ERROR 1406 (22001): Data too long" (https://github.com/mlflow/mlflow/issues/2814),
  change the schema (uncomment ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;).
"""
import os
from pathlib import Path
import sys
import yaml
import codecs
import argparse
from functools import partial
from datetime import datetime
# Diagnostics go to stderr so they do not pollute the SQL dump written to stdout.
error = partial(print, file=sys.stderr)
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mlruns-dir",
        type=Path,
        default=Path("/opt/mlflow/mlruns/"),
        help="Path to the MLflow runs data directory",
    )
    parser.add_argument(
        "--experiment-name",
        type=str,
        default=None,
        help="Comma-separated names of the experiments to be migrated. All are selected by default.")
    parser.add_argument(
        '--wipe-db',
        action="store_true",
        default=False,
        help="Emit SQL statements that flush all data in the database first")
    parser.add_argument(
        '--partial-update',
        action="store_true",
        default=False,
        help="Prepare a partial SQL dump instead of a full one")
    parser.add_argument(
        "--partial-last-seconds",
        type=int,
        default=900,
        help="Only dump MLflow runs recorded within the last N seconds",
    )
    parser.add_argument(
        "--partial-since-seconds",
        type=int,
        default=120,
        help="Skip MLflow runs created less than N seconds ago",
    )
    parser.add_argument(
        "--only-last-metric",
        action="store_true",
        default=False,
        help="Only include the last value of each metric.",
    )
    return parser.parse_args()

def progress(_cur, _max):
    p = round(100 * _cur / _max)
    b = f"Progress: {p}% - [" + "." * int(p / 5) + " " * (20 - int(p / 5)) + "]"
    error(b, end="\r")

def load_metadata_file(fpath):
    if os.path.exists(fpath):
        with open(fpath) as fp:
            try:
                return yaml.full_load(fp)
            except AttributeError:
                # pyyaml < 5.1 has no full_load(); fall back to the old API
                return yaml.load(fp)
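# For reference, an experiment-level meta.yaml in an MLflow 1.x filesystem store
# looks roughly like this (an illustrative example, not taken from this gist):
#   artifact_location: /opt/mlflow/mlruns/0
#   experiment_id: '0'
#   lifecycle_stage: active
#   name: Default
# Run-level meta.yaml files carry the run_uuid, status, start_time/end_time,
# source_* fields, user_id, lifecycle_stage, and artifact_uri read below.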

def process_experiment(rootDir, experiment_id, experiment_names, partial_update,
                       partial_last_seconds, partial_since_seconds, filter_name=None,
                       only_last_metric=False):
    status = {1: 'RUNNING', 2: 'SCHEDULED', 3: 'FINISHED', 4: 'FAILED', 5: 'KILLED'}
    sourceType = {1: 'NOTEBOOK', 2: 'JOB', 3: 'PROJECT', 4: 'LOCAL', 5: 'UNKNOWN'}
    EPATH = "{root}/{experiment}".format(root=rootDir, experiment=experiment_id)
    NOW = datetime.now()
    experiment = load_metadata_file("{experiment}/meta.yaml".format(experiment=EPATH))
    if experiment is None:
        return True
    experiment['experiment_id'] = experiment.get('experiment_id', experiment_id)
    if experiment_id == 0 or experiment_id == '0':
        print("--SET sql_mode='NO_AUTO_VALUE_ON_ZERO';")
    if len(experiment['name']) < 1:
        error("experiment name is empty at {experiment}/meta.yaml".format(experiment=EPATH))
        return True
    if experiment['name'] in experiment_names:
        error("experiment {name} exists, appending _".format(name=experiment['name']))
        experiment['name'] = "{}_".format(experiment['name'])
    if filter_name is not None and experiment['name'] not in filter_name:
        return False
    experiment_names.add(experiment['name'])
    experiment_insert = "INSERT OR IGNORE INTO `experiments` (`experiment_id`, `name`, `artifact_location`, `lifecycle_stage`) VALUES ({0}, '{1}', '{2}', '{3}');".format(
        experiment['experiment_id'],
        experiment['name'],
        experiment['artifact_location'],
        experiment.get('lifecycle_stage', 'active'))
    print("-- {root}/{experiment}".format(root=rootDir, experiment=experiment['experiment_id']))
    print(experiment_insert)
    for run_uuid in os.listdir("{experiment}".format(experiment=EPATH)):
        if run_uuid in ['meta.yaml']:
            continue
        RPATH = "{experiment}/{run}".format(experiment=EPATH, run=run_uuid)
        if partial_update is True:
            diff = int(NOW.timestamp()) - int(min(os.path.getctime(RPATH), os.path.getmtime(RPATH)))
            if diff > int(partial_last_seconds):
                continue
            if diff < int(partial_since_seconds):
                continue
        run = load_metadata_file("{run}/meta.yaml".format(run=RPATH))
        if run is None:
            continue
        run['run_uuid'] = run.get('run_uuid', run_uuid)
        if run.get('name', '') == '':
            run_name_fname = Path("{experiment}/{run}/tags/mlflow.runName".format(experiment=EPATH, run=run_uuid))
            if os.path.isfile(run_name_fname):
                with codecs.open(run_name_fname, mode='r', encoding="utf-8") as f:
                    line = f.read()
                run['name'] = line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line.strip()
        run_insert = "INSERT OR IGNORE INTO `runs` (" \
            "`run_uuid`, `name`, `source_type`, `source_name`, `entry_point_name`, `user_id`, `status`, `start_time`, `end_time`, `source_version`, `lifecycle_stage`, `artifact_uri`, `experiment_id`" \
            ") VALUES ( '{0}', '{1}', '{2}', '{3}', '{4}', '{5}', '{6}', {7}, {8}, '{9}', '{10}', '{11}', {12});".format(
                run['run_uuid'],
                run['name'],
                sourceType[int(run['source_type'])],
                run['source_name'],
                run['entry_point_name'],
                run['user_id'],
                status[int(run['status'])],
                run['start_time'],
                "NULL" if run['end_time'] is None else run['end_time'],
                run['source_version'],
                run.get('lifecycle_stage', 'active'),
                run['artifact_uri'],
                experiment_id)
        print(run_insert)
        # Tags
        tag_path = "{experiment}/{run}/tags".format(experiment=EPATH, run=run_uuid)
        for tag_fname in Path(tag_path).rglob("*"):
            if os.path.isdir(tag_fname):
                continue
            tag = str(tag_fname.relative_to(tag_path))
            with codecs.open(tag_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
            tag_insert = "INSERT OR IGNORE INTO `tags` (" \
                "`key`, `value`, `run_uuid`" \
                ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    tag,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line.strip(),
                    run['run_uuid'])
            print(tag_insert)
        # Metrics: each line of a metric file is "<timestamp_ms> <value> <step>"
        metrics_path = "{experiment}/{run}/metrics".format(experiment=EPATH, run=run_uuid)
        for metrics_fname in Path(metrics_path).rglob("*"):
            if os.path.isdir(metrics_fname):
                continue
            with open(metrics_fname, 'r') as f:
                lines = f.readlines()
            metric = str(metrics_fname.relative_to(metrics_path))
            if only_last_metric:
                lines = [lines[-1]]
            for line in lines:
                timestamp, val, step = line.split()
                metric_insert = "INSERT OR IGNORE INTO `metrics` (" \
                    "`key`, `value`, `timestamp`, `step`, `run_uuid`" \
                    ") VALUES ( '{0}', '{1}', {2}, {3}, '{4}' );".format(
                        metric,
                        val,
                        int(timestamp), int(step),
                        run_uuid)
                print(metric_insert)
        # Params
        param_path = "{experiment}/{run}/params".format(experiment=EPATH, run=run_uuid)
        for param_fname in Path(param_path).rglob("*"):
            if os.path.isdir(param_fname):
                continue
            param = str(param_fname.relative_to(param_path))
            with codecs.open(param_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
            param_insert = "INSERT OR IGNORE INTO `params` (" \
                "`key`, `value`, `run_uuid`" \
                ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    param.replace("'", "\\'") if "'" in param else param,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line.strip(),
                    run_uuid)
            print(param_insert)
    return True

def main():
    """
    Execution for me was:
    `python migrate_data.py > ./migration_inserts.sql`
    `mysql < ./migration_inserts.sql`
    NOTE: A few things to know about the script here:
    - Artifacts were stored remotely, so there is no artifact migration
    - experiment source_version is never set
    - experiment lifecycle_stage is always active for us, so I avoided the mapping from int -> str
    - metric timestamp is made up, since it was tracked as an array in the filesystem and as an epoch in the DB
    """
    args = parse_args()
    selected_experiments = args.experiment_name
    if selected_experiments is None:
        # every entry in the mlruns dir except .trash is an experiment
        num_experiments = len(os.listdir(args.mlruns_dir)) - 1
        error(f"Migration of {num_experiments} experiments")
    else:
        selected_experiments = set(selected_experiments.split(","))
        num_experiments = len(selected_experiments)
        error(
            f"Migration of {num_experiments} experiments: {args.experiment_name}")
    print("-- MLFlow SQL Dump %s" % datetime.now())
    if args.wipe_db is True:
        print("""
SET FOREIGN_KEY_CHECKS=0;
TRUNCATE `runs`;
TRUNCATE `experiments`;
TRUNCATE `metrics`;
TRUNCATE `params`;
TRUNCATE `tags`;
SET FOREIGN_KEY_CHECKS=1;
-- ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;
""")
    experiment_names = set()
    for _step, experiment_id in enumerate(os.listdir(args.mlruns_dir)):
        if experiment_id in ['.trash']:
            continue
        considered = process_experiment(rootDir=args.mlruns_dir, experiment_id=experiment_id,
                                        experiment_names=experiment_names,
                                        partial_update=args.partial_update,
                                        partial_last_seconds=args.partial_last_seconds,
                                        partial_since_seconds=args.partial_since_seconds,
                                        filter_name=selected_experiments,
                                        only_last_metric=args.only_last_metric)
        if considered:
            progress(_step, num_experiments)
    progress(num_experiments, num_experiments)
    error("..." * 5, end="\r")
    if selected_experiments is not None and not selected_experiments.issubset(experiment_names):
        error("Could not find all selected experiments.")
    error("DONE")


if __name__ == '__main__':
    main()

-- mlflow_schema.sql: MLflow tracking schema (SQLite dialect)

-- alembic_version definition
CREATE TABLE alembic_version (
version_num VARCHAR(32) NOT NULL,
CONSTRAINT alembic_version_pkc PRIMARY KEY (version_num)
);
-- experiments definition
CREATE TABLE experiments (
experiment_id INTEGER NOT NULL,
name VARCHAR(256) NOT NULL,
artifact_location VARCHAR(256),
lifecycle_stage VARCHAR(32),
CONSTRAINT experiment_pk PRIMARY KEY (experiment_id),
CONSTRAINT experiments_lifecycle_stage CHECK (lifecycle_stage IN ('active', 'deleted')),
UNIQUE (name)
);
-- registered_models definition
CREATE TABLE registered_models (
name VARCHAR(256) NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000),
CONSTRAINT registered_model_pk PRIMARY KEY (name),
UNIQUE (name)
);
-- experiment_tags definition
CREATE TABLE experiment_tags (
"key" VARCHAR(250) NOT NULL,
value VARCHAR(5000),
experiment_id INTEGER NOT NULL,
CONSTRAINT experiment_tag_pk PRIMARY KEY ("key", experiment_id),
FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
);
-- model_versions definition
CREATE TABLE "model_versions" (
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
creation_time BIGINT,
last_updated_time BIGINT,
description VARCHAR(5000),
user_id VARCHAR(256),
current_stage VARCHAR(20),
source VARCHAR(500),
run_id VARCHAR(32),
status VARCHAR(20),
status_message VARCHAR(500),
run_link VARCHAR(500),
CONSTRAINT model_version_pk PRIMARY KEY (name, version),
FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
);
-- registered_model_tags definition
CREATE TABLE registered_model_tags (
"key" VARCHAR(250) NOT NULL,
value VARCHAR(5000),
name VARCHAR(256) NOT NULL,
CONSTRAINT registered_model_tag_pk PRIMARY KEY ("key", name),
FOREIGN KEY(name) REFERENCES registered_models (name) ON UPDATE CASCADE
);
-- runs definition
CREATE TABLE "runs" (
run_uuid VARCHAR(32) NOT NULL,
name VARCHAR(250),
source_type VARCHAR(20),
source_name VARCHAR(500),
entry_point_name VARCHAR(50),
user_id VARCHAR(256),
status VARCHAR(9),
start_time BIGINT,
end_time BIGINT,
source_version VARCHAR(50),
lifecycle_stage VARCHAR(20),
artifact_uri VARCHAR(200),
experiment_id INTEGER,
CONSTRAINT run_pk PRIMARY KEY (run_uuid),
CONSTRAINT source_type CHECK (source_type IN ('NOTEBOOK', 'JOB', 'LOCAL', 'UNKNOWN', 'PROJECT')),
CONSTRAINT runs_lifecycle_stage CHECK (lifecycle_stage IN ('active', 'deleted')),
CHECK (status IN ('SCHEDULED', 'FAILED', 'FINISHED', 'RUNNING', 'KILLED')),
FOREIGN KEY(experiment_id) REFERENCES experiments (experiment_id)
);
-- tags definition
CREATE TABLE "tags" (
"key" VARCHAR(250) NOT NULL,
value VARCHAR(5000),
run_uuid VARCHAR(32) NOT NULL,
CONSTRAINT tag_pk PRIMARY KEY ("key", run_uuid),
FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);
-- latest_metrics definition
CREATE TABLE "latest_metrics" (
"key" VARCHAR(250) NOT NULL,
value FLOAT NOT NULL,
timestamp BIGINT,
step BIGINT NOT NULL,
is_nan BOOLEAN NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
CONSTRAINT latest_metric_pk PRIMARY KEY ("key", run_uuid),
CHECK (is_nan IN (0, 1)),
FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);
-- metrics definition
CREATE TABLE "metrics" (
"key" VARCHAR(250) NOT NULL,
value FLOAT NOT NULL,
timestamp BIGINT NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
step BIGINT DEFAULT '0' NOT NULL,
is_nan BOOLEAN DEFAULT '0' NOT NULL,
CONSTRAINT metric_pk PRIMARY KEY ("key", timestamp, step, run_uuid, value, is_nan),
CHECK (is_nan IN (0, 1)),
FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);
-- model_version_tags definition
CREATE TABLE model_version_tags (
"key" VARCHAR(250) NOT NULL,
value VARCHAR(5000),
name VARCHAR(256) NOT NULL,
version INTEGER NOT NULL,
CONSTRAINT model_version_tag_pk PRIMARY KEY ("key", name, version),
FOREIGN KEY(name, version) REFERENCES model_versions (name, version) ON UPDATE CASCADE
);
-- params definition
CREATE TABLE params (
"key" VARCHAR(250) NOT NULL,
value VARCHAR(250) NOT NULL,
run_uuid VARCHAR(32) NOT NULL,
CONSTRAINT param_pk PRIMARY KEY ("key", run_uuid),
FOREIGN KEY(run_uuid) REFERENCES runs (run_uuid)
);

#!/usr/bin/env bash
# Driver script: creates a fresh SQLite database from mlflow_schema.sql and
# imports the filesystem tracking data via migrate_data.py.
CMD_DIR="${BASH_SOURCE%/*}"
SCRIPT=$(basename "$0")
SOURCE=${1:-"mlruns"}
TARGET=${2:-"mlflow.db"}
EXP_NAME=${3:-$EXPERIMENT_NAME}
EXIT=0
if [ -z "$SOURCE" ]; then
    echo "Source directory missing."
    EXIT=1
fi
if [ -z "$TARGET" ]; then
    echo "Target DB path missing."
    EXIT=1
fi
if [ -n "$EXP_NAME" ]; then
    EXP_NAME="--experiment-name $EXP_NAME"
fi
if [ -n "$ONLY_LAST_METRIC" ]; then
    OLM="--only-last-metric"
else
    OLM=""
fi
if [ "$EXIT" == "1" ]; then
    echo "Usage: $SCRIPT [source dir] [target db] [experiment name]"
    exit 1
fi

# Create DB:
echo "Initializing database at $TARGET..."
[ -f "$TARGET" ] && rm "$TARGET"
if sqlite3 "$TARGET" < "$CMD_DIR/mlflow_schema.sql"; then
    echo "Created DB schema."
else
    echo "Schema import failed."
    exit 1
fi

# Import MLFlow data:
echo "Importing MLFlow data from $SOURCE..."
PRAGMA="PRAGMA journal_mode = WAL; PRAGMA synchronous = OFF;"
# Emit the PRAGMAs first, then stream the generated INSERT statements.
if { echo "$PRAGMA"; "$CMD_DIR"/migrate_data.py --mlruns-dir "$SOURCE" $EXP_NAME $OLM; } \
    | sqlite3 "$TARGET"; then
    echo "Successfully imported data."
else
    echo "Data import failed."
    exit 1
fi
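
Example usage of the driver script, assuming it is saved alongside the two files above as migrate.sh (the file name is an assumption; the script itself is name-agnostic):

    # migrate ./mlruns into ./mlflow.db, then sanity-check the import
    ./migrate.sh mlruns mlflow.db
    sqlite3 mlflow.db "SELECT COUNT(*) FROM runs;"
    mlflow server --backend-store-uri sqlite:///mlflow.db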