MLFlow migration script from filesystem to database tracking data
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Migration from the file-based store of MLflow 0.8 up to 1.8.0

Credits:
 * Based on the work of bnekolny https://gist.github.com/bnekolny/8ec96faa0d57913eea6bb7848c06b912
 * Latest version https://gist.github.com/weldpua2008/7f0c4644d247bd0fc7ba9a83c2d337d5

Requirements:
 * pyyaml version 5.1 or later is required:
     pip3.6 install -U pyyaml

Usage:
 * Import all experiments:
     migrate_data.py \
         --wipe-db \
         --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_full.sql
     mysql -D mlflow_storage < /tmp/migration_inserts_full.sql 2>> /var/log/mlflow-migration-full.log
 * Run a periodic import of the latest runs (e.g. every 10 minutes via cron):
     migrate_data.py \
         --mlruns-dir /opt/mlflow/mlruns --partial-update --partial-last-seconds 1000 \
         > /tmp/migration_inserts_partial.sql
     mysql -D mlflow_storage < /tmp/migration_inserts_partial.sql 2>> /var/log/mlflow-migration-partial.log

Important:
 * To fix "Cannot import data ERROR 1406 (22001): Data too long" (https://github.com/mlflow/mlflow/issues/2814)
   you can change the schema (uncomment ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;).
"""
import os
from pathlib import Path
import sys
import yaml
import codecs
import argparse
from functools import partial
from datetime import datetime

error = partial(print, file=sys.stderr)
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--mlruns-dir",
        type=Path,
        default=Path("/opt/mlflow/mlruns/"),
        help="Path to the MLflow runs data directory",
    )
    parser.add_argument(
        '--wipe-db',
        action="store_true",
        default=False,
        help="Emit SQL statements that flush all data in the database first")
    parser.add_argument(
        '--partial-update',
        action="store_true",
        default=False,
        help="Prepare partial SQL statements")
    parser.add_argument(
        "--partial-last-seconds",
        type=int,
        default=900,
        help="Only dump MLflow runs recorded within the last N seconds",
    )
    parser.add_argument(
        "--partial-since-seconds",
        type=int,
        default=120,
        help="Skip MLflow runs created less than N seconds ago",
    )
    return parser.parse_args()
def progress(_cur, _max):
    p = round(100 * _cur / _max)
    b = f"Progress: {p}% - [" + "." * int(p / 5) + " " * (20 - int(p / 5)) + "]"
    error(b, end="\r")


def load_metadata_file(fpath):
    if os.path.exists(fpath):
        with open(fpath) as fp:
            try:
                return yaml.full_load(fp)
            except AttributeError:
                # Fallback for pyyaml versions without full_load()
                return yaml.load(fp)
def process_experiment(rootDir, experiment_id, experiment_names, partial_update,
                       partial_last_seconds, partial_since_seconds):
    status = {1: 'RUNNING', 2: 'SCHEDULED', 3: 'FINISHED', 4: 'FAILED', 5: 'KILLED'}
    sourceType = {1: 'NOTEBOOK', 2: 'JOB', 3: 'PROJECT', 4: 'LOCAL', 5: 'UNKNOWN'}
    EPATH = "{root}/{experiment}".format(root=rootDir, experiment=experiment_id)
    NOW = datetime.now()
    experiment = load_metadata_file("{experiment}/meta.yaml".format(experiment=EPATH))
    if experiment is None:
        return
    experiment['experiment_id'] = experiment.get('experiment_id', experiment_id)
    if experiment_id == 0 or experiment_id == '0':
        print("SET sql_mode='NO_AUTO_VALUE_ON_ZERO';")
    if len(experiment['name']) < 1:
        error("experiment name is empty at {experiment}/meta.yaml".format(experiment=EPATH))
        return
    if experiment['name'] in experiment_names:
        error("experiment {name} exists, appending _".format(name=experiment['name']))
        experiment['name'] = "{}_".format(experiment['name'])
    experiment_names.add(experiment['name'])
    experiment_insert = "INSERT IGNORE INTO `experiments` (`experiment_id`, `name`, `artifact_location`, `lifecycle_stage`) VALUES ({0}, '{1}', '{2}', '{3}');".format(
        experiment['experiment_id'],
        experiment['name'],
        experiment['artifact_location'],
        experiment.get('lifecycle_stage', 'active'))
    print("-- {root}/{experiment}".format(root=rootDir, experiment=experiment['experiment_id']))
    print(experiment_insert)
    for run_uuid in os.listdir("{experiment}".format(experiment=EPATH)):
        if run_uuid in ['meta.yaml']:
            continue
        RPATH = "{experiment}/{run}".format(experiment=EPATH, run=run_uuid)
        if partial_update is True:
            diff = int(NOW.timestamp()) - int(min(os.path.getctime(RPATH), os.path.getmtime(RPATH)))
            if diff > int(partial_last_seconds):
                continue
            if diff < int(partial_since_seconds):
                continue
        run = load_metadata_file("{run}/meta.yaml".format(run=RPATH))
        if run is None:
            continue
        run['run_uuid'] = run.get('run_uuid', run_uuid)
        run_insert = "INSERT IGNORE INTO `runs` (" \
            "`run_uuid`, `name`, `source_type`, `source_name`, `entry_point_name`, `user_id`, `status`, `start_time`, `end_time`, `source_version`, `lifecycle_stage`, `artifact_uri`, `experiment_id`" \
            ") VALUES ( '{0}', '{1}', '{2}', '{3}', '{4}', '{5}', '{6}', {7}, {8}, '{9}', '{10}', '{11}', {12});".format(
                run['run_uuid'],
                run['name'],
                sourceType[int(run['source_type'])],
                run['source_name'],
                run['entry_point_name'],
                run['user_id'],
                status[int(run['status'])],
                run['start_time'],
                "NULL" if run['end_time'] is None else run['end_time'],
                run['source_version'],
                run.get('lifecycle_stage', 'active'),
                run['artifact_uri'],
                experiment_id)
        print(run_insert)
        # Tags: one file per tag, the file content is the value
        tag_path = "{experiment}/{run}/tags".format(experiment=EPATH, run=run_uuid)
        for tag_fname in Path(tag_path).rglob("*"):
            if os.path.isdir(tag_fname):
                continue
            tag = str(tag_fname.relative_to(tag_path))
            with codecs.open(tag_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
            tag_insert = "INSERT IGNORE INTO `tags` (" \
                "`key`, `value`, `run_uuid`" \
                ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    tag,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line,
                    run['run_uuid'])
            print(tag_insert)
        # Metrics: one file per metric, one "<offset> <value>" entry per line
        metrics_path = "{experiment}/{run}/metrics".format(experiment=EPATH, run=run_uuid)
        for metrics_fname in Path(metrics_path).rglob("*"):
            if os.path.isdir(metrics_fname):
                continue
            with open(metrics_fname, 'r') as f:
                lines = set(f.readlines())
            metric = str(metrics_fname.relative_to(metrics_path))
            for line in lines:
                counter, val = line.split()
                metric_insert = "INSERT IGNORE INTO `metrics` (" \
                    "`key`, `value`, `timestamp`, `run_uuid`" \
                    ") VALUES ( '{0}', '{1}', {2}, '{3}' );".format(
                        metric,
                        val,
                        int(run['start_time']) + int(counter),
                        run_uuid)
                print(metric_insert)
        # Params: one file per param, the file content is the value
        param_path = "{experiment}/{run}/params".format(experiment=EPATH, run=run_uuid)
        for param_fname in Path(param_path).rglob("*"):
            if os.path.isdir(param_fname):
                continue
            param = str(param_fname.relative_to(param_path))
            with codecs.open(param_fname, mode='r', encoding="utf-8") as f:
                line = f.read()
            param_insert = "INSERT IGNORE INTO `params` (" \
                "`key`, `value`, `run_uuid`" \
                ") VALUES ( '{0}', '{1}', '{2}' );".format(
                    param.replace("'", "\\'") if "'" in param else param,
                    line.strip().replace("\\", "\\\\").replace("'", "\\'") if "'" in line else line,
                    run_uuid)
            print(param_insert)
def main():
    """
    Execution for me was:
        `python migrate_data.py > ./migration_inserts.sql`
        `mysql < ./migration_inserts.sql`
    NOTE: A few things to know about the script here:
        - Artifacts were stored remotely, so no artifact migration
        - experiment source_version is never set
        - experiment lifecycle_stage is always active for us, so I avoided the mapping from int -> str
        - metric timestamp is made up, since it was tracked as an array in the filesystem and as an epoch in the DB
    """
    args = parse_args()
    num_experiments = len(os.listdir(args.mlruns_dir)) + 1
    error(f"Migration of {num_experiments - 1} experiments")
    print("-- MLFlow SQL Dump %s" % datetime.now())
    if args.wipe_db is True:
        print("""
        SET FOREIGN_KEY_CHECKS=0;
        TRUNCATE `runs`;
        TRUNCATE `experiments`;
        TRUNCATE `metrics`;
        TRUNCATE `params`;
        TRUNCATE `tags`;
        SET FOREIGN_KEY_CHECKS=1;
        -- ALTER TABLE `params` MODIFY value VARCHAR(6512) NOT NULL;
        """)
    experiment_names = set()
    for _step, experiment_id in enumerate(os.listdir(args.mlruns_dir)):
        if experiment_id in ['.trash']:
            continue
        process_experiment(rootDir=args.mlruns_dir, experiment_id=experiment_id,
                           experiment_names=experiment_names,
                           partial_update=args.partial_update,
                           partial_last_seconds=args.partial_last_seconds,
                           partial_since_seconds=args.partial_since_seconds)
        progress(_step, num_experiments)
    progress(num_experiments, num_experiments)
    error("..." * 5, end="\r")
    error("DONE")


if __name__ == '__main__':
    main()
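Once the full dump has been imported, the MLflow tracking server can be pointed at the database instead of the mlruns directory. A minimal sketch, assuming the mysql+pymysql driver is installed, the mlflow_storage database from the usage example above, and an artifact root of /opt/mlflow/artifacts (the credentials and paths are assumptions; adjust them to your setup):

mlflow server \
    --backend-store-uri mysql+pymysql://root:password@localhost/mlflow_storage \
    --default-artifact-root /opt/mlflow/artifacts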
Partial update for crontab:
/usr/local/bin/mlflow-migration-partial
#!/usr/bin/env bash
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
_lock_file_key=$(basename $0)
LOCK_FILE="/tmp/${_lock_file_key:-partial-migration-v1}.lock"
export MYSQL_USER=root
export MYSQL_PASSWORD=password
export MYSQL_HOST=localhost
export MYSQL_DBNAME=db
function on_exit()
{
local _exit_code=${1:-1}
if [[ -e "${LOCK_FILE}" ]];then
rm -f ${LOCK_FILE} || sudo rm -f ${LOCK_FILE}
fi
exit $_exit_code
}
if [[ ! -e "${LOCK_FILE}" ]]; then
trap 'on_exit $?' EXIT HUP TERM INT
touch "${LOCK_FILE}"
[[ -e /tmp/migration_inserts_partial.sql ]] && rm -f /tmp/migration_inserts_partial.sql
/usr/local/bin/mlflow-db-dump.py \
--partial-update --partial-last-seconds 1000 \
--mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_partial.sql \
2>> /var/log/mlflow-migration-partial.log
[[ -e /tmp/migration_inserts_partial.sql ]] \
&& mysql -u ${MYSQL_USER} \
-p"${MYSQL_PASSWORD}" \
-h ${MYSQL_HOST} \
-D ${MYSQL_DBNAME} < /tmp/migration_inserts_partial.sql 2>> /var/log/mlflow-migration-partial.log
rm -f "${LOCK_FILE}"
fi
You can add the following record to crontab (crontab -e):
*/10 * * * * /usr/local/bin/mlflow-migration-partial
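As a quick sanity check that the periodic import is landing rows (not part of the original gist; it reuses the connection variables from the wrapper above):

mysql -u "${MYSQL_USER}" -p"${MYSQL_PASSWORD}" -h "${MYSQL_HOST}" -D "${MYSQL_DBNAME}" \
    -e "SELECT COUNT(*) FROM runs; SELECT MAX(start_time) FROM runs;"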
Scripts for full migration: /usr/local/bin/mlflow-migration-full. The Python file above is placed at /usr/local/bin/migrate_data.py.
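The body of /usr/local/bin/mlflow-migration-full is not included here; below is a minimal sketch assembled from the full-import usage in the script's docstring, mirroring the partial wrapper above. The credentials, paths, and log locations are assumptions to adjust for your environment:

#!/usr/bin/env bash
# Sketch: full (wipe-and-reload) migration wrapper.
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
export MYSQL_USER=root
export MYSQL_PASSWORD=password
export MYSQL_HOST=localhost
export MYSQL_DBNAME=db

# Generate a complete dump, including the TRUNCATE statements (--wipe-db).
/usr/local/bin/migrate_data.py \
    --wipe-db \
    --mlruns-dir /opt/mlflow/mlruns > /tmp/migration_inserts_full.sql \
    2>> /var/log/mlflow-migration-full.log

# Load the dump into the tracking database.
[[ -e /tmp/migration_inserts_full.sql ]] \
    && mysql -u "${MYSQL_USER}" \
        -p"${MYSQL_PASSWORD}" \
        -h "${MYSQL_HOST}" \
        -D "${MYSQL_DBNAME}" < /tmp/migration_inserts_full.sql 2>> /var/log/mlflow-migration-full.log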