Created
May 16, 2022 11:57
-
-
Save pspeter/13c18ee6cf247a5ee2b3e3b2ee8a5470 to your computer and use it in GitHub Desktop.
Migrate Airflow logs from the v2.2.x to the v2.3.x folder structure
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pandas as pd | |
from pathlib import Path | |
from airflow.settings import Session | |
# Directory that run() scans for log folders to migrate.
log_dir = Path("/opt/airflow/logs")

# Labels assigned positionally to the columns returned by ``SELECT *``.
# NOTE(review): these look like ad-hoc names rather than the table's real
# column names, and none of them is "ts" even though get_run_id() filters on
# tis["ts"] -- confirm which column was intended.
_TASK_INSTANCE_LABELS = [
    "task_id",
    "dag_id",
    "date1",
    "date2",
    "num0",
    "status",
    "num1",
    "hash",
    "user",
    "num2",
    "pool",
    "str1",
    "num3",
    "operator",
    "date3",
    "num4",
    "num5",
    "mem_addr",
    "num6",
    "num7",
    "longid",
    "None1",
    "None2",
    "None3",
    "None4",
    "scheduled_task_id",
    "num8",
]

# Load every task_instance row into a DataFrame for in-memory lookups.
session = Session()
tis_iter = session.execute("SELECT * FROM task_instance;")
tis = pd.DataFrame(tis_iter, columns=_TASK_INSTANCE_LABELS)
def get_run_id(dag_id, task_id, ts):
    """Return the ``scheduled_task_id`` of the first matching row in ``tis``.

    The module-level ``tis`` DataFrame is filtered on its ``dag_id``,
    ``task_id`` and ``ts`` columns, and the ``scheduled_task_id`` value of the
    first matching row is returned.

    Args:
        dag_id: Value to match against the ``dag_id`` column.
        task_id: Value to match against the ``task_id`` column.
        ts: Value to match against the ``ts`` column.

    Raises:
        KeyError: If ``tis`` lacks one of the columns used here.
        IndexError: If no row matches.

    NOTE(review): the column labels given to ``tis`` where it is built do not
    include "ts", so this lookup raises KeyError until such a column exists --
    confirm which column was meant to hold the timestamp.
    """
    matches = tis[
        tis["dag_id"].eq(dag_id)
        & tis["task_id"].eq(task_id)
        & tis["ts"].eq(ts)
    ]
    # Without an explicit return the function would always yield None.
    return matches["scheduled_task_id"].iloc[0]
def _child_dirs(parent):
    """Return the sub-directories of *parent*, sorted by path."""
    return sorted(child for child in parent.iterdir() if child.is_dir())


def run():
    """Move log files into the ``dag_id=/run_id=/task_id=/attempt=`` layout.

    Walks ``log_dir`` expecting ``<dag_id>/<task_id>/<ts>/<attempt>`` entries.
    Top-level entries named ``scheduler`` or ``dag_processor_manager``, and
    those already starting with ``dag_id=``, are ignored. For every entry in a
    ``<ts>`` folder the run id is looked up with ``get_run_id`` and the entry
    is renamed to
    ``log_dir/dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<name>``.

    Entries are left in place (and reported on stdout) when the run id cannot
    be resolved or when the target path already exists.
    """
    ignored_names = ("scheduler", "dag_processor_manager")

    # Only directories are descended into; stray files at any of the three
    # folder levels would make iterdir() raise NotADirectoryError.
    for dag_dir in _child_dirs(log_dir):
        if dag_dir.name.startswith("dag_id=") or dag_dir.name in ignored_names:
            continue
        dag_id = dag_dir.name

        for task_dir in _child_dirs(dag_dir):
            task_id = task_dir.name

            for ts_dir in _child_dirs(task_dir):
                ts = ts_dir.name

                # Snapshot the listing first: entries are renamed out of this
                # folder while we loop over it.
                for f in sorted(ts_dir.iterdir()):
                    attempt = f.name

                    # Catch only lookup failures so that real errors (and
                    # Ctrl-C) are no longer silently swallowed.
                    try:
                        run_id = get_run_id(dag_id, task_id, ts)
                    except (KeyError, IndexError) as err:
                        print(f"Skipping {f}: no run id found ({err!r})")
                        continue

                    # A missing run id would otherwise create "run_id=None".
                    if run_id is None:
                        print(f"Skipping {f}: run id lookup returned None")
                        continue

                    target_dir = (
                        log_dir
                        / f"dag_id={dag_id}"
                        / f"run_id={run_id}"
                        / f"task_id={task_id}"
                    )
                    target_dir.mkdir(parents=True, exist_ok=True)
                    target_file = target_dir / f"attempt={attempt}"

                    # Path.rename may silently replace an existing file, so
                    # never clobber a log that is already in the new layout.
                    if target_file.exists():
                        print(f"Skipping {f}: {target_file} already exists")
                        continue

                    f.rename(target_file)
# Run the migration as soon as the script is executed.
run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment