@yu-iskw
Last active July 1, 2022 20:51
Example of adding an Airflow connection to Google Cloud Platform
#!/bin/bash
airflow run add_gcp_connection add_gcp_connection_python 2001-01-01
import json
from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from common.utils import get_default_google_cloud_connection_id
default_args = {
    'owner': 'airflow',
    'email': ['[email protected]'],
    'depends_on_past': False,
    'start_date': datetime(2001, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 5,
    'priority_weight': 1000,
}


def add_gcp_connection(ds, **kwargs):
    """Add an Airflow connection for GCP."""
    new_conn = Connection(
        conn_id=get_default_google_cloud_connection_id(),
        conn_type='google_cloud_platform',
    )
    # OAuth scopes requested for the connection
    scopes = [
        "https://www.googleapis.com/auth/pubsub",
        "https://www.googleapis.com/auth/datastore",
        "https://www.googleapis.com/auth/bigquery",
        "https://www.googleapis.com/auth/devstorage.read_write",
        "https://www.googleapis.com/auth/logging.write",
        "https://www.googleapis.com/auth/cloud-platform",
    ]
    # GCP-specific settings are stored as JSON in the connection's extra field
    conn_extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": "your-gcp-project",
        "extra__google_cloud_platform__key_path": '/var/local/google_cloud_default.json'
    }
    conn_extra_json = json.dumps(conn_extra)
    new_conn.set_extra(conn_extra_json)

    # Insert the connection only if no connection with the same conn_id exists
    session = settings.Session()
    if not (session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).first()):
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        msg = msg.format(conn_id=new_conn.conn_id)
        print(msg)


dag = DAG(
    'add_gcp_connection',
    default_args=default_args,
    schedule_interval="@once")

# Task to add a connection
t1 = PythonOperator(
    dag=dag,
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
)


PierreC1024 commented Nov 23, 2017

@yu-iskw
Many thanks for this example.

@Salad-King
You don't have to use the get_default_google_cloud_connection_id() function; just replace it with a string such as "get_default_google_cloud_connection_id".
That string will be the value of google_cloud_conn_id in Airflow.
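
A minimal sketch of that suggestion, assuming a hard-coded ID is acceptable in your setup. The constant GCP_CONN_ID and the helper connection_kwargs are hypothetical names used only for illustration, and "google_cloud_default" is one possible value, not something the gist prescribes. The sketch only builds the keyword arguments, so it runs without Airflow installed.

```python
# Hypothetical constant standing in for get_default_google_cloud_connection_id().
# Any string works, as long as the same ID is used wherever the connection is referenced.
GCP_CONN_ID = "google_cloud_default"


def connection_kwargs(conn_id=GCP_CONN_ID):
    """Return keyword arguments intended for airflow.models.Connection."""
    return {
        "conn_id": conn_id,
        "conn_type": "google_cloud_platform",
    }


kwargs = connection_kwargs()
print(kwargs["conn_id"])
```

Inside the DAG file, the result would be passed as Connection(**connection_kwargs()) in place of the call that uses the helper from common.utils.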


khavronin commented Dec 13, 2017

I used the DAG from this example for some time with a Postgres backend database, and everything worked perfectly well. Then we switched to a Cloud SQL database, and now running the add_gcp_connection DAG does not insert anything into the connection table. I am pretty new to Airflow and would appreciate any suggestions about what the cause could be and where I could look for an answer.


saietl commented Jul 21, 2018

Hi Team,
When I try to execute a script similar to the one above, I get the error below.
AttributeError: 'sts' object has no attribute 'update'.

Kindly help me with this.

@lochanm2

Is there any way to connect to GCP without providing /var/local/google_cloud_default.json?
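
One possible approach, offered as an assumption and not as something confirmed in this thread: Airflow's Google Cloud connection documentation describes a fallback to Application Default Credentials when neither a key path nor keyfile JSON is set on the connection. Under that assumption, the extra field can simply leave out the key_path entry. The helper build_gcp_extra below is a hypothetical name for illustration, and whether the fallback applies depends on your Airflow version and on where the worker runs.

```python
import json


def build_gcp_extra(project, scopes, key_path=None):
    """Return the connection's extra field as a JSON string.

    key_path is optional; when it is None the key is left out entirely,
    on the assumption that the hook then uses Application Default Credentials.
    """
    extra = {
        "extra__google_cloud_platform__scope": ",".join(scopes),
        "extra__google_cloud_platform__project": project,
    }
    if key_path is not None:
        extra["extra__google_cloud_platform__key_path"] = key_path
    return json.dumps(extra)


# Extras without a key file path
extra_json = build_gcp_extra(
    project="your-gcp-project",
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
print(extra_json)
```

The returned string could be passed to new_conn.set_extra() in the DAG above in place of conn_extra_json.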
