Last active
August 12, 2022 20:34
-
-
Save alxthm/b62eceb1e47f0320e5bd82a67f33e042 to your computer and use it in GitHub Desktop.
Python script to transform meltano schedules from a json format into a crontab file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import shlex
import sys
from pathlib import Path
from typing import List
""" | |
Transform meltano schedules from a json format into a crontab file.

Example:
    $ meltano schedule list --format=json | python make_crontab.py

If you want to specify environment variables (such as CRONTAB_MELTANO_EXECUTABLE),
first load your `.env` file:
    $ export $(grep -v '^#' .env | xargs)
""" | |
# Directory two levels above this file; the cron commands `cd` into it before
# running meltano.
# The path is made absolute *before* walking up the parents: with a bare relative
# `__file__` (e.g. `python make_crontab.py` on Python < 3.9), `parents[1]` on the
# relative path would raise an IndexError.
PROJECT_HOME = Path(__file__).absolute().parents[1]

# The meltano executable that you want to use in the crontab.
# Note: crontab does not use the $PATH that you have in your regular shell.
#
# Examples:
# - 'meltano' is fine if it is on the '/usr/local/bin:/usr/bin:/bin' path (it is probably not)
# - '/home/my_user/.local/bin/meltano' if you installed meltano with pipx
# - '/usr/local/bin/docker compose exec meltano-ui meltano' if you are using docker
#
# Overridden through the CRONTAB_MELTANO_EXECUTABLE environment variable.
MELTANO_EXECUTABLE = os.environ.get("CRONTAB_MELTANO_EXECUTABLE", "meltano")
def read_schedules_from_stdin():
    """
    Parse the json document piped on stdin and extract the meltano schedules.

    The document is expected to be an object with a "schedules" key, itself
    holding a "job" list and an "elt" list. Any missing key falls back to an
    empty value.

    :return: a `(job_schedules, elt_schedules)` tuple
    """
    payload = json.loads(sys.stdin.read())
    schedules = payload.get("schedules", {})
    return schedules.get("job", []), schedules.get("elt", [])
def get_crontab_entries(job_schedules: List[dict], elt_schedules: List[dict]):
    """
    Build one crontab line per supported meltano interval.

    All the commands sharing the same interval are chained with `&&` inside a
    single crontab entry, so that they run one after the other from PROJECT_HOME.
    Schedules whose interval is not one of the supported `@...` shortcuts are
    skipped, and a warning is logged for each of them.

    :param job_schedules: job schedules, each with an "interval" key and a
        "job" dict holding a "name"
    :param elt_schedules: elt schedules, each with "name", "interval" and
        "elt_args" (list of command line arguments) keys
    :return: a dict mapping the meltano interval (e.g. "@daily") to the
        corresponding crontab line; intervals without any command are omitted
    """
    # These crontab intervals slightly differ from the meltano ones, so that cron jobs
    # are not executed at the same time
    entry_intervals = {
        "@hourly": "30 * * * *",  # every hour at minute 30
        "@daily": "0 0 * * *",  # every day at 00:00
        "@weekly": "0 1 * * 0",  # every week on sunday at 01:00
        "@monthly": "0 2 1 * *",  # every first day of the month at 02:00
        "@yearly": "0 3 1 1 *",  # every january 1st at 03:00
    }
    commands_by_interval = {interval: [] for interval in entry_intervals}

    # Convert elt and job schedules into proper commands.
    # Skipped schedules are reported with a warning: the default logging level
    # hides info messages, which would make them disappear silently.
    for schedule in job_schedules:
        interval = schedule["interval"]
        job_name = schedule["job"]["name"]
        if interval not in commands_by_interval:
            logging.warning(
                "Schedule for job %s with interval %s "
                "was not added to the crontab entries.",
                job_name,
                interval,
            )
            continue
        # Names and arguments are shell-quoted, since the crontab line is run by a shell
        commands_by_interval[interval].append(
            f"{MELTANO_EXECUTABLE} run {shlex.quote(job_name)}"
        )

    for schedule in elt_schedules:
        interval = schedule["interval"]
        if interval not in commands_by_interval:
            logging.warning(
                "ELT schedule %s with interval %s "
                "was not added to the crontab entries.",
                schedule["name"],
                interval,
            )
            continue
        elt_args = " ".join(shlex.quote(arg) for arg in schedule["elt_args"])
        commands_by_interval[interval].append(f"{MELTANO_EXECUTABLE} elt {elt_args}")

    # Gather commands together by interval, and run them one after the other.
    # The project path is quoted so that a path containing spaces still works.
    project_home = shlex.quote(str(PROJECT_HOME))
    crontab_entries = {}
    for interval, commands in commands_by_interval.items():
        if not commands:
            continue
        chained_commands = " && ".join(commands)
        crontab_entries[interval] = (
            f"{entry_intervals[interval]} (cd {project_home} && {chained_commands}) "
            f"2>&1 | /usr/bin/logger -t meltano"
        )
    return crontab_entries
def make_crontab_file(crontab_entries: dict):
    """
    Render the full content of the crontab file.

    :param crontab_entries: dict mapping an interval label (e.g. "@daily") to
        the crontab line to write under it
    :return: the crontab file content, as a single string
    """
    # One small section per interval: a comment with the label, then the entry
    sections = [
        "\n# {}\n{}\n".format(label, line) for label, line in crontab_entries.items()
    ]
    rendered_entries = "\n".join(sections)

    header_and_body = """
# .---------------- minute (0 - 59)
# | .------------- hour (0 - 23)
# | | .---------- day of month (1 - 31)
# | | | .------- month (1 - 12) OR jan,feb,mar,apr ...
# | | | | .---- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat
# | | | | |
# * * * * * [username] command to be executed
SHELL=/bin/bash
PATH=/usr/local/bin:/usr/bin:/bin
PROJECT_HOME={project_home}
# Schedules are grouped by interval, and the output (both stdout and stderr) of
# the processes are redirected to the logger unix utility
{entries}
"""
    return header_and_body.format(
        project_home=PROJECT_HOME, entries=rendered_entries
    )
def main():
    """Read the meltano schedules from stdin and print the crontab to stdout."""
    schedules = read_schedules_from_stdin()
    print(make_crontab_file(get_crontab_entries(*schedules)))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment