Skip to content

Instantly share code, notes, and snippets.

@laughingman7743
Last active April 29, 2019 10:49
Show Gist options
  • Save laughingman7743/418fb255362c9b0179e25773b9469254 to your computer and use it in GitHub Desktop.
Save laughingman7743/418fb255362c9b0179e25773b9469254 to your computer and use it in GitHub Desktop.
Triggering DAGs with Python script

Triggering DAGs with Python script

Usage

Usage: trigger_dag.py [OPTIONS]

Options:
  -p, --project-id TEXT    [required]
  -e, --env-name TEXT      [required]
  -w, --webserver-id TEXT  [required]
  -d, --dag-name TEXT      [required]
  -c, --conf TEXT          [required]
  --help                   Show this message and exit.

Example

$ export GOOGLE_APPLICATION_CREDENTIALS=/path/to/credentials.json
$ pipenv run ./trigger_dag.py \
  --project-id YOUR_PROJECT_ID \
  --env-name YOUR_COMPOSER_ENVIRONMENT_NAME \
  --webserver-id YOUR_COMPOSER_WEBSERVER_ID \
  --dag-name composer_sample_trigger_response_dag \
  --conf '{ "foo": "bar"}'
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-api-python-client = "*"
requests = "*"
requests-toolbelt = "*"
click = "*"
[requires]
python_version = "3.6"
{
"_meta": {
"hash": {
"sha256": "659c868820c51ab8af5ad4858a6fee34c7f5e9cdada68288aa96c76b56e53708"
},
"pipfile-spec": 6,
"requires": {
"python_version": "3.6"
},
"sources": [
{
"name": "pypi",
"url": "https://pypi.org/simple",
"verify_ssl": true
}
]
},
"default": {
"cachetools": {
"hashes": [
"sha256:219b7dc6024195b6f2bc3d3f884d1fef458745cd323b04165378622dcc823852",
"sha256:9efcc9fab3b49ab833475702b55edd5ae07af1af7a4c627678980b45e459c460"
],
"version": "==3.1.0"
},
"certifi": {
"hashes": [
"sha256:47f9c83ef4c0c621eaef743f133f09fa8a74a9b75f037e8624f83bd1b6626cb7",
"sha256:993f830721089fef441cdfeb4b2c8c9df86f0c63239f06bd025a76a7daddb033"
],
"version": "==2018.11.29"
},
"chardet": {
"hashes": [
"sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae",
"sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691"
],
"version": "==3.0.4"
},
"click": {
"hashes": [
"sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13",
"sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7"
],
"index": "pypi",
"version": "==7.0"
},
"google-api-python-client": {
"hashes": [
"sha256:06907006ed5ce831018f03af3852d739c0b2489cdacfda6971bcc2075c762858",
"sha256:937eabdc3940977f712fa648a096a5142766b6d0a0f58bc603e2ac0687397ef0"
],
"index": "pypi",
"version": "==1.7.8"
},
"google-auth": {
"hashes": [
"sha256:0f7c6a64927d34c1a474da92cfc59e552a5d3b940d3266606c6a28b72888b9e4",
"sha256:20705f6803fd2c4d1cc2dcb0df09d4dfcb9a7d51fd59e94a3a28231fd93119ed"
],
"version": "==1.6.3"
},
"google-auth-httplib2": {
"hashes": [
"sha256:098fade613c25b4527b2c08fa42d11f3c2037dda8995d86de0745228e965d445",
"sha256:f1c437842155680cf9918df9bc51c1182fda41feef88c34004bd1978c8157e08"
],
"version": "==0.0.3"
},
"httplib2": {
"hashes": [
"sha256:4ba6b8fd77d0038769bf3c33c9a96a6f752bc4cdf739701fdcaf210121f399d4"
],
"version": "==0.12.1"
},
"idna": {
"hashes": [
"sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407",
"sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c"
],
"version": "==2.8"
},
"pyasn1": {
"hashes": [
"sha256:da2420fe13a9452d8ae97a0e478adde1dee153b11ba832a95b223a2ba01c10f7",
"sha256:da6b43a8c9ae93bc80e2739efb38cc776ba74a886e3e9318d65fe81a8b8a2c6e"
],
"version": "==0.4.5"
},
"pyasn1-modules": {
"hashes": [
"sha256:79580acf813e3b7d6e69783884e6e83ac94bf4617b36a135b85c599d8a818a7b",
"sha256:a52090e8c5841ebbf08ae455146792d9ef3e8445b21055d3a3b7ed9c712b7c7c"
],
"version": "==0.2.4"
},
"requests": {
"hashes": [
"sha256:502a824f31acdacb3a35b6690b5fbf0bc41d63a24a45c4004352b0242707598e",
"sha256:7bf2a778576d825600030a110f3c0e3e8edc51dfaafe1c146e39a2027784957b"
],
"index": "pypi",
"version": "==2.21.0"
},
"requests-toolbelt": {
"hashes": [
"sha256:380606e1d10dc85c3bd47bf5a6095f815ec007be7a8b69c878507068df059e6f",
"sha256:968089d4584ad4ad7c171454f0a5c6dac23971e9472521ea3b6d49d610aa6fc0"
],
"index": "pypi",
"version": "==0.9.1"
},
"rsa": {
"hashes": [
"sha256:14ba45700ff1ec9eeb206a2ce76b32814958a98e372006c8fb76ba820211be66",
"sha256:1a836406405730121ae9823e19c6e806c62bbad73f890574fff50efa4122c487"
],
"version": "==4.0"
},
"six": {
"hashes": [
"sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
"sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73"
],
"version": "==1.12.0"
},
"uritemplate": {
"hashes": [
"sha256:01c69f4fe8ed503b2951bef85d996a9d22434d2431584b5b107b2981ff416fbd",
"sha256:1b9c467a940ce9fb9f50df819e8ddd14696f89b9a8cc87ac77952ba416e0a8fd",
"sha256:c02643cebe23fc8adb5e6becffe201185bf06c40bda5c0b4028a93f1527d011d"
],
"version": "==3.0.0"
},
"urllib3": {
"hashes": [
"sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39",
"sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22"
],
"version": "==1.24.1"
}
},
"develop": {}
}
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Reference implementation:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
import json
import urllib
import click
import google.auth
import google.auth.app_engine
import google.auth.compute_engine.credentials
import google.auth.iam
import google.oauth2.credentials
import google.oauth2.service_account
import requests
import requests_toolbelt.adapters.appengine
from google.auth.transport.requests import Request
# OAuth scope requested for the bootstrap credentials in make_iap_request.
IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
# OAuth scope requested when calling the Composer API in get_client_id.
GCP_SCOPE = 'https://www.googleapis.com/auth/cloud-platform'
# Token endpoint that exchanges the signed JWT for an OpenID Connect token.
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
# Template for a Composer environment resource; fill with str.format using
# the `project`, `location` and `name` fields.
AIRFLOW_ENVIRONMENT_URI = (
    'https://composer.googleapis.com/v1beta1/'
    'projects/{project}/locations/{location}/environments/{name}'
)
# Template for the DAG-run endpoint on the Airflow web server; fill with
# str.format using the `webserver_id` and `dag_name` fields.
AIRFLOW_DAG_URI = (
    'https://{webserver_id}.appspot.com'
    '/api/experimental/dags/{dag_name}/dag_runs'
)
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Make a request to an application protected by Identity-Aware Proxy.

    Args:
        url: The Identity-Aware Proxy-protected URL to fetch.
        client_id: The client ID used by Identity-Aware Proxy.
        method: The request method to use
            ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE').
        **kwargs: Any of the parameters defined for the request function:
            https://github.com/requests/requests/blob/master/requests/api.py
            If no timeout is provided, it is set to 90 by default.
            If `headers` is provided, it is merged with the Authorization
            header built here (the Authorization value set here wins).

    Returns:
        The response body as text when the response status is exactly 200.

    Raises:
        Exception: If the default credentials are end-user credentials
            rather than a service account, if the application answers 403,
            or if it answers with any status other than 200.
    """
    # Set the default timeout, if missing.
    kwargs.setdefault('timeout', 90)

    # Figure out what environment we're running in and get some preliminary
    # information about the service account.
    bootstrap_credentials, _ = google.auth.default(scopes=[IAM_SCOPE])
    if isinstance(bootstrap_credentials,
                  google.oauth2.credentials.Credentials):
        raise Exception('make_iap_request is only supported for service '
                        'accounts.')
    elif isinstance(bootstrap_credentials,
                    google.auth.app_engine.Credentials):
        requests_toolbelt.adapters.appengine.monkeypatch()

    # For service accounts using the Compute Engine metadata service,
    # service_account_email isn't available until refresh is called.
    # One refresh is enough; a second back-to-back refresh would only add
    # another round-trip.
    bootstrap_credentials.refresh(Request())

    signer_email = bootstrap_credentials.service_account_email
    if isinstance(bootstrap_credentials,
                  google.auth.compute_engine.credentials.Credentials):
        # Since the Compute Engine metadata service doesn't expose the service
        # account key, we use the IAM signBlob API to sign instead.
        # In order for this to work:
        #
        # 1. Your VM needs the https://www.googleapis.com/auth/iam scope.
        #    You can specify this specific scope when creating a VM
        #    through the API or gcloud. When using Cloud Console,
        #    you'll need to specify the "full access to all Cloud APIs"
        #    scope. A VM's scopes can only be specified at creation time.
        #
        # 2. The VM's default service account needs the "Service Account Actor"
        #    role. This can be found under the "Project" category in Cloud
        #    Console, or roles/iam.serviceAccountActor in gcloud.
        #    NOTE(review): this role name may be outdated -- confirm against
        #    the current IAM documentation.
        signer = google.auth.iam.Signer(
            Request(), bootstrap_credentials, signer_email)
    else:
        # A Signer object can sign a JWT using the service account's key.
        signer = bootstrap_credentials.signer

    # Construct OAuth 2.0 service account credentials using the signer
    # and email acquired from the bootstrap credentials.
    service_account_credentials = google.oauth2.service_account.Credentials(
        signer, signer_email, token_uri=OAUTH_TOKEN_URI, additional_claims={
            'target_audience': client_id
        })

    # service_account_credentials gives us a JWT signed by the service
    # account. Next, we use that to obtain an OpenID Connect token,
    # which is a JWT signed by Google.
    google_open_id_connect_token = get_google_open_id_connect_token(
        service_account_credentials)

    # Merge any caller-supplied headers instead of passing `headers` twice,
    # which would raise a TypeError for a duplicate keyword argument.
    headers = dict(kwargs.pop('headers', None) or {})
    headers['Authorization'] = 'Bearer {}'.format(
        google_open_id_connect_token)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(method, url, headers=headers, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account {} does not have permission to '
                        'access the IAP-protected application.'.format(
                            signer_email))
    if resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    return resp.text
def get_google_open_id_connect_token(service_account_credentials):
    """Return a Google-issued OpenID Connect token for the service account.

    Steps performed:
      1. Ask the credentials object for its authorization grant assertion
         (a JWT signed with the service account's key, carrying whatever
         additional claims -- e.g. "target_audience" -- the credentials were
         constructed with).
      2. Post that assertion to OAUTH_TOKEN_URI using the JWT grant type.
         Because the JWT in step 1 has a target_audience claim, the endpoint
         responds with an OpenID Connect token for the service account --
         that is, a JWT signed by *Google* whose aud claim is set to the
         target_audience value from step 1.

    For more information, see
    https://developers.google.com/identity/protocols/OAuth2ServiceAccount .
    The HTTP/REST example on that page describes the JWT structure and
    demonstrates how to call the token endpoint. (That example obtains an
    OAuth2 access token; this code uses a modified version of it to obtain
    an OpenID Connect token.)

    Args:
        service_account_credentials: Credentials object exposing
            `_make_authorization_grant_assertion()`.

    Returns:
        The value of the `id_token` field of the token endpoint response.
    """
    # NOTE: relies on private google-auth helpers (leading underscores).
    signed_assertion = (
        service_account_credentials._make_authorization_grant_assertion())
    transport = Request()
    grant_payload = {
        'assertion': signed_assertion,
        'grant_type': google.oauth2._client._JWT_GRANT_TYPE,
    }
    endpoint_reply = google.oauth2._client._token_endpoint_request(
        transport, OAUTH_TOKEN_URI, grant_payload)
    return endpoint_reply['id_token']
def get_client_id(project_id, composer_env_name, location='us-central1'):
    """Look up the IAP client ID guarding a Composer environment's web server.

    Args:
        project_id: Google Cloud project that owns the Composer environment.
        composer_env_name: Name of the Composer environment.
        location: Location of the environment; defaults to 'us-central1'.

    Returns:
        The first `client_id` query parameter found in the redirect that the
        Airflow web server issues to an unauthenticated request.

    Raises:
        requests.exceptions.HTTPError: If the Composer API request returns an
            error status.
        KeyError: If the environment data has no `config.airflowUri`, the web
            server response has no `location` header, or the redirect URL has
            no `client_id` parameter.
    """
    # `import urllib` alone does not guarantee that the `parse` submodule is
    # loaded, so import the needed functions explicitly.
    from urllib.parse import parse_qs, urlparse

    # Authenticate with Google Cloud.
    # See: https://cloud.google.com/docs/authentication/getting-started
    credentials, _ = google.auth.default(scopes=[GCP_SCOPE])
    authed_session = google.auth.transport.requests.AuthorizedSession(
        credentials)

    environment_url = AIRFLOW_ENVIRONMENT_URI.format(
        project=project_id, location=location, name=composer_env_name)
    response = authed_session.request('GET', environment_url)
    # Surface API errors (bad project, missing permission, ...) directly
    # instead of failing later with an opaque KeyError on the error payload.
    response.raise_for_status()
    environment_data = response.json()
    airflow_uri = environment_data['config']['airflowUri']

    # The Composer environment response does not include the IAP client ID.
    # Make a second, unauthenticated HTTP request to the web server to get the
    # redirect URI.
    redirect_response = requests.get(airflow_uri, allow_redirects=False)
    redirect_location = redirect_response.headers['location']

    # Extract the client_id query parameter from the redirect.
    query_string = parse_qs(urlparse(redirect_location).query)
    return query_string['client_id'][0]
# Command-line entry point: resolves the IAP client ID for the Composer
# environment, then POSTs a DAG-run request to the Airflow web server and
# echoes the response body.
#
# Documented with a comment rather than a docstring on purpose: click would
# include a docstring in the --help output.
@click.command()
@click.option('-p', '--project-id', type=str, required=True)
@click.option('-e', '--env-name', type=str, required=True)
@click.option('-w', '--webserver-id', type=str, required=True)
@click.option('-d', '--dag-name', type=str, required=True)
@click.option('-c', '--conf', type=str, required=True)
def main(project_id, env_name, webserver_id, dag_name, conf):
    # Validate --conf before any network call so a typo fails fast with a
    # usage error instead of a JSONDecodeError traceback.
    try:
        parsed_conf = json.loads(conf)
    except ValueError as err:
        raise click.BadParameter(
            'not valid JSON: {}'.format(err), param_hint='--conf') from err

    client_id = get_client_id(project_id, env_name)
    airflow_url = AIRFLOW_DAG_URI.format(
        webserver_id=webserver_id,
        dag_name=dag_name,
    )
    # The request body is a JSON object whose 'conf' value is itself a
    # JSON-encoded string (the parsed --conf value, re-serialized).
    payload = json.dumps({'conf': json.dumps(parsed_conf)})
    resps = make_iap_request(
        url=airflow_url,
        client_id=client_id,
        method='POST',
        data=payload,
    )
    click.echo(resps)


if __name__ == '__main__':
    main()
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Reference implementation:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/workflows/trigger_response_dag.py
"""DAG running in response to a Cloud Storage bucket change."""
import datetime
import airflow
from airflow.operators import bash_operator
# Arguments handed to the DAG below as its `default_args`.
default_args = dict(
    owner='Composer Example',
    depends_on_past=False,
    email=[''],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=datetime.timedelta(minutes=5),
    start_date=datetime.datetime(2017, 1, 1),
)

with airflow.DAG(
        'composer_sample_trigger_response_dag',
        # No schedule: this DAG is meant to run only when triggered.
        schedule_interval=None,
        default_args=default_args) as dag:

    # Single task: echo the 'foo' entry of the configuration that the
    # triggering request attached to this DAG run.
    print_gcs_info = bash_operator.BashOperator(
        bash_command="echo {{ dag_run.conf['foo'] }}",
        task_id='print_gcs_info')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment