-
-
Save cr3a7ure/96dcd5dea96e9e165aaff3effd0a5ceb to your computer and use it in GitHub Desktop.
Sample Airflow DAG for an SSH tunnel + Postgres (assuming both the SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector connections are available)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime, timedelta

import airflow
from airflow import DAG
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator
# Task-level defaults; passed to the DAG below as `default_args`.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # Up to 5 retries per task, spaced 5 minutes apart.
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}
# DAG bounded to December 2019 (start_date 2019-12-01, end_date 2019-12-30),
# scheduled on a fixed 10-minute interval.
dag = DAG(
    dag_id='testing_postgres_tunnel_ssh',
    default_args=default_args,
    start_date=datetime(2019, 12, 1),
    end_date=datetime(2019, 12, 30),
    # A timedelta gives a fixed-interval schedule; '@daily' is the preset
    # alternative for one run per day.
    schedule_interval=timedelta(minutes=10),
)
# Tunnel endpoints, read from Airflow Variables when this module is imported.
# REMOTE_BIND_IP / REMOTE_BIND_PORT are the host and port the tunnel forwards
# to; LOCAL_BIND_PORT is the local port it is attached to. The two ports are
# converted with int() at the point where the tunnel is built.
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
REMOTE_BIND_PORT = Variable.get('SERVER_REMOTE_BIND_PORT')
LOCAL_BIND_PORT = Variable.get('SERVER_LOCAL_BIND_PORT')
# Keep the hook itself in `ssh_hook`. Chaining `.get_tunnel(...).start()` onto
# it would bind None (start() has no return value), leaving SSHOperator without
# a usable hook, and would open the tunnel every time this file is imported.
ssh_hook = SSHHook(ssh_conn_id='SERVER_ssh_connector', keepalive_interval=60)


class _TunneledPostgresOperator(PostgresOperator):
    """PostgresOperator that holds the SSH tunnel open while its SQL runs.

    The tunnel is built from ``ssh_hook`` and forwards local port
    LOCAL_BIND_PORT to REMOTE_BIND_IP:REMOTE_BIND_PORT. It is opened inside
    ``execute`` so that it lives in the same process as the query and is
    stopped again when the query finishes or fails.
    """

    def execute(self, context):
        """Run the parent operator's SQL with the tunnel open around it.

        Args:
            context: Airflow task context, passed through unchanged.

        Returns:
            Whatever ``PostgresOperator.execute`` returns.
        """
        tunnel = ssh_hook.get_tunnel(
            int(REMOTE_BIND_PORT),
            remote_host=REMOTE_BIND_IP,
            local_port=int(LOCAL_BIND_PORT),
        )
        # The forwarder is a context manager: it starts on enter and stops on
        # exit, so the local port is released even if the query raises.
        with tunnel:
            return super(_TunneledPostgresOperator, self).execute(context)


# Runs `ls -al` over SSH before the query task. The task id is kept as-is for
# compatibility, although this task does not open the tunnel itself.
ssh_operator = SSHOperator(
    ssh_hook=ssh_hook,
    task_id='open_tunnel_to_SERVER',
    command='ls -al',
    dag=dag,
)

# NOTE(review): 'SERVER_ssh_postresql_tunnel_connector' is presumably configured
# to connect to localhost on LOCAL_BIND_PORT -- confirm in the Airflow
# connection settings.
postgres_operator = _TunneledPostgresOperator(
    postgres_conn_id='SERVER_ssh_postresql_tunnel_connector',
    sql="select * from users limit 100",
    task_id='get_users_from_SERVER_postgres_table',
    dag=dag,
)

ssh_operator >> postgres_operator
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment