Skip to content

Instantly share code, notes, and snippets.

@masahitojp
Created September 10, 2024 21:58
Show Gist options
  • Save masahitojp/80d69da64e94bea51b9a18eb606b64f2 to your computer and use it in GitHub Desktop.
Save masahitojp/80d69da64e94bea51b9a18eb606b64f2 to your computer and use it in GitHub Desktop.
cloudcomposerてファイルダウンロード

Cloud Composer の DAG でファイルをダウンロードし、それを Google Cloud Storage (GCS) にアップロードする方法を以下に説明します。これまで説明した Secret Manager を使った認証情報の取得に加えて、GCS にファイルをアップロードする部分を追加します。

1. GCS にファイルをアップロードする関数を追加

ファイルを GCS にアップロードするために、Airflow の GoogleCloudStorageHook を使用します。このフックは、Google Cloud Storage にファイルをアップロードする際に使用されます。

以下に、先ほどの DAG に GCS アップロード機能を追加した例を示します。

2. DAG の実装例

from google.cloud import secretmanager
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils.dates import days_ago
import requests
import os

# 定数
LOCAL_FILE_PATH = '/tmp/downloaded_file.csv'  # ローカルにダウンロードするファイルの保存先
GCS_BUCKET_NAME = 'your-gcs-bucket'  # GCSのバケット名
GCS_OBJECT_NAME = 'your-folder-path-on-gcs/downloaded_file.csv'  # GCS上でのファイルパス

# Secret Manager から認証情報を取得する関数
def access_secret_version(secret_id, version_id="latest"):
    client = secretmanager.SecretManagerServiceClient()
    name = f"projects/your-project-id/secrets/{secret_id}/versions/{version_id}"
    response = client.access_secret_version(name=name)
    return response.payload.data.decode('UTF-8')

# HTTP リクエストでファイルをダウンロードする関数
def download_file_using_secret_manager():
    username = access_secret_version('http-username')
    password = access_secret_version('http-password')

    url = 'https://your-hostname.com/path/to/file'
    response = requests.get(url, auth=(username, password))

    if response.status_code == 200:
        with open(LOCAL_FILE_PATH, 'wb') as f:
            f.write(response.content)
    else:
        raise Exception(f"Failed to download file: {response.status_code}")

# ファイルを GCS にアップロードする関数
def upload_file_to_gcs():
    gcs_hook = GCSHook(gcp_conn_id='google_cloud_default')  # Airflow の GCP 接続ID
    gcs_hook.upload(bucket_name=GCS_BUCKET_NAME, object_name=GCS_OBJECT_NAME, filename=LOCAL_FILE_PATH)

# DAG の定義
default_args = {
    'owner': 'airflow',
    'retries': 1,
}

with DAG(
    dag_id='http_to_gcs_with_secret_manager',
    default_args=default_args,
    start_date=days_ago(1),
    schedule_interval=None,
    catchup=False,
) as dag:

    # ファイルをダウンロードするタスク
    download_task = PythonOperator(
        task_id='download_file',
        python_callable=download_file_using_secret_manager,
    )

    # ファイルを GCS にアップロードするタスク
    upload_task = PythonOperator(
        task_id='upload_to_gcs',
        python_callable=upload_file_to_gcs,
    )

    # ダウンロード後に GCS にアップロード
    download_task >> upload_task

3. 説明

  1. download_file_using_secret_manager:

    • Secret Manager から認証情報(ユーザー名とパスワード)を取得し、指定された URL からファイルをダウンロードします。このファイルは /tmp/downloaded_file.csv に保存されます。
  2. upload_file_to_gcs:

    • Airflow の GoogleCloudStorageHook を使って、ローカルに保存されたファイルを指定の GCS バケットにアップロードします。gcp_conn_id='google_cloud_default' は、Cloud Composer 環境で自動的に利用可能な GCP 接続 ID です。これを使って、GCS に対してアップロードを行います。
  3. DAG の構成:

    • 2つのタスクがあり、最初にファイルをダウンロードし、その後 GCS にアップロードします。download_task の後に upload_task が実行されるように依存関係を設定しています。

4. GCS にアクセスするための設定

Airflow の google_cloud_default 接続が Cloud Composer 環境に自動的に作成されていますが、GCS バケットにアクセスする権限があることを確認してください。

  • Cloud Composer のサービスアカウントに、少なくとも roles/storage.objectAdmin または roles/storage.objectCreator 権限を付与する必要があります。
gcloud projects add-iam-policy-binding your-project-id \
    --member=serviceAccount:your-composer-service-account@your-project-id.iam.gserviceaccount.com \
    --role=roles/storage.objectAdmin

まとめ

この手順を使用すると、Google Secret Manager で管理された認証情報を用いて HTTP 経由でファイルをダウンロードし、そのファイルを Google Cloud Storage にアップロードできます。このアプローチは、安全に認証情報を管理しながら、Cloud Composer 環境でのファイル操作を自動化するのに最適です。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment