移行の目的
別プロジェクトでGCPのComposer(Airflow)を利用しているが、以下のような課題を感じていたため、新規プロジェクトはAstroの利用を検討。
現状の課題
- ローカルで簡単に Airflow 環境を構築したい
- 利用していない時間帯でも Composer は最低数万円の費用が発生する(ゼロデプロイ不可)
- Dataflow など GCP サービスとの連携を Composer 経由だけでなく、ローカルからも柔軟に実行したい
Astro とは?
Astro は Astronomer が提供する Airflow のマネージドサービス。以下のような特徴がある:
-
Astro CLI
によって ローカル環境構築が非常に簡単 - Development モードでは、使っていないときに Scheduler や Web UI を停止でき、コスト削減(ゼロデプロイ)が可能
- ローカル環境でも Cloud Dataflow などの GCP サービスと連携可能
アカウント登録・ログイン
以下からアカウント作成・ログインを行う:
Deployment 作成手順
Airflow 環境 = Astro における「Deployment」に相当。
作成手順
- Astro UI サイドバー →
Deployments
→+Deployment
をクリック - 各設定を以下のように入力:
- Basic
- NAME / DESCRIPTION: 任意
- MODE: Hosted Execution
- Cluster
- Standard Cluster
- PROVIDER: GCP(Dataflow 利用のため GCP 推奨)
- REGION: us-central1(推奨)
- Execution
-
Astro Runtime / Airflow Version:
13.0.0 (Airflow v2.11.0 ベース)
- バージョンにより利用できる Provider パッケージが異なる
- ※
apache-airflow-providers-google
は Airflow v3 非対応のため、v2 系を使用 - 参考: Provider Compatibility List
- EXECUTOR: Celery Executor
- Worker Queues / KubernetesPodOperator Pods: デフォルトでOK(後から変更可)
-
Astro Runtime / Airflow Version:
- Advanced
- DEVELOPMENT MODE: ON
- Web UI・Scheduler を停止可能(ゼロデプロイ対応)
- 本番では冗長構成が有効になるため、開発時のみON推奨
- Hibernation Schedules / Scheduler: デフォルトでOK
- Basic
- 全ての設定完了後、「Create Deployment」をクリック
作成後の確認
作成が完了したら、以下からAirflow UIを確認できる:
ローカル環境
セットアップ
-
Astro CLIをインストール(mac)
brew install astro
-
Astro Projectを作成
% astro dev init xxx-etl && cd xxx-etl
-
以下のような初期ファイルが作成
xxx-etl/ ├── .astro/ ├── dags/ ├── include/ ├── plugins/ ├── tests/ ├── .dockerignore ├── .env ├── .gitignore ├── airflow_settings.yaml ├── Dockerfile ├── packages.txt ├── README.md └── requirements.txt
-
Dockerfileを変更
バージョンをAstro Airflowと合わせますFROM quay.io/astronomer/astro-runtime:13.0.0
-
既存Dag削除
このDagは上記バージョンだと実行できませんrm dags/exampledag.py
-
hello_world.pyを作成
from airflow.decorators import dag, task from datetime import datetime @task def hello_world(): print("Hello World") @dag( dag_id='hello_world', start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False, default_args={'owner': 'airflow', 'retries': 1}, ) def hello_world_dag(): hello_world_task = hello_world() hello_world_dag()
-
ローカル環境を実行
astro dev start
-
localhost:8080にアクセスしAirflowが起動を確認します
Usename, Passwordはadmin:admin
Astro Deploy
-
このコマンドでAstro 環境にデプロイを行う
% astro deploy Authenticated to Astro Select a Deployment # DEPLOYMENT NAME RELEASE NAME DEPLOYMENT ID DAG DEPLOY ENABLED 1 xxx-etl native-optics-7108 xxxxxxxxxxxxxxxxxxxxxxxxx true > 1 ... Deployed DAG bundle: 2025-06-15T04:40:26.8295719Z Deployed Image Tag: deploy-2025-06-15T04-38-12 Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags. Access your Deployment: Deployment View: https://cloud.astronomer.io/xxx/deployments/cmbx0ymnt199b01iyy3ih0fph/overview Airflow UI: https://xxx.astronomer.run/d3ih0fph?orgId=org_695Fq6t8IHpWm3zh
Dataflow連携
-
サンプル用のdagとbeamを作成
xxx-etl/ ├── dags/ │ ├── hello_world.py │ └── run_beam_pipeline_dag.py └──dataflow/ └── jobs/ ├── yf_mart/ └── yf_raw/ └── import_xxx_log_daily.py
-
import_xxx_log_daily.pyを作成
GCSにある .jsonl ファイルを読み込んで、BigQuery の指定テーブルに書き込む Dataflow パイプラインを実行する。import argparse import json import logging from datetime import datetime import apache_beam as beam import pytz from apache_beam.io import ReadAllFromText, WriteToBigQuery from apache_beam.io.fileio import MatchFiles from apache_beam.options.pipeline_options import PipelineOptions job_name = "import_xxx_log_daily" def run(argv=None): parser = argparse.ArgumentParser() parser.add_argument( "--runner", dest="runner", default="DataflowRunner", help="Apache Beam runner. DataflowRunner or DirectRunner", ) parser.add_argument( "--input", dest="input", help="Input Date (e.g., YYYYMMDD)", ) parser.add_argument("--environment") parser.add_argument("--project") parser.add_argument("--region") parser.add_argument("--temp_location") parser.add_argument("--staging_location") parser.add_argument("--bucket_name") known_args, pipeline_args = parser.parse_known_args(argv) pipeline_args += [ f"--runner={known_args.runner}", f"--project={known_args.project}", f"--region={known_args.region}", f"--temp_location={known_args.temp_location}", f"--staging_location={known_args.staging_location}", "--worker_machine_type=e2-medium", "--num_workers=1", "--max_num_workers=2", ] pipeline_options = PipelineOptions(pipeline_args) # タイムスタンプを生成して出力ファイル名を動的に設定 if known_args.input: current_date = known_args.input else: current_date = ( datetime.now(pytz.timezone("Asia/Tokyo")).date().strftime("%Y%m%d") ) environment = known_args.environment bucket_name = known_args.bucket_name # フォルダ以下のすべての JSONL ファイルを対象 input_files_pattern = f"{bucket_name}/threads/logs/{current_date}/*.jsonl" # BigQueryのテーブル指定 table_spec = f"xxx:yf_raw_{environment}.xxx_log_daily" with beam.Pipeline(options=pipeline_options) as p: ( p | "Match Files" >> MatchFiles(input_files_pattern) # ファイルをマッチング | "Read All JSONL Files" >> ReadAllFromText() # 全ファイルを読み込む | "Parse JSONL" >> beam.Map(lambda line: json.loads(line)) | "Write to BigQuery" >> WriteToBigQuery( table=table_spec, write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER, # 既存テーブルを使用 method="STREAMING_INSERTS", ) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) run()
-
run_beam_pipeline_dag.pyを作成
GCS 上の import_xxx_log_daily.py(Beamスクリプト)を、Airflow DAGから BeamRunPythonPipelineOperator を使って実行しますfrom airflow import DAG from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator from datetime import datetime with DAG( dag_id="run_beam_from_gcs", start_date=datetime(2024, 1, 1), schedule=None, catchup=False, ) as dag: run_beam = BeamRunPythonPipelineOperator( task_id="beam_run_gcs_script", py_file="gs://xxx-dataflow-local/dataflow/jobs/yf_raw/import_xxx_log_daily.py", runner="DataflowRunner", pipeline_options={ "runner": "DataflowRunner", "input": "20250518", "environment": "local", "project": "xxx", "region": "us-central1", "bucket_name": "gs://xxx-data-local", "temp_location": "gs://xxx-dataflow-local/temp_location", "staging_location": "gs://xxx-dataflow-local/staging_location", }, gcp_conn_id="google_cloud_default", )
-
import_xxx_log_daily.pyを指定のGCSに保存します
この場合はxxx-dataflow-local -
Dataflowと連携するためにコネクションの追加をします
※ 設定しなくても自動追加されることもあります- Airflow→Connections→Add a new record
- Connection Id:google_cloud_default
- Connection Type:Google Cloud
- Project Id:yf_analyst
- Keyfile Path:サービスアカウントのjsonlのパスを追加
- Number of Retries:5
-
beamのパッケージを追加
- requirements.txtに
apache-airflow-providers-apache-beam
apache-beam[gcp]
を追加します
- requirements.txtに
-
google-cloud-cli
を追加
google-cloud-cli
はPyPIにないためDockerfileに直接記述してインストールを行いますFROM quay.io/astronomer/astro-runtime:13.0.0 USER root RUN apt-get update && \ apt-get install -y curl gnupg && \ echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" \ | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && \ curl https://packages.cloud.google.com/apt/doc/apt-key.gpg \ | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - && \ apt-get update && \ apt-get install -y google-cloud-cli USER astro
-
ローカルのAirflowを再起動します
astro dev restart
-
localhost:8080にアクセスし
run_beam_pipeline_dag
の実行を確認します -
Astro Airflowにデプロイを行います
astro deploy Select a Deployment # DEPLOYMENT NAME RELEASE NAME DEPLOYMENT ID DAG DEPLOY ENABLED 1 xxx-etl native-optics-7108 cmbx0ymnt199b01iyy3ih0fph true > 1 Building image... ... deploy-2025-06-16T01-35-37: digest: sha256:bf98062461f2c2deb9c5ada5df542d794033bfe95d5cea1950319d3fd47fed57 size: 9084 Deployed DAG bundle: 2025-06-16T01:41:45.0811805Z Deployed Image Tag: deploy-2025-06-16T01-35-37 Successfully pushed image to Astronomer registry. Navigate to the Astronomer UI for confirmation that your deploy was successful. To deploy dags only run astro deploy --dags. Access your Deployment: Deployment View: https://cloud.astronomer.io/xxx/deployments/cmbx0ymnt199b01iyy3ih0fph/overview Airflow UI: https://xxx.astronomer.run/d3ih0fph?orgId=org_695Fq6t8IHpWm3zh
-
Astro AirflowのConnectionを対応
- ローカルと同様に、Dataflowと連携するためのコネクションを追加します
-
Airflow UIにアクセスしrun_beam_pipeline_dagが実行できるのを確認します
環境変数
-
ローカル環境は.envを作成し
dotenv
で読み込み# dataflow APP_ENV=local PROJECT=xxx BUCKET_NAME=gs://xxx-data-local DATAFLOW_JOB_BUCKET_PATH=gs://xxx-dataflow-local/dataflow/jobs REGION=us-central1 TEMP_LOCATION=gs://xxx-dataflow-local/temp_location STAGING_LOCATION=gs://xxx-dataflow-local/staging_location
import os from airflow import DAG from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator from datetime import datetime try: from dotenv import load_dotenv load_dotenv() except ImportError: pass # Astroではdotenv不要 APP_ENV = os.getenv("APP_ENV", "local") PROJECT = os.getenv("PROJECT", "xxx") REGION = os.getenv("REGION", "us-central1") BUCKET_NAME = os.getenv("BUCKET_NAME") DATAFLOW_JOB_BUCKET_PATH = os.getenv("DATAFLOW_JOB_BUCKET_PATH") TEMP_LOCATION = os.getenv("TEMP_LOCATION") STAGING_LOCATION = os.getenv("STAGING_LOCATION")
-
Astro環境はDeployment → Environment Variablesを選択して追加することで環境変数として利用することができます