Example of adding an Airflow connection to Google Cloud Platform
#!/bin/bash
airflow run add_gcp_connection add_gcp_connection_python 2001-01-01
import json
from datetime import datetime

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator

from common.utils import get_default_google_cloud_connection_id

default_args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'start_date': datetime(2001, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'priority_weight': 1000,
}


def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP."""
    new_conn = Connection(
        conn_id=get_default_google_cloud_connection_id(),
        conn_type='google_cloud_platform',
    )
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    conn_extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "your-gcp-project",
        "extra__google_cloud_platform__key_path": '/var/local/google_cloud_default.json',
    }
    conn_extra_json = json.dumps(conn_extra)
    new_conn.set_extra(conn_extra_json)

    # Insert the connection only if one with the same conn_id does not exist yet.
    session = settings.Session()
    if not (session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).first()):
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        msg = msg.format(conn_id=new_conn.conn_id)
        print(msg)


dag = DAG(
    'add_gcp_connection',
    default_args=default_args,
    schedule_interval="@once")

# Task to add a connection
t1 = PythonOperator(
    dag=dag,
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
)
I was using the DAG from this example for some time with a Postgres backend DB and everything worked perfectly well. Then we switched to a Cloud SQL database, and now running the add_gcp_connection DAG does not insert anything into the connection table. I am pretty new to Airflow and would appreciate any suggestion about what the reason could be and where I could look for an answer.
Hi team,
When I try to execute a script similar to the one above, I get the following error:
AttributeError: 'sts' object has no attribute 'update'.
Kindly help me with this.
Is there any way to connect to GCP without providing /var/local/google_cloud_default.json?
@yu-iskw
Many thanks for this example.
@Salad-King
You don't have to use the get_default_google_cloud_connection_id() function; just replace it with a string such as "get_default_google_cloud_connection_id".
That string becomes the value you pass as the google_cloud_conn_id in Airflow. A small sketch of the replacement is shown below.
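For example, a minimal sketch of that replacement ("my_gcp_connection" is just a placeholder conn_id you would then reference from your operators):

```python
from airflow.models import Connection

# Use a literal string instead of the gist's get_default_google_cloud_connection_id() helper.
# "my_gcp_connection" is a hypothetical name; pick whatever conn_id your DAGs expect.
new_conn = Connection(
    conn_id="my_gcp_connection",
    conn_type='google_cloud_platform',
)
```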