Skip to content

Instantly share code, notes, and snippets.

@r39132
Last active July 7, 2016 23:02
Show Gist options
  • Save r39132/f0bec354e6147fe2d46033498b474ea1 to your computer and use it in GitHub Desktop.
Save r39132/f0bec354e6147fe2d46033498b474ea1 to your computer and use it in GitHub Desktop.
import os
import yaml
import yaml
from airflow import configuration as conf
from airflow import DAG
from airflow.operators import BashOperator, PostgresOperator
from datetime import datetime
from pprint import pprint
# Build the DAG.
# Arguments applied to every task attached to this DAG unless a task overrides them.
default_args = {
    'owner': 'jrideout',
    # Every task runs in this Airflow pool (presumably to cap concurrent runs — confirm pool config).
    'pool': 'ep_generate_spoofs',
    'depends_on_past': False,
    # The start_date must be static. datetime.now() is re-evaluated each time the
    # scheduler parses this file, so the start keeps moving forward and the first
    # '@weekly' interval never closes -- the DAG would never be scheduled.
    # NOTE(review): choose the date runs should begin from; the scheduler may create
    # a run for every week between this date and today when the DAG is first enabled.
    'start_date': datetime(2016, 7, 3),
    # NOTE(review): this value looks like a scraped/obfuscated placeholder rather
    # than a real address -- replace with the intended recipient.
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
}
# Weekly DAG; the tasks below attach to it via dag=dag.
dag = DAG('generate_spoofs_v3', default_args=default_args, schedule_interval='@weekly')
def get_msg_ids_for_spoofs():
    """
    Return the spoof message ids formatted as the body of a SQL ``IN (...)`` list.

    The ids are the ``msg_id`` field of every entry under the top-level
    ``messages`` key of ``<dags_folder>/generate_spoofs/agari_spoofs.yaml``,
    where ``dags_folder`` is read from the ``[core]`` section of the Airflow
    configuration.

    Each id is rendered as a single-quoted SQL string literal (embedded single
    quotes are doubled) and the literals are joined with commas, e.g. ``'a','b'``.
    If no messages are listed, ``NULL`` is returned so that ``msg_id in (NULL)``
    remains valid SQL and matches nothing, instead of the invalid ``in ()``.

    Raises:
        IOError/OSError: the YAML file cannot be opened.
        KeyError: the document has no ``messages`` key, or an entry has no ``msg_id``.
    """
    dag_folder = conf.get('core', 'dags_folder')
    spoofs_path = os.path.join(dag_folder, 'generate_spoofs', 'agari_spoofs.yaml')
    with open(spoofs_path) as f:
        # safe_load only builds plain Python data; yaml.load without an explicit
        # Loader can construct arbitrary objects and is rejected by PyYAML >= 6.
        datamap = yaml.safe_load(f)

    # An empty "messages:" key loads as None; treat it as "no messages".
    msg_ids = [message_info["msg_id"] for message_info in datamap['messages'] or []]
    if not msg_ids:
        return "NULL"
    # Double embedded single quotes so an id cannot terminate its literal early.
    return ",".join(
        "'{}'".format(str(msg_id).replace("'", "''")) for msg_id in msg_ids
    )
# Clean Up Spoofs Task: delete rows from `message` for organization_id=1 whose
# msg_id is one of the spoof ids and whose ts falls within the past week.
clean_up_spoofs_job = PostgresOperator(
    task_id='clean_up_spoofs',
    dag=dag,
    postgres_conn_id="cousteau_db_stage",
    # Rendered into {{ params.msg_ids }} when Airflow templates the SQL.
    params={"msg_ids": get_msg_ids_for_spoofs()},
    sql="""delete from message where organization_id=1 and msg_id in ({{ params.msg_ids }}) and ts > now() - '1 weeks'::interval;""",
)
# Inject New Spoofs Task: run generate_spoofs/generate_spoofed_message.py from the
# DAGs folder; the folder is supplied via params and rendered by Airflow templating.
inject_new_spoofs_command = """python {{ params.dags_folder }}/generate_spoofs/generate_spoofed_message.py"""
inject_new_spoofs_job = BashOperator(
    task_id='inject_new_spoofs',
    bash_command=inject_new_spoofs_command,
    params={"dags_folder": conf.get('core', 'dags_folder')},
    dag=dag)
# Backward-compatible alias: this task was originally bound to a name that does
# not describe it; kept so any existing references keep working.
build_sender_models_spark_job = inject_new_spoofs_job
# Clean up the old spoofs before injecting new ones.
inject_new_spoofs_job.set_upstream(clean_up_spoofs_job)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment