1. Fetch a JSON/XML file from an external API.
2. Store the file on S3 (as a backup) under a file name that includes e.g. a timestamp. Pass the filename on to the next task.
3. Read the JSON/XML from S3 into a table structure (pandas, agate, etc.) and convert field types as needed.
4. Store the table in a temp table in a Postgres database.
5. Compare the temp table to a "main" table to see whether anything has changed (some SQL diff), and work out which records have to be added, removed or updated in the main table.
6. If nothing has changed, abort everything. If something has, pass on which records are new, deleted and updated.
7. Apply the changes:
   a) Insert new records into the main table and alert the newsroom on Slack about the new items.
   b) Delete items in the main table that are not in the temp table, and alert via Slack.
   c) Update changed records in the main table, and alert via Slack.
8. The end
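Steps 2 and 3 need no Airflow-specific code. A minimal sketch, assuming the API returns a JSON array of flat objects: `backup_key` builds a timestamped S3 key (the `raw/` prefix and the key format are hypothetical), and `parse_records` coerces fields with per-column converters. It uses only the standard library in place of pandas or agate. When the upload runs inside a PythonOperator, the callable's return value is pushed to XCom, which is one way to pass the filename on to the next task.

```python
import json
from datetime import datetime, timezone


def backup_key(prefix, now=None):
    """Build a timestamped S3 key such as 'raw/20160412T185300Z.json' (format is hypothetical)."""
    now = now or datetime.now(timezone.utc)
    return f"{prefix}/{now:%Y%m%dT%H%M%SZ}.json"


def parse_records(raw_json, field_types):
    """Parse a JSON array of objects and coerce fields with the given converters.

    Fields without a converter are kept as-is, and null values stay None.
    """
    rows = []
    for item in json.loads(raw_json):
        row = {}
        for name, value in item.items():
            convert = field_types.get(name)
            row[name] = convert(value) if convert and value is not None else value
        rows.append(row)
    return rows


# Example with a fixed timestamp so the key is predictable.
key = backup_key("raw", datetime(2016, 4, 12, 18, 53, tzinfo=timezone.utc))
# key == "raw/20160412T185300Z.json"
rows = parse_records('[{"id": "1", "price": "9.5", "title": "x"}]', {"id": int, "price": float})
# rows == [{"id": 1, "price": 9.5, "title": "x"}]
```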
Created April 12, 2016 18:53 (gist anderser/56e95ef76520df65f7b550d75154be66)
A sample pipeline/DAG for Airflow
For steps 5/6/7, give careful thought to how large the result set could get. If it's only ever going to be a few rows, it would be reasonable to stuff it into XCom and pass it on to a BranchPythonOperator that checks whether its XCom payload is empty.
If you might end up returning larger result sets, you may want to store the delta in a temp table instead and return only a "delta not empty" flag.
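A sketch of that approach, using Python's built-in `sqlite3` as a stand-in for Postgres so it runs without a database server. The table names (`main_items`, `staging_items`, `delta_items`) and the `id`/`payload` columns are hypothetical. Records are classified with `LEFT JOIN ... IS NULL` anti-joins and written to their own table, so only the boolean has to go through XCom. In Postgres the null-safe comparison `IS NOT` would be written `IS DISTINCT FROM`. Note also that Postgres drops a `CREATE TEMP TABLE` table when its session ends, so a delta that a later Airflow task reads is better kept in a regular table.

```python
import sqlite3


def compute_delta(conn):
    """Write new/deleted/updated rows into delta_items; return True if there are any."""
    conn.executescript("""
        DROP TABLE IF EXISTS delta_items;
        CREATE TABLE delta_items (id INTEGER, payload TEXT, kind TEXT);

        -- In staging but not in main: new records
        INSERT INTO delta_items
        SELECT s.id, s.payload, 'new'
        FROM staging_items s LEFT JOIN main_items m ON m.id = s.id
        WHERE m.id IS NULL;

        -- In main but not in staging: deleted records
        INSERT INTO delta_items
        SELECT m.id, m.payload, 'deleted'
        FROM main_items m LEFT JOIN staging_items s ON s.id = m.id
        WHERE s.id IS NULL;

        -- In both, but the payload differs (null-safe): updated records
        INSERT INTO delta_items
        SELECT s.id, s.payload, 'updated'
        FROM staging_items s JOIN main_items m ON m.id = s.id
        WHERE s.payload IS NOT m.payload;
    """)
    (count,) = conn.execute("SELECT COUNT(*) FROM delta_items").fetchone()
    return count > 0


# Example: id 2 changed, id 3 disappeared, id 4 is new.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE main_items (id INTEGER PRIMARY KEY, payload TEXT);
    CREATE TABLE staging_items (id INTEGER PRIMARY KEY, payload TEXT);
    INSERT INTO main_items VALUES (1, 'a'), (2, 'b'), (3, 'c');
    INSERT INTO staging_items VALUES (1, 'a'), (2, 'B'), (4, 'd');
""")
changed = compute_delta(conn)  # True
delta = conn.execute("SELECT id, kind FROM delta_items ORDER BY id").fetchall()
# delta == [(2, 'updated'), (3, 'deleted'), (4, 'new')]
```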
Or you could, as Sid said, combine them into one big PythonOperator.
One other thing: while SlackOperator is easy and useful, it is a bit limiting, because the message has to be defined by a template and any user-supplied params must be specified at task definition time, not at execution time. This makes it hard to pass an upstream task's XCom result into messages like "Job Complete: 5 new records, 3 updated, 2 deleted". You can get around this either by creating an Airflow plugin that defines a new macro for reading those values, or by replacing the SlackOperator with a PythonOperator that reads the result and makes the same API call SlackOperator would make.
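A minimal sketch of the PythonOperator route. It reads the counts from XCom at run time (assuming a hypothetical upstream task `apply_changes` returned a dict like `{"new": 5, "updated": 3, "deleted": 2}`), formats the message, and posts it. Posting to a Slack incoming webhook is a swap: it is a plain JSON POST with a `text` field rather than the API call SlackOperator makes, and the webhook URL is an assumption that would come from your own Slack setup.

```python
import json
import urllib.request


def build_summary(counts):
    """Format a message like 'Job Complete: 5 new records, 3 updated, 2 deleted'."""
    return "Job Complete: {} new records, {} updated, {} deleted".format(
        counts.get("new", 0), counts.get("updated", 0), counts.get("deleted", 0)
    )


def post_to_slack(webhook_url, text):
    """POST a plain-text message to a Slack incoming webhook; return the HTTP status."""
    body = json.dumps({"text": text}).encode("utf-8")
    request = urllib.request.Request(
        webhook_url, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status


def notify_slack(webhook_url, **context):
    """python_callable: read counts from the hypothetical 'apply_changes' task's XCom."""
    counts = context["ti"].xcom_pull(task_ids="apply_changes") or {}
    return post_to_slack(webhook_url, build_summary(counts))
```

This would be wired up as `PythonOperator(task_id=..., python_callable=notify_slack, op_kwargs={"webhook_url": ...})`; on Airflow 1.x, also set `provide_context=True` so that `ti` is passed in.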