Created
July 6, 2020 19:39
-
-
Save vishnu667/e37981b7855af9b2f76b3f5db1d398d9 to your computer and use it in GitHub Desktop.
Dynamically load DAGs from YAML files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow import DAG | |
from airflow import configuration | |
from datetime import date, timedelta, datetime | |
from deepmerge import Merger | |
from os import listdir | |
from os.path import isfile, join | |
import copy | |
import glob | |
import importlib | |
import os | |
import time | |
import yaml | |
# deepmerge strategy used to overlay task kwargs on operator defaults:
# lists are replaced wholesale, dicts are merged recursively, and both the
# fallback and type-conflict strategies let the overriding value win.
merger = Merger([(list, ["override"]), (dict, ["merge"])], ["override"], ["override"])
# Root folder for the YAML DAG definitions, read from the [core]
# property_folder option of the Airflow configuration.
PROPERTY_DIR = configuration.get('core', 'property_folder')
# Per-operator default keyword arguments. mergeDefaultArgs looks entries up
# under the 'operators' key by the task's 'operator' dotted path.
# The file is opened in a `with` block so the handle is closed after parsing.
with open(os.path.join(PROPERTY_DIR, "yaml_dags", "params", "default_operator_params.yml")) as _params_file:
    defaultParams = yaml.load(_params_file, Loader=yaml.FullLoader)
