Last active
July 1, 2022 20:51
-
-
Save yu-iskw/42f9f0aa6f2ff0a2a375d43881e13b49 to your computer and use it in GitHub Desktop.
Example of adding an Airflow connection to Google Cloud Platform
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Run the single task of the `add_gcp_connection` DAG once, for the fixed
# execution date 2001-01-01, so that the GCP connection gets registered.
airflow run add_gcp_connection add_gcp_connection_python 2001-01-01
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from airflow import DAG, settings | |
from airflow.models import Connection | |
from airflow.operators.python_operator import PythonOperator | |
from datetime import datetime | |
from common.utils import get_default_google_cloud_connection_id | |
# Default arguments applied to every task of the DAG defined in this file.
default_args = {
    'owner': 'airflow',
    # Placeholder address; replace with a real recipient before enabling
    # the e-mail notifications below.
    'email': ['airflow@example.com'],
    'depends_on_past': False,
    # Integer literals must not carry leading zeros in Python 3
    # (`01` is a SyntaxError), so the date is written as (2001, 1, 1).
    'start_date': datetime(2001, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'priority_weight': 1000,
}
def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP if it does not exist yet.

    Builds a ``google_cloud_platform`` connection whose id comes from
    ``get_default_google_cloud_connection_id()``, attaches the OAuth scopes,
    project and key-file path as JSON "extra" data, and stores it through the
    Airflow metadata session. When a connection with the same ``conn_id`` is
    already stored, nothing is written and a notice is printed instead.

    Args:
        ds: Unused here; the task is declared with ``provide_context=True``,
            so the operator passes it in (presumably the execution date
            string -- confirm against the Airflow version in use).
        **kwargs: Remaining context values passed by the operator; unused.
    """
    new_conn = Connection(
        conn_id=get_default_google_cloud_connection_id(),
        conn_type='google_cloud_platform',
    )

    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    conn_extra = {
        # The scopes are stored as one comma-separated string.
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "your-gcp-project",
        "extra__google_cloud_platform__key_path": '/var/local/google_cloud_default.json',
    }
    # The extra field is stored as a JSON string, not as a dict.
    new_conn.set_extra(json.dumps(conn_extra))

    session = settings.Session()
    try:
        existing = (
            session.query(Connection)
            .filter(Connection.conn_id == new_conn.conn_id)
            .first()
        )
        if not existing:
            session.add(new_conn)
            session.commit()
        else:
            msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
            msg = msg.format(conn_id=new_conn.conn_id)
            print(msg)
    except Exception:
        # Leave no half-finished transaction behind, then surface the error
        # so the task is marked as failed and can be retried.
        session.rollback()
        raise
    finally:
        # Always release the session back to the pool, success or failure.
        session.close()
# DAG that holds the single connection-setup task; "@once" schedules it
# for exactly one run.
dag = DAG(
    dag_id='add_gcp_connection',
    default_args=default_args,
    schedule_interval="@once",
)
# Task to add a connection: calls `add_gcp_connection` with the task
# context passed in as keyword arguments (provide_context=True).
t1 = PythonOperator(
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
    dag=dag,
)
Is there any way to connect to GCP without providing /var/local/google_cloud_default.json?
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Team,
When I try to execute a script similar to the one above, I get the error below.
AttributeError: 'str' object has no attribute 'update'.
Kindly help me with this.