1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Cloud Composer を使ってみた

Last updated at Posted at 2023-03-10

Cloud Composer を使ってみた

はじめに

業務内の検証として Cloud Composer を実装してみました。
ちなみに筆者は、ワークフローという概念はなんとなく理解しているものの、Airflow や Python は未経験の初心者です。
なので初歩的な箇所で躓いてますが暖かい目で見ていただければと思ってます!

Cloud Composer とは

Cloud Composer とは、Google Cloud が提供しているマネージド Apache Airflow サービスです。

Apache Airflow はワークフロー管理プラットフォームです。
Python でワークフローを作成でき、作ったワークフローをGUIで見ることができます。

やりたいこと

公式チュートリアルの、

外部データセットを S3 から Cloud Storage に読み込む
Cloud Storage から BigQuery に外部データセットを読み込む

を実行するワークフローを作成しました。

実装

AWS 関連の準備

ワークフローで使う Amazon S3 バケットの作成とアクセスキーの作成、ポリシーのアタッチを行いました。
基本は前述の公式チュートリアルの通りですが、ポリシーの構成は以下のように設定しました。
※3/10 指定するポリシーの内容が間違っていたため修正しました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::<Amazon S3 のバケット名>"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetBucketVersioning"
            ],
            "Resource": "arn:aws:s3:::<Amazon S3 のバケット名>/*"
        }
    ]
}

Amazon S3 を使うのはファイル取得だけなので、読み取り処理に必要そうなアクションを指定しました。
また他のバケットにアクセスするのを防ぐために、リソースにバケット名を指定しました。

Google Cloud 側の準備

ワークフローで使う GCS バケットと BigQuery データセットを用意します。既存のものを使いまわせます。
念のため、Amazon S3 バケットと GCS バケット、BigQuery データセットのリージョンは asia-northeast1 に統一しました。

Cloud Composer の環境作成時にサービスアカウントが必要になりますが、後から変更できないこと、
指定しない場合 "default" の Compute Engine サービス アカウントが使用されるとのことだったので、この検証専用のサービスアカウントを別途用意しました。
用意したサービスアカウントに

  • Composer ワーカー
  • BigQuery ユーザー
  • BigQuery データオーナー

のロールを付与しておきます。
Composer ワーカー は Cloud Composer 環境を動かすのに、BigQuery 系ロールはワークフロー内で BigQuery の操作をするために必要です。

Cloud Composer 環境の作成

自動スケーリング不要だったので Composer1 で作成しました。

環境作成.PNG

ロケーションはバケットたちと合わせて asia-northeast1、ノード数やマシンタイプ、ディスクタイプは最小、
サービスアカウントは前述で作成したアカウントを指定しました。
Airflow のバージョンはデフォルトで入力された 2.4.3 を使用しました。

Cloud Composer 環境の設定

PyPI 依存関係のインストール

作った環境の詳細を見ると、「PYPIパッケージ」というタブがあるので、apache-airflow-providers-amazon を指定します。
どのバージョンを指定すべきかわからなかったのでバージョン欄は空欄のままにしました。

PyPI依存関係.PNG

Amazon S3 接続情報の作成

環境の詳細から「AIRFLOW UI を開く」を開き、[Admin] -> [Connections] で Amazon S3 接続情報を作成します。
チュートリアルでは接続タイプ Amazon S3 となっていますが、接続タイプの一覧になかったので Amazon Web Services で作成しました。
このあたりは Airflow のバージョンで変わるのかもしれません。

環境変数の設定

後に作成する Python ファイル内で使う環境変数を設定します。
Cloud Composer の環境の詳細でも「環境変数」タブがありましたが、こちらだと環境変数として反映されませんでした。
なので AIRFLOW UI の [Admin] -> [Variable] で指定しました。

DAG の実行

Python ファイルの作成

DAGを動かすための Python ファイルを公式チュートリアルのサンプルをもとに作成しました。
Amazon S3 から転送するファイルも公式チュートリアルの holidays.csv を使いました。

s3_to_bq.py
import datetime

from airflow import models
from airflow.models import Variable
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)
from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.utils.task_group import TaskGroup

PROJECT_NAME = "{{var.value.gcp_project}}"
REGION = "{{var.value.gce_region}}"
S3_BUCKET_NAME = Variable.get("S3_BUCKET_NAME")
GCS_BUCKET_NAME = Variable.get("GCS_BUCKET_NAME")
BQ_DATASET_NAME = Variable.get("BQ_DATASET_NAME")

from airflow import models
default_dag_args = {
    'start_date': datetime.datetime(2023, 2, 27),
}
with models.DAG(
    "s3_to_gcs_dag",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args) as dag:

    s3_to_gcs_op = S3ToGCSOperator(
        task_id="s3_to_gcs",
        bucket=S3_BUCKET_NAME,
        gcp_conn_id="google_cloud_default",
        aws_conn_id="aws_s3_connection",
        dest_gcs=f"gs://" + GCS_BUCKET_NAME + "/composer/",
    )

    load_external_dataset = GCSToBigQueryOperator(
        task_id="run_bq_external_ingestion",
        bucket=GCS_BUCKET_NAME,
        source_objects=["composer/holidays.csv"],
        destination_project_dataset_table=BQ_DATASET_NAME + ".holidays",
        source_format="CSV",
        schema_fields=[
            {"name": "Date", "type": "DATE"},
            {"name": "Holiday", "type": "STRING"},
        ],
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    s3_to_gcs_op >> load_external_dataset

環境変数を Variable.get("key名") で指定することがわからず時間かかりました。

DAG の作成

Cloud Composer 環境の詳細の「環境構成」タブに「DAG のフォルダ」という項目があるので、そこに記載されている GCS のパスに先ほど作った Python ファイルを置きます。
この GCS バケットは Cloud Composer 作成時に自動的に作成されます。
DAGフォルダ.PNG

読み込みが完了すると「DAG」タブの一覧に DAG ID が表示され、Python ファイルにエラーがあれば一覧上部にエラーが表記されます。

DAG の実行

DAG が正常に読み込まれれば自動で実行されます。
自動で実行されない場合は、DAG の名前のクリックして DAG の詳細に移り、「DAG をトリガー」をクリックすると実行されます。

自動実行分の実行状況が DAG の詳細の「実行」タブ一覧に表示されなかったのですが、AIRFLOW UI からは見れました。

使ってみた感想!

参考となるソースコードがあったからというのもありますが、Python 初めてでも簡単にワークフローを作成できました。
Operator は引数指定すればOKで分かりやすく、調べてみるといろんな Operator が用意されているのでワークフロー作成はハードルが低そうと感じました。
(例外処理とかを考えると難易度あがりそうですが)

Cloud Composer の環境作成は簡単で、スケーリングの調整などもおそらく不要なので管理は簡単そうですが、
一度環境を作ったら停止ができない(DAGの一時停止はできるが料金はかかり続ける)ことがネックなのと、Cloud Composer の画面と AIRFLOW UI との使い分けが今回の検証では不明でした。

参考

https://cloud.google.com/composer/docs/how-to/using/writing-dags?hl=ja
https://qiita.com/hankehly/items/1f02a34740276d1b8f0f

以上です。誰かの参考になれば幸いです。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?