@zacgx
Forked from tmarthal/task-dag-creation.py
Created December 5, 2017 00:34
DAG Creation from within a PythonOperator task DOES NOT WORK. The scheduler only registers DAG objects it finds at module top level when it parses the file; a PythonOperator callable runs later, in a worker process, so anything it stuffs into globals() there is never seen by the scheduler.
# -*- coding: utf-8 -*-
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.sensors import SqlSensor
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable, DAG
from datetime import datetime, timedelta
default_args = {
    'owner': '@tmarthal',
    'start_date': datetime(2017, 2, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
##
## The DAG for the application audience job to run
##
dag = DAG(
    'sensor_dag_creation_inoperator',
    default_args=default_args,
    schedule_interval='*/5 * * * *',  # every five minutes
)
dag.doc_md = """
Simple http call which triggers when a row shows up in a database.
"""
def response_check(response):
    """
    Dumps the http response and returns True when the http call status is 200/success.
    """
    print("checking the response from the app")
    print(response.content)
    return response.status_code == 200
def process_new_accounts(ds, **kwargs):
    """
    The sensor has detected new ids to process, so we call the http operator for each.
    """
    select_sql = "SELECT id from audiences where created_at > '{ds}'".format(ds=ds)
    print("running select sql {}".format(select_sql))
    pg_hook = PostgresHook(postgres_conn_id='letterpress-app')
    connection = pg_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(select_sql)
    account_ids = cursor.fetchall()
    for (account_id,) in account_ids:  # each row comes back as a one-element tuple
        # Create a sub-dag for each new id; this is the child dag name
        export_account_task_name = 'task_process_account_%s' % account_id
        print("starting task: {}".format(export_account_task_name))
        export_account_dag = DAG(
            dag_id=export_account_task_name,
            default_args=default_args,
            schedule_interval='*/5 * * * *',  # '@once'
        )
        # This hits the account export url, _endpoint/account/export?id={ACCOUNT_ID}&token={AUTH_TOKEN}
        account_export_endpoint_task = SimpleHttpOperator(
            task_id='account_export_endpoint_task_%s' % account_id,
            http_conn_id='application',
            method='GET',
            endpoint='_endpoint/account/export',
            data={"id": "{}".format(account_id), "token": Variable.get("APPLICATION_ACCESS_TOKEN")},  # http params
            response_check=response_check,  # will retry based on default_args if it fails
            dag=export_account_dag)
        print("Created account processing DAG {}".format(export_account_dag.dag_id))
        # Register the dynamically created DAG in the global namespace -- this is
        # the step that does not work: these globals belong to the worker process
        # running the task, not to the module the scheduler parses, so the new
        # DAG is never discovered.
        globals()[export_account_task_name] = export_account_dag
    return account_ids
sensor = SqlSensor(
    task_id='account_creation_check',
    conn_id='account-database',
    poke_interval=600,  # run the select every 600 seconds, i.e. every 10 minutes
    sql="SELECT id from accounts where created_at > '{{ds}}' LIMIT 1",
    dag=dag,
)
process_new_accounts_task = PythonOperator(
    task_id='process_new_accounts',
    provide_context=True,
    python_callable=process_new_accounts,
    dag=dag,
)
sensor >> process_new_accounts_task
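
For contrast, here is a minimal sketch of the pattern that does work: build the DAGs at module top level, so the scheduler registers them on every parse of the file. This is an illustration, not part of the original gist; the make_account_dag helper and the accounts query are assumptions made for the example.

# -*- coding: utf-8 -*-
# Sketch only (not from the original gist): dynamic DAG creation that the
# scheduler CAN discover, because it happens at module import time.
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def make_account_dag(account_id):  # hypothetical helper, named for illustration
    def export_account():
        # placeholder for the real export call
        print("exporting account %s" % account_id)

    export_dag = DAG(
        dag_id='task_process_account_%s' % account_id,
        start_date=datetime(2017, 2, 1),
        schedule_interval='@once',
    )
    PythonOperator(
        task_id='export_account_%s' % account_id,
        python_callable=export_account,
        dag=export_dag,
    )
    return export_dag


# Top-level loop: re-runs on every scheduler parse, so new rows become new DAGs.
pg_hook = PostgresHook(postgres_conn_id='letterpress-app')
for (account_id,) in pg_hook.get_records("SELECT id FROM accounts"):
    export_dag = make_account_dag(account_id)
    globals()[export_dag.dag_id] = export_dag

Note that querying the database on every parse is expensive; in practice the id list is usually read from an Airflow Variable or a config file instead. An alternative that avoids parse-time lookups entirely is a single static target DAG triggered per account via TriggerDagRunOperator, passing the account id through the run configuration.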