def import_package(name):
    """Import and return the object referenced by a dotted path.

    Args:
        name: fully qualified path of the form ``"package.module.ClassName"``;
            everything before the last dot is imported as a module and the
            part after it is looked up as an attribute of that module.

    Returns:
        The named attribute of the imported module (typically a class).

    Raises:
        ValueError: if ``name`` is not of the form ``module.attribute``.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    module_name, sep, class_name = name.rpartition(".")
    # Reject "Foo", ".Foo" and "pkg." up front with a readable message instead
    # of an opaque unpacking error or a lookup of an empty attribute name.
    if not sep or not module_name or not class_name:
        raise ValueError("expected a dotted 'module.ClassName' path, got %r" % (name,))
    return getattr(importlib.import_module(module_name), class_name)
def mergeDefaultArgs(kwargs, operatorName):
    """Overlay a task's keyword arguments on its operator's configured defaults.

    Args:
        kwargs: keyword arguments given for the task in its YAML definition.
        operatorName: the task's operator dotted path, used as the key into
            ``defaultParams['operators']``.

    Returns:
        The operator's defaults deep-merged with ``kwargs`` using the
        module-level ``merger``, where task values override defaults. When
        the operator has no configured defaults, ``kwargs`` itself is returned.
    """
    # An empty defaults file parses to None and an empty 'operators' section to
    # None as well; treat both as "no defaults" instead of raising.
    operator_defaults = (defaultParams or {}).get('operators') or {}
    if operatorName not in operator_defaults:
        return kwargs
    # Deep-copy so merging never mutates the shared module-level defaults.
    base = copy.deepcopy(operator_defaults[operatorName])
    # Use merge()'s return value: it mutates dicts in place, but for non-dict
    # defaults the merged result exists only as the return value.
    return merger.merge(base, kwargs)
def createTask(taskConfig, dag):
    """Instantiate one Airflow operator from a task entry of a DAG YAML file.

    Args:
        taskConfig: mapping with 'operator' (dotted path to the operator
            class), 'task_name' (used as the task_id), and optional 'args'
            (positional arguments) and 'kwargs' (keyword arguments, merged
            over the operator's configured defaults by mergeDefaultArgs).
        dag: DAG the new task is attached to.

    Returns:
        The constructed operator instance.
    """
    operatorClass_ = import_package(taskConfig['operator'])
    # 'args' / 'kwargs' may be omitted or left empty (parsed as None) in YAML.
    args = taskConfig.get('args') or []
    kwargs = mergeDefaultArgs(taskConfig.get('kwargs') or {}, taskConfig['operator'])
    # Positional arguments are written first; this binds exactly like the
    # keyword-then-*args spelling but reads in call order.
    return operatorClass_(*args, dag=dag, task_id=taskConfig['task_name'], **kwargs)
def create_task_sequence(task_sequence, tasks):
    """Declare dependencies between tasks that have already been created.

    Args:
        task_sequence: iterable of entries; element 0 names the upstream task
            and element 1 names the downstream task (further elements are
            ignored).
        tasks: mapping from task name to task object.

    Raises:
        KeyError: if an entry names a task that is not in ``tasks``.
    """
    for edge in task_sequence:
        upstream_name, downstream_name = edge[0], edge[1]
        tasks[upstream_name].set_downstream(tasks[downstream_name])
def dagDefaultArguments(config):
    """Build the ``default_args`` dict passed to the DAG constructor.

    Starts from ``config['dag_default_args']`` and sets 'start_date' (and,
    when present, 'end_date') from the top-level config. Dates are strings
    in the ``'%Y,%m,%d'`` format, e.g. ``"2020,7,6"``.

    Args:
        config: parsed DAG definition.

    Returns:
        A new dict. The caller's ``config`` is not modified.

    Raises:
        ValueError: if a date string does not match ``'%Y,%m,%d'``.
    """
    # Copy so the caller's config is not mutated. A missing or empty
    # 'dag_default_args' section yields an empty dict instead of raising.
    default_arguments = dict(config.get('dag_default_args') or {})
    # Note: this always sets start_date, overriding any start_date given
    # inside dag_default_args.
    if 'start_date' in config:
        default_arguments['start_date'] = datetime.strptime(config['start_date'], '%Y,%m,%d')
    else:
        # NOTE(review): parse-time "now" changes on every scheduler parse;
        # Airflow advises against a dynamic start_date - consider requiring
        # an explicit 'start_date' in the YAML.
        default_arguments['start_date'] = datetime.today()
    if 'end_date' in config:
        default_arguments['end_date'] = datetime.strptime(config['end_date'], '%Y,%m,%d')
    return default_arguments
def generateDag(config):
    """Build an Airflow DAG, with its tasks and dependencies, from a parsed YAML config.

    Args:
        config: parsed DAG definition. Required keys: 'dag_name' and 'tasks'
            (list of task entries, each turned into an operator by
            createTask). Optional keys:
            - 'schedule_interval' (defaults to one day);
            - 'task_sequence' (list of [upstream, downstream] task-name pairs);
            - the keys read by dagDefaultArguments.

    Returns:
        The constructed DAG.
    """
    dag = DAG(
        config['dag_name'],
        default_args=dagDefaultArguments(config),
        # Configurable per YAML file; daily when not set.
        schedule_interval=config.get('schedule_interval', timedelta(days=1)),
    )
    with dag:
        tasks = {}
        for taskConfig in config['tasks']:
            tasks[taskConfig['task_name']] = createTask(taskConfig, dag)
        # A DAG without dependencies may omit 'task_sequence' or leave it empty.
        create_task_sequence(config.get('task_sequence') or [], tasks)
    return dag
# Folder scanned for DAG definitions. Every *.yml file directly inside it
# becomes a candidate DAG; subfolders such as params/ fail the isfile check.
property_folder = os.path.join(PROPERTY_DIR, "yaml_dags", "")
# Sorted so DAGs are registered in a deterministic order, independent of the
# filesystem's directory-listing order.
files = sorted(
    join(property_folder, f)
    for f in listdir(property_folder)
    if f.endswith(".yml") and isfile(join(property_folder, f))
)
for filePath in files:
    # Close each definition file as soon as it has been parsed.
    with open(filePath) as yaml_file:
        config = yaml.load(yaml_file, Loader=yaml.FullLoader)
    # An empty file parses to None, which would make the is_active check raise
    # and abort loading of every remaining file; skip non-mapping documents.
    if not isinstance(config, dict):
        continue
    # Only definitions with a truthy 'is_active' flag become DAGs.
    if config.get('is_active'):
        print("Making Dag " + config['dag_name'])
        # Bind the DAG to a module-level name so Airflow's DAG discovery,
        # which inspects module globals, picks it up.
        globals()[config['dag_name']] = generateDag(config)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment