Last active
July 7, 2016 23:02
-
-
Save r39132/f0bec354e6147fe2d46033498b474ea1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import yaml | |
import yaml | |
from airflow import configuration as conf | |
from airflow import DAG | |
from airflow.operators import BashOperator, PostgresOperator | |
from datetime import datetime | |
from pprint import pprint | |
# Build the DAG.
# Settings in ``default_args`` are handed to the DAG below, which applies
# them as defaults to the tasks attached to it.
default_args = dict(
    owner='jrideout',
    pool='ep_generate_spoofs',
    depends_on_past=False,
    # NOTE(review): the start date is re-evaluated every time this file is
    # parsed, so it keeps moving forward. Airflow's documentation recommends
    # a fixed start_date instead -- confirm the intended date before changing.
    start_date=datetime.now(),
    # NOTE(review): this address looks like a redacted placeholder -- verify.
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=True,
)

# Weekly DAG that all spoof-generation tasks in this file are attached to.
dag = DAG(
    'generate_spoofs_v3',
    default_args=default_args,
    schedule_interval='@weekly',
)
def get_msg_ids_for_spoofs():
    """
    Get the message ids of the spoof messages we insert, formatted for SQL.

    The ids are extracted from the spoof definition yaml file
    ``generate_spoofs/agari_spoofs.yaml`` inside the configured Airflow
    ``dags_folder``: one ``msg_id`` per entry under the ``messages`` key.

    Returns:
        str: each message id wrapped in single quotes, joined with commas
        (e.g. ``'id-1','id-2'``). An empty ``messages`` list yields ``""``.

    Raises:
        KeyError: if the parsed mapping has no ``messages`` key or an entry
            has no ``msg_id`` key.
    """
    dag_folder = conf.get('core', 'dags_folder')
    spoofs_path = os.path.join(dag_folder, 'generate_spoofs', 'agari_spoofs.yaml')
    with open(spoofs_path) as f:
        # safe_load only builds plain data types; a bare yaml.load() can
        # construct arbitrary Python objects and is rejected outright
        # (missing Loader argument) by PyYAML >= 6.
        datamap = yaml.safe_load(f)

    # NOTE(review): the ids are quoted verbatim, so an id that itself
    # contains a single quote would yield a malformed quoted list.
    msg_ids_as_strs = [
        "'{}'".format(message_info["msg_id"])
        for message_info in datamap['messages']
    ]
    pprint(msg_ids_as_strs)
    return ",".join(msg_ids_as_strs)
# Clean Up Spoofs Task
# Deletes rows of organization 1 from ``message`` whose msg_id is in the spoof
# id list and whose ts falls within the last week. The id list is computed
# when this file is parsed and rendered into the statement via ``params``.
clean_up_spoofs_job = PostgresOperator(
    task_id='clean_up_spoofs',
    postgres_conn_id="cousteau_db_stage",
    sql="""delete from message where organization_id=1 and msg_id in ({{ params.msg_ids }}) and ts > now() - '1 weeks'::interval;""",
    params=dict(msg_ids=get_msg_ids_for_spoofs()),
    dag=dag,
)

# Inject New Spoofs Task
# Runs the spoof generator script located under the configured dags folder.
inject_new_spoofs_command = 'python {{ params.dags_folder }}/generate_spoofs/generate_spoofed_message.py'

# NOTE(review): the variable name mentions "sender models" but the task it
# holds is 'inject_new_spoofs' -- presumably a copy/paste leftover.
build_sender_models_spark_job = BashOperator(
    task_id='inject_new_spoofs',
    bash_command=inject_new_spoofs_command,
    params=dict(dags_folder=conf.get('core', 'dags_folder')),
    dag=dag,
)

# Clean-up runs first; injection runs after it.
clean_up_spoofs_job.set_downstream(build_sender_models_spark_job)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment