Follow the installation instructions on the Airflow website.
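If you just want something to paste, a typical pip-based install looks like the sketch below; the AIRFLOW_HOME location is the default, and the exact package name and extras may differ for your version, so treat the official instructions as authoritative.
export AIRFLOW_HOME=~/airflow   # where airflow.cfg and the dags/ folder will live
pip install apache-airflow      # the package was previously published under the name airflow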
To configure Airflow to use PostgreSQL rather than the default SQLite, open airflow.cfg and change the executor setting to LocalExecutor:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor
The LocalExecutor can parallelize task instances locally.
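How much it parallelizes is controlled by the concurrency settings in the same airflow.cfg; the values below are purely illustrative, not recommendations.
# maximum number of task instances that can run at once across the installation
parallelism = 4
# maximum number of task instances allowed to run concurrently within a single DAG
dag_concurrency = 2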
Also update the SqlAlchemy connection string to point to the database you are about to create.
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# is available on their website
sql_alchemy_conn = postgresql+psycopg2://localhost/airflow
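The postgresql+psycopg2 scheme assumes the psycopg2 driver is installed in the same environment as Airflow; if it is missing, something along these lines (the exact package name can vary by platform) takes care of it:
pip install psycopg2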
Next, open a PostgreSQL shell:
psql
And create a new Postgres database:
CREATE DATABASE airflow;
You're now ready to initialize the DB in Airflow. In bash, run:
airflow initdb
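To sanity-check the new backend and bring up the UI locally, you can then list the metadata tables and start the webserver and scheduler; this is a sketch, and each long-running command belongs in its own terminal or under a process manager.
psql airflow -c '\dt'     # the metadata tables should now live in Postgres
airflow webserver -p 8080
airflow scheduler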
In the console run:
mkdir airflow/dags
Next, set up two Airflow connections (Admin > Connections in the web UI).
The first connection is for my API call:
- A connection type of HTTP.
- A connection identifier of moves_profile.
- A host string of the full API endpoint: https://moves....
The second connection is for my project database:
- A connection type of Postgres.
- A connection identifier of users (the name of the table).
- A host string of 127.0.0.1.
- A schema string (database name) of kojak.
- A login of postgres (the default).
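Note that this connection assumes the kojak database and a users table already exist on the Postgres side. A minimal sketch in psql might look like the following; the column types are my assumption, inferred from the values the DAG inserts below.
CREATE DATABASE kojak;
\c kojak
CREATE TABLE users (
    moves_user_id    BIGINT,
    moves_start_date DATE,
    timezone         TEXT
);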
In the console run:
touch ~/airflow/dags/moves_profile.py
Then add your DAG configs.
"""
DAG pulls a user's profile information from the Moves API.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.hooks import HttpHook, PostgresHook
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
import json
def get_profile(ds, **kwargs):
pg_hook = PostgresHook(postgres_conn_id='users')
api_hook = HttpHook(http_conn_id='moves_profile', method='GET')
# Get profile info from Moves API
resp = api_hook.run('')
profile = json.loads(resp.content.decode('utf-8'))
moves_user_id = profile['userId']
moves_first_date = profile['profile']['firstDate']
timezone = profile['profile']['currentTimeZone']['id']
# Insert profile values into Postgres DB
user_insert = """INSERT INTO users (moves_user_id, moves_start_date, timezone)
VALUES (%s, %s, %s);"""
pg_hook.run(user_insert, parameters=(moves_user_id, moves_first_date, timezone))
default_args = {
'owner': 'rosiehoyem',
'depends_on_past': False,
'start_date': datetime(2017, 3, 21),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('moves_profile', default_args=default_args, schedule_interval=timedelta(1))
get_profile_task = \
PythonOperator(task_id='get_profile',
provide_context=True,
python_callable=get_profile,
dag=dag)
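Before waiting on the scheduler, you can exercise the new DAG from the command line; the date below is just an example execution date.
python ~/airflow/dags/moves_profile.py   # check the file parses without errors
airflow list_dags                        # moves_profile should appear in the list
airflow test moves_profile get_profile 2017-03-22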
Next, set up an EC2 instance to host Airflow; instructions can be found here and here.
Be sure you've set up port 8080 in the instance's security group:
- Custom TCP Rule
- Port Range: 80 (for web REST)
- Source: Anywhere
Then install Docker on the instance; instructions to do this can be found here.
Next, pull and run the puckel/docker-airflow image; instructions for it can be found on the image's GitHub page.
docker pull puckel/docker-airflow
docker run -d -p 8080:8080 puckel/docker-airflow
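To confirm the container came up before tunneling in, you can check it from the instance; the container ID below is a placeholder taken from the docker ps output.
docker ps
docker logs <container_id>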
Finally, open an SSH tunnel from your local machine so you can reach the Airflow UI running in the container (local port 12345 is forwarded to the remote port 8080):
ssh -i ~/.ssh/aws_key_file.pem -NL 12345:localhost:8080 [email protected]
The Airflow UI will then be available in your browser at http://localhost:12345.