Cloud Composer の DAG でファイルをダウンロードし、それを Google Cloud Storage (GCS) にアップロードする方法を以下に説明します。これまで説明した Secret Manager を使った認証情報の取得に加えて、GCS にファイルをアップロードする部分を追加します。
ファイルを GCS にアップロードするために、Airflow の GoogleCloudStorageHook
を使用します。このフックは、Google Cloud Storage にファイルをアップロードする際に使用されます。
以下に、先ほどの DAG に GCS アップロード機能を追加した例を示します。
from google.cloud import secretmanager
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils.dates import days_ago
import requests
import os
# 定数
LOCAL_FILE_PATH = '/tmp/downloaded_file.csv' # ローカルにダウンロードするファイルの保存先
GCS_BUCKET_NAME = 'your-gcs-bucket' # GCSのバケット名
GCS_OBJECT_NAME = 'your-folder-path-on-gcs/downloaded_file.csv' # GCS上でのファイルパス
# Secret Manager から認証情報を取得する関数
def access_secret_version(secret_id, version_id="latest"):
client = secretmanager.SecretManagerServiceClient()
name = f"projects/your-project-id/secrets/{secret_id}/versions/{version_id}"
response = client.access_secret_version(name=name)
return response.payload.data.decode('UTF-8')
# HTTP リクエストでファイルをダウンロードする関数
def download_file_using_secret_manager():
username = access_secret_version('http-username')
password = access_secret_version('http-password')
url = 'https://your-hostname.com/path/to/file'
response = requests.get(url, auth=(username, password))
if response.status_code == 200:
with open(LOCAL_FILE_PATH, 'wb') as f:
f.write(response.content)
else:
raise Exception(f"Failed to download file: {response.status_code}")
# ファイルを GCS にアップロードする関数
def upload_file_to_gcs():
gcs_hook = GCSHook(gcp_conn_id='google_cloud_default') # Airflow の GCP 接続ID
gcs_hook.upload(bucket_name=GCS_BUCKET_NAME, object_name=GCS_OBJECT_NAME, filename=LOCAL_FILE_PATH)
# DAG の定義
default_args = {
'owner': 'airflow',
'retries': 1,
}
with DAG(
dag_id='http_to_gcs_with_secret_manager',
default_args=default_args,
start_date=days_ago(1),
schedule_interval=None,
catchup=False,
) as dag:
# ファイルをダウンロードするタスク
download_task = PythonOperator(
task_id='download_file',
python_callable=download_file_using_secret_manager,
)
# ファイルを GCS にアップロードするタスク
upload_task = PythonOperator(
task_id='upload_to_gcs',
python_callable=upload_file_to_gcs,
)
# ダウンロード後に GCS にアップロード
download_task >> upload_task
-
download_file_using_secret_manager
:- Secret Manager から認証情報(ユーザー名とパスワード)を取得し、指定された URL からファイルをダウンロードします。このファイルは
/tmp/downloaded_file.csv
に保存されます。
- Secret Manager から認証情報(ユーザー名とパスワード)を取得し、指定された URL からファイルをダウンロードします。このファイルは
-
upload_file_to_gcs
:- Airflow の
GoogleCloudStorageHook
を使って、ローカルに保存されたファイルを指定の GCS バケットにアップロードします。gcp_conn_id='google_cloud_default'
は、Cloud Composer 環境で自動的に利用可能な GCP 接続 ID です。これを使って、GCS に対してアップロードを行います。
- Airflow の
-
DAG の構成:
- 2つのタスクがあり、最初にファイルをダウンロードし、その後 GCS にアップロードします。
download_task
の後にupload_task
が実行されるように依存関係を設定しています。
- 2つのタスクがあり、最初にファイルをダウンロードし、その後 GCS にアップロードします。
Airflow の google_cloud_default
接続が Cloud Composer 環境に自動的に作成されていますが、GCS バケットにアクセスする権限があることを確認してください。
- Cloud Composer のサービスアカウントに、少なくとも
roles/storage.objectAdmin
またはroles/storage.objectCreator
権限を付与する必要があります。
gcloud projects add-iam-policy-binding your-project-id \
--member=serviceAccount:your-composer-service-account@your-project-id.iam.gserviceaccount.com \
--role=roles/storage.objectAdmin
この手順を使用すると、Google Secret Manager で管理された認証情報を用いて HTTP 経由でファイルをダウンロードし、そのファイルを Google Cloud Storage にアップロードできます。このアプローチは、安全に認証情報を管理しながら、Cloud Composer 環境でのファイル操作を自動化するのに最適です。