Skip to content

Instantly share code, notes, and snippets.

@PavelPenkov
Last active November 14, 2019 10:29
Show Gist options
  • Save PavelPenkov/7aabdc15d795cb1fd29e1b5a455604a7 to your computer and use it in GitHub Desktop.
Save PavelPenkov/7aabdc15d795cb1fd29e1b5a455604a7 to your computer and use it in GitHub Desktop.
from string import Template
from datetime import datetime, timedelta
# Source text of the DAG module this script generates.
#
# It is a string.Template: the placeholders below are filled in by
# ``t.substitute(...)`` and every one of them must be supplied.
#   $package_name      - inserted as both the DAG id and the task id
#   $email             - single entry of the 'email' list in DEFAULT_ARGS
#   $start_date        - inserted verbatim as a Python expression (no quotes)
#   $schedule_interval - inserted inside a double-quoted string literal
#
# The body of ``run`` carries a ``pass`` placeholder so the generated file is
# valid Python before the author has written any code; a function whose body is
# only a comment does not parse.
template = """
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonVirtualenvOperator

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': $start_date,
    'email': ['$email'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('$package_name', default_args=DEFAULT_ARGS, schedule_interval="$schedule_interval")


def run(*args, **kwargs):
    # YOUR CODE GOES HERE
    pass


task = PythonVirtualenvOperator(task_id='$package_name',
                                dag=dag,
                                python_callable=run,
                                requirements=[
                                    # YOUR REQUIREMENTS GO HERE
                                ]
                                )
"""

# Compiled template object used to render the DAG source.
t = Template(template)
def or_default(s, default):
    """Return *s*, falling back to *default* when *s* is empty.

    Args:
        s: The candidate value, typically raw text returned by ``input()``.
        default: The value to use when *s* is falsy (``""`` or ``None``).

    Returns:
        *s* if it is truthy, otherwise *default*.
    """
    # Both branches must return: without the second one a non-empty answer
    # would fall off the end of the function and come back as None.
    if not s:
        return default
    return s
# --- Interactive part: ask for the DAG parameters and write the new file ---

# The DAG name ends up in three places: the DAG id, the task id and the name of
# the output file. Restrict it to letters, digits, "_", "." and "-" so it cannot
# break out of the quoted string in the generated source or point the output
# path at another directory, and so an empty answer does not produce ".py".
package_name = input("DAG name: ").strip()
if not package_name or not all(ch.isalnum() or ch in "_.-" for ch in package_name):
    raise SystemExit(
        "DAG name must be non-empty and contain only letters, digits, '_', '.' and '-'"
    )

email = input("Author email: ").strip()

# The start date is rendered as a Python expression for yesterday's date.
yesterday = datetime.today() - timedelta(days=1)
start_date = f"datetime({yesterday.year}, {yesterday.month}, {yesterday.day})"

# A blank (or whitespace-only) answer selects the default cron expression.
schedule_interval = or_default(
    input("Schedule interval (e.g. 15 0 * * *) [default: 5 0 * * *]").strip(),
    '5 0 * * *',
)

# Render first, open second: opening with "w" truncates the target, so a
# failure during substitution must not happen while the file is already open.
result = t.substitute(
    dict(
        email=email,
        package_name=package_name,
        start_date=start_date,
        schedule_interval=schedule_interval,
    )
)
with open(f"{package_name}.py", "w", encoding="utf-8") as f:
    f.write(result)

print(f"\nCreated new DAG in file {package_name}.py")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment