Skip to content

Instantly share code, notes, and snippets.

@cr3a7ure
Forked from edthix/ssh_tunnel_postgres.py
Created December 4, 2020 19:07
Show Gist options
  • Save cr3a7ure/96dcd5dea96e9e165aaff3effd0a5ceb to your computer and use it in GitHub Desktop.
Save cr3a7ure/96dcd5dea96e9e165aaff3effd0a5ceb to your computer and use it in GitHub Desktop.
Sample airflow dag for ssh tunnel + postgres (assuming both SERVER_ssh_connector and SERVER_ssh_postresql_tunnel_connector are available)
from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.models import Variable
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.operators.postgres_operator import PostgresOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 5,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='testing_postgres_tunnel_ssh',
default_args=default_args,
start_date=datetime(2019, 12, 1),
end_date=datetime(2019, 12, 30),
schedule_interval= timedelta(minutes=10) #'@daily'
)
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
REMOTE_BIND_PORT = Variable.get('SERVER_REMOTE_BIND_PORT')
LOCAL_BIND_PORT = Variable.get('SERVER_LOCAL_BIND_PORT')
ssh_hook = SSHHook(ssh_conn_id='SERVER_ssh_connector', keepalive_interval=60).get_tunnel(
int(REMOTE_BIND_PORT),
remote_host=REMOTE_BIND_IP,
local_port=int(LOCAL_BIND_PORT)
).start()
ssh_operator = SSHOperator(
ssh_hook=ssh_hook,
task_id='open_tunnel_to_SERVER',
command='ls -al',
dag=dag
)
postgres_operator = PostgresOperator(
postgres_conn_id='SERVER_ssh_postresql_tunnel_connector',
sql="select * from users limit 100",
task_id='get_users_from_SERVER_postgres_table',
dag=dag
)
ssh_operator >> postgres_operator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment