Skip to content

Instantly share code, notes, and snippets.

@sairamkrish
Last active May 19, 2020 06:58
Show Gist options
  • Save sairamkrish/fbc6b3019dd24535d92a6c9b791e3a75 to your computer and use it in GitHub Desktop.
Save sairamkrish/fbc6b3019dd24535d92a6c9b791e3a75 to your computer and use it in GitHub Desktop.
from operators.template_operator import TemplateOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.dates import days_ago
import os
from pathlib import Path
class RestToTemplateWrapperOperator(TemplateOperator):
@apply_defaults
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.script_dir = os.path.dirname(__file__)
self.dynamic_dag_dir = os.environ.get(
'DYNAMIC_DAG_DIR', '/opt/airflow/dags/dynamic_dags') # TODO this should go to config
def execute(self, context):
conf = context["dag_run"].conf
namespace = conf['namespace']
dag_id = conf["dag_id"]
template = conf['template']
Path(os.path.join(self.dynamic_dag_dir, namespace)).mkdir(parents=True, exist_ok=True)
self.template_file_path = os.path.join(
self.script_dir, '..', 'templates', template) # TODO should go into config
self.destination_file_path = os.path.join(self.dynamic_dag_dir, namespace, '{}.py'.format(dag_id))
self.search_and_replace = {
'#SCHEDULE_INTERVAL': conf["schedule_interval"],
'#DAG_ID': dag_id,
'#TASK_ID_1': 'run-docker'
}
TemplateOperator.execute(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment