2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GCP Composerより簡単・安いかもしれない、Astro Airflow導入メモ

Last updated at Posted at 2025-06-16

移行の目的

別プロジェクトでGCPのComposer(Airflow)を利用しているが、以下のような課題を感じていたため、新規プロジェクトはAstroの利用を検討。

現状の課題

  • ローカルで簡単に Airflow 環境を構築したい
  • 利用していない時間帯でも Composer は最低数万円の費用が発生する(ゼロデプロイ不可)
  • Dataflow など GCP サービスとの連携を Composer 経由だけでなく、ローカルからも柔軟に実行したい

Astro とは?

Astro は Astronomer が提供する Airflow のマネージドサービス。以下のような特徴がある:

  • Astro CLI によって ローカル環境構築が非常に簡単
  • Development モードでは、使っていないときに Scheduler や Web UI を停止でき、コスト削減(ゼロデプロイ)が可能
  • ローカル環境でも Cloud Dataflow などの GCP サービスと連携可能

アカウント登録・ログイン

以下からアカウント作成・ログインを行う:

Deployment 作成手順

Airflow 環境 = Astro における「Deployment」に相当。

作成手順

  1. Astro UI サイドバー → Deployments+Deployment をクリック
  2. 各設定を以下のように入力:
    • Basic
      • NAME / DESCRIPTION: 任意
      • MODE: Hosted Execution
    • Cluster
      • Standard Cluster
      • PROVIDER: GCP(Dataflow 利用のため GCP 推奨)
      • REGION: us-central1(推奨)
    • Execution
      • Astro Runtime / Airflow Version: 13.0.0 (Airflow v2.11.0 ベース)
        • バージョンにより利用できる Provider パッケージが異なる
        • apache-airflow-providers-google は Airflow v3 非対応のため、v2 系を使用
        • 参考: Provider Compatibility List
      • EXECUTOR: Celery Executor
      • Worker Queues / KubernetesPodOperator Pods: デフォルトでOK(後から変更可)
    • Advanced
      • DEVELOPMENT MODE: ON
      • Web UI・Scheduler を停止可能(ゼロデプロイ対応)
      • 本番では冗長構成が有効になるため、開発時のみON推奨
      • Hibernation Schedules / Scheduler: デフォルトでOK
  3. 全ての設定完了後、「Create Deployment」をクリック

screencapture-cloud-astronomer-io-xxx-deployments-create-2025-06-15-10_48_49.png

作成後の確認

作成が完了したら、以下からAirflow UIを確認できる:

  • サイドバー → Deployments → 該当Deployment(例:xxx-etl) → Open Airflow をクリック
    screencapture-xxx-astronomer-run-d3ih0fph-home-2025-06-15-11_18_17.png

ローカル環境

セットアップ

  • Astro CLIをインストール(mac)

    brew install astro
    
  • Astro Projectを作成

    % astro dev init xxx-etl && cd xxx-etl
    
  • 以下のような初期ファイルが作成

    xxx-etl/
    ├── .astro/
    ├── dags/
    ├── include/
    ├── plugins/
    ├── tests/
    ├── .dockerignore
    ├── .env
    ├── .gitignore
    ├── airflow_settings.yaml
    ├── Dockerfile
    ├── packages.txt
    ├── README.md
    └── requirements.txt
    
    
  • Dockerfileを変更
    バージョンをAstro Airflowと合わせます

    FROM quay.io/astronomer/astro-runtime:13.0.0
    
  • 既存Dag削除
    このDagは上記バージョンだと実行できません

    rm dags/exampledag.py
    
    
  • hello_world.pyを作成

    from airflow.decorators import dag, task
    from datetime import datetime
    
    @task
    def hello_world():
        print("Hello World")
    
    @dag(
        dag_id='hello_world',
        start_date=datetime(2023, 1, 1),
        schedule='@daily',
        catchup=False,
        default_args={'owner': 'airflow', 'retries': 1},
    )
    def hello_world_dag():
        hello_world_task = hello_world()
    
    hello_world_dag()
    
    
  • ローカル環境を実行

    astro dev start
    
  • localhost:8080にアクセスしAirflowが起動を確認します
    Usename, Passwordはadmin:admin

Astro Deploy

  • このコマンドでAstro 環境にデプロイを行う

    % astro deploy
    Authenticated to Astro 
    
    Select a Deployment
     #     DEPLOYMENT NAME     RELEASE NAME           DEPLOYMENT ID                 DAG DEPLOY ENABLED     
     1     xxx-etl      native-optics-7108     xxxxxxxxxxxxxxxxxxxxxxxxx     true                   
    
    > 1
    ...
    
    Deployed DAG bundle:  2025-06-15T04:40:26.8295719Z
    Deployed Image Tag:  deploy-2025-06-15T04-38-12
    Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags.
    
     Access your Deployment:
    
     Deployment View: https://cloud.astronomer.io/xxx/deployments/cmbx0ymnt199b01iyy3ih0fph/overview
     Airflow UI: https://xxx.astronomer.run/d3ih0fph?orgId=org_695Fq6t8IHpWm3zh
    
  • Airflow UIにアクセスしhello_worldが実行できるのを確認します
    スクリーンショット 2025-06-15 13.46.56.png

Dataflow連携

  • サンプル用のdagとbeamを作成

    xxx-etl/
    ├── dags/
    │   ├── hello_world.py
    │   └── run_beam_pipeline_dag.py
    └──dataflow/
        └── jobs/
            ├── yf_mart/
            └── yf_raw/
                └── import_xxx_log_daily.py
    
    
  • import_xxx_log_daily.pyを作成
    GCSにある .jsonl ファイルを読み込んで、BigQuery の指定テーブルに書き込む Dataflow パイプラインを実行する。

    import argparse
    import json
    import logging
    from datetime import datetime
    
    import apache_beam as beam
    import pytz
    from apache_beam.io import ReadAllFromText, WriteToBigQuery
    from apache_beam.io.fileio import MatchFiles
    from apache_beam.options.pipeline_options import PipelineOptions
    
    job_name = "import_xxx_log_daily"
    
    def run(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--runner",
            dest="runner",
            default="DataflowRunner",
            help="Apache Beam runner. DataflowRunner or DirectRunner",
        )
        parser.add_argument(
            "--input",
            dest="input",
            help="Input Date (e.g., YYYYMMDD)",
        )
    
        parser.add_argument("--environment")
        parser.add_argument("--project")
        parser.add_argument("--region")
        parser.add_argument("--temp_location")
        parser.add_argument("--staging_location")
        parser.add_argument("--bucket_name")
    
        known_args, pipeline_args = parser.parse_known_args(argv)
    
        pipeline_args += [
            f"--runner={known_args.runner}",
            f"--project={known_args.project}",
            f"--region={known_args.region}",
            f"--temp_location={known_args.temp_location}",
            f"--staging_location={known_args.staging_location}",
            "--worker_machine_type=e2-medium",
            "--num_workers=1",
            "--max_num_workers=2",
        ]
    
        pipeline_options = PipelineOptions(pipeline_args)
    
        # タイムスタンプを生成して出力ファイル名を動的に設定
        if known_args.input:
            current_date = known_args.input
        else:
            current_date = (
                datetime.now(pytz.timezone("Asia/Tokyo")).date().strftime("%Y%m%d")
            )
    
        environment = known_args.environment
        bucket_name = known_args.bucket_name
    
        # フォルダ以下のすべての JSONL ファイルを対象
        input_files_pattern = f"{bucket_name}/threads/logs/{current_date}/*.jsonl"
    
        # BigQueryのテーブル指定
        table_spec = f"xxx:yf_raw_{environment}.xxx_log_daily"
    
        with beam.Pipeline(options=pipeline_options) as p:
            (
                p
                | "Match Files" >> MatchFiles(input_files_pattern)  # ファイルをマッチング
                | "Read All JSONL Files" >> ReadAllFromText()  # 全ファイルを読み込む
                | "Parse JSONL" >> beam.Map(lambda line: json.loads(line))
                | "Write to BigQuery"
                >> WriteToBigQuery(
                    table=table_spec,
                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # 既存テーブルを使用
                    method="STREAMING_INSERTS",
                )
            )
    
    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        run()
    
    
  • run_beam_pipeline_dag.pyを作成
    GCS 上の import_xxx_log_daily.py(Beamスクリプト)を、Airflow DAGから BeamRunPythonPipelineOperator を使って実行します

    from airflow import DAG
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from datetime import datetime
    
    with DAG(
        dag_id="run_beam_from_gcs",
        start_date=datetime(2024, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        run_beam = BeamRunPythonPipelineOperator(
            task_id="beam_run_gcs_script",
            py_file="gs://xxx-dataflow-local/dataflow/jobs/yf_raw/import_xxx_log_daily.py",
            runner="DataflowRunner",
            pipeline_options={
                "runner": "DataflowRunner",
                "input": "20250518",
                "environment": "local",
                "project": "xxx",
                "region": "us-central1",
                "bucket_name": "gs://xxx-data-local",
                "temp_location": "gs://xxx-dataflow-local/temp_location",
                "staging_location": "gs://xxx-dataflow-local/staging_location",
            },
            gcp_conn_id="google_cloud_default",
        )
    
    
  • import_xxx_log_daily.pyを指定のGCSに保存します
    この場合はxxx-dataflow-local

  • Dataflowと連携するためにコネクションの追加をします
    ※ 設定しなくても自動追加されることもあります

    • Airflow→Connections→Add a new record
    • Connection Id:google_cloud_default
    • Connection Type:Google Cloud
    • Project Id:yf_analyst
    • Keyfile Path:サービスアカウントのjsonlのパスを追加
    • Number of Retries:5

screencapture-localhost-8080-connection-edit-1-2025-06-16-10_01_53.png

  • beamのパッケージを追加

    • requirements.txtにapache-airflow-providers-apache-beam apache-beam[gcp]を追加します
  • google-cloud-cliを追加
    google-cloud-cli はPyPIにないためDockerfileに直接記述してインストールを行います

    FROM quay.io/astronomer/astro-runtime:13.0.0
    USER root
    
    RUN apt-get update && \
        apt-get install -y curl gnupg && \
        echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" \
        | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && \
        curl https://packages.cloud.google.com/apt/doc/apt-key.gpg \
        | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && \
        apt-get update && \
        apt-get install -y google-cloud-cli
    
    USER astro
    
  • ローカルのAirflowを再起動します

    astro dev restart
    
  • localhost:8080にアクセスしrun_beam_pipeline_dagの実行を確認します

  • Astro Airflowにデプロイを行います

    astro deploy
    
    Select a Deployment
     #     DEPLOYMENT NAME     RELEASE NAME           DEPLOYMENT ID                 DAG DEPLOY ENABLED     
     1     xxx-etl      native-optics-7108     cmbx0ymnt199b01iyy3ih0fph     true                   
    
    > 1
    Building image...
    
    ...
    deploy-2025-06-16T01-35-37: digest: sha256:bf98062461f2c2deb9c5ada5df542d794033bfe95d5cea1950319d3fd47fed57 size: 9084
    Deployed DAG bundle:  2025-06-16T01:41:45.0811805Z
    Deployed Image Tag:  deploy-2025-06-16T01-35-37
    Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags.
    
     Access your Deployment:
    
     Deployment View: https://cloud.astronomer.io/xxx/deployments/cmbx0ymnt199b01iyy3ih0fph/overview
     Airflow UI: https://xxx.astronomer.run/d3ih0fph?orgId=org_695Fq6t8IHpWm3zh
    
  • Astro AirflowのConnectionを対応

    • ローカルと同様に、Dataflowと連携するためのコネクションを追加します
  • Airflow UIにアクセスしrun_beam_pipeline_dagが実行できるのを確認します

スクリーンショット 2025-06-16 10.52.17.png

環境変数

  • ローカル環境は.envを作成しdotenv で読み込み

    # dataflow
    APP_ENV=local
    PROJECT=xxx
    BUCKET_NAME=gs://xxx-data-local
    DATAFLOW_JOB_BUCKET_PATH=gs://xxx-dataflow-local/dataflow/jobs
    REGION=us-central1
    TEMP_LOCATION=gs://xxx-dataflow-local/temp_location
    STAGING_LOCATION=gs://xxx-dataflow-local/staging_location
    
    import os
    
    from airflow import DAG
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from datetime import datetime
    
    try:
        from dotenv import load_dotenv
        load_dotenv()
    except ImportError:
        pass  # Astroではdotenv不要
    
    APP_ENV = os.getenv("APP_ENV", "local")
    PROJECT = os.getenv("PROJECT", "xxx")
    REGION = os.getenv("REGION", "us-central1")
    BUCKET_NAME = os.getenv("BUCKET_NAME")
    DATAFLOW_JOB_BUCKET_PATH = os.getenv("DATAFLOW_JOB_BUCKET_PATH")
    TEMP_LOCATION = os.getenv("TEMP_LOCATION")
    STAGING_LOCATION = os.getenv("STAGING_LOCATION")
    
  • Astro環境はDeployment → Environment Variablesを選択して追加することで環境変数として利用することができます
    スクリーンショット 2025-06-16 23.27.07.png

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?