はじめに
会社で使うかもという話があったので調べてみたがさっぱりわからない。以下のブログでハンズオン的なものがあったので、そのまま実施してみることにする
- CloudFormationによる自動化はこちら
S3バケットの用意
- バケットが必要らしい。以下の階層にする
- airflow-firstairflow-handson-lab
- dags/
- airflow-firstairflow-handson-lab
別の一意になる名前にしてね
Airflowの環境を作成
マネジメントコンソールでMWAAにアクセスし、[環境を作成]ボタンをクリックします。
環境作成に約30分かかります
画面名:詳細を指定
大項目 | 中項目 | 選択肢 | 予備欄/備考 |
---|---|---|---|
環境の詳細 | 名前 | MyAirflowEnvironment | デフォルト値です |
Airflow バージョン | ✅2.10.3(最新) | ||
週 1 回のメンテナンスウィンドウ開始 (UTC) | ✅Sunday | ||
✅13:30 | |||
Amazon S3 の DAG コード | 🔴S3 バケット | s3://airflow-firstairflow-handson-lab | |
🔴DAG フォルダ | s3://airflow-firstairflow-handson-lab/dags/ | ||
プラグインファイル - オプション | |||
要件ファイル - オプション | |||
スタートアップスクリプトファイル - オプション |
画面名:詳細設定を構成
大項目 | 中項目 | 選択肢 | 予備欄/備考 |
---|---|---|---|
ネットワーク | 🔴Virtual Private Cloud (VPC) | MWAA VPC を作成して選択します。 | |
サブネット 1 | MWAA VPCを選択すると自動入力されます | ||
サブネット 2 | MWAA VPCを選択すると自動入力されます | ||
🔴ウェブサーバーのアクセス | ⬜非公開ネットワーク (インターネットアクセスなし) ✅公開ネットワーク (インターネットにアクセス可能) |
||
🔴セキュリティグループ | ✅新しいセキュリティグループを作成 | ||
Existing security group(s) | |||
エンドポイント管理 | ✅サービスマネージドエンドポイント (推奨) ⬜カスタマーマネージドエンドポイント |
||
環境クラス | 🔴 | ✅mw1.micro | |
最大ワーカー数 | 1 | クラス毎にデフォルト値変動 | |
最小ワーカー数 | 1 | クラス毎にデフォルト値変動 | |
Web サーバーの最大数 | 1 | クラス毎にデフォルト値変動 | |
最小 Web サーバー数 | 1 | クラス毎にデフォルト値変動 | |
スケジューラ数 | 1 | クラス毎にデフォルト値変動 | |
暗号化 | ⬜暗号化設定をカスタマイズする (高度) | ||
モニタリング | Airflow ログ設定 | ✅Airflow タスクログ | |
ログレベル | ⬜重要 ⬜エラー ⬜警告 ✅情報 |
||
⬜Airflow ウェブサーバーのログ | |||
⬜Airflow スケジューラのログ | |||
⬜Airflow ワーカーログ | |||
⬜Airflow DAG 処理ログ | |||
Airflow 設定オプション - オプション | |||
タグ - オプション | |||
アクセス許可 | 実行ロール | ||
✅新しいロールを作成 | |||
ロール名 | AmazonMWAA-MyAirflowEnvironment-BUrDmV | デフォルトのままでOK |
画面名:スタックのクイック作成
大項目 | 中項目 | 選択肢 | 予備欄/備考 |
---|---|---|---|
スタック名を提供 | MWAA-VPC | デフォルト値です | |
パラメータ | EnvironmentName | MWAAEnvironment | デフォルト値です |
PrivateSubnet1CIDR | 10.192.20.0/24 | デフォルト値です | |
PrivateSubnet2CIDR | 10.192.21.0/24 | デフォルト値です | |
PublicSubnet1CIDR | 10.192.10.0/24 | デフォルト値です | |
PublicSubnet2CIDR | 10.192.11.0/24 | デフォルト値です | |
VpcCIDR | 10.192.0.0/16 | デフォルト値です | |
タグ - オプション | |||
アクセス許可 - オプション | |||
スタックの失敗オプション | |||
スタックポリシー - オプション | |||
ロールバック設定 - オプション | |||
通知オプション - オプション | |||
スタックの作成オプション - オプション |
こうなれば完了です。その後、Airflow UI を開く
をクリックする
DAGをアップロードして実行してみた
DAG フォルダにrandom_sleep.py
を入れるとDAG作成されます。
最初、Pausedの画面にいて見つからなかったが、実行した影響なのか、ALLの画面で見れた
うーん。動いたけどこれのやりたいことが全然わからない。何かデータ集出しないとイメージできない。
AIにもう一つDAGを作ってもらった
実行すると、二日分のフォルダが生成し、動きが怪しいのですが、動けばOKです
権限追加
AmazonMWAA-MyAirflowEnvironment-XXXXロールに以下をインラインポリシーで与えること。DAGからS3への書き込み権限を付与します
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "ListBucket",
"Effect": "Allow",
"Action": [
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::airflow-firstairflow-handson-lab"
},
{
"Sid": "RWObjects",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject"
],
"Resource": "arn:aws:s3:::airflow-firstairflow-handson-lab/*"
}
]
}
処理概要(s3_sales_pipeline)
-
サンプルデータ生成(ensure_sample)
- S3 バケット
airflow-firstairflow-handson-lab
の
landing/date=YYYY-MM-DD/
に入力ファイルが無ければ、自動でサンプル JSONL を生成して保存。 - データ内容:coffee / sandwich / tea の売上(
price × qty
)。
- S3 バケット
-
バリデーション & 集計(validate_and_aggregate)
-
landing/date=YYYY-MM-DD/
の JSONL を読み込み。 - 必須キー(
ts
,product
,price
,qty
)をチェック。 -
product
ごとに 売上合計(price × qty) を算出。 - 集計結果と処理済みファイル一覧を XCom に保存。
-
-
集計結果の保存(write_result)
- 集計結果を JSON にまとめて保存。
- 出力先:
processed/date=YYYY-MM-DD/summary.json
- 例:
{ "date": "2025-09-17", "revenue_by_product": { "coffee": 900, "sandwich": 600, "tea": 600 }, "generated_at": "2025-09-17T07:00:49Z" }
-
入力データの退避(archive_inputs)
-
landing/
配下の処理済みファイルをarchive/
にコピー。 - コピー完了後、
landing/
側を削除。
-
✅DAGの流れ
ensure_sample → validate_and_aggregate → write_result → archive_inputs
✅ つまり、この DAG は
「毎日 S3 に置かれた売上データを集計 → 結果を保存 → 元データをアーカイブ」するパイプライン です。
これだけ見ると「S3トリガーで Lambda を呼べば十分では?」と思うかもしれません。
しかし Airflow(MWAA)の強みは ワークフロー全体を段取り管理できること にあります。
-
複数ステップの依存関係管理
- 今回の例でも「サンプル生成 → バリデーション → 集計 → アーカイブ」と順番が大事。
- Lambda単体だと依存管理を自前で書く必要があるが、Airflowなら DAG の矢印で可視化&自動制御できる。
-
失敗時のリトライや再実行が簡単
- Lambdaは失敗すれば再実行を手で呼ぶ必要がある。
- Airflowは「失敗したタスクだけ再実行」や「自動リトライ」設定が可能。
-
スケジューリングと運用監視
- 「毎日 9時に実行」「週末はスキップ」など柔軟に設定可能。
- 実行履歴や処理時間をUIで確認でき、可視化された依存グラフで監視しやすい。
-
拡張性
- 今はS3処理だけでも、後から Glue / Redshift / SageMaker / ECS など他サービスを組み合わせる際に、Airflowが“指揮者”として全体を統括できる。
👉 まとめると、
Lambdaは「単発の自動処理」に最適ですが、Airflowは「段取りが複雑なデータパイプラインを可視化しながら運用する」のに向いています。
少し理解できた。今回はS3しかリソースを使ってないが、いろいろなデータを自動で処理する仕組みがつくれて、便利な機能がたくさんあるものという理解をしておく。他のリソースでも同じことができるが楽に実装できるのであろう
入力と出力の関係
{"ts": "2025-09-17T09:00:00Z", "product": "coffee", "price": 300, "qty": 2}
{"ts": "2025-09-17T10:00:00Z", "product": "sandwich", "price": 600, "qty": 1}
{"ts": "2025-09-17T11:00:00Z", "product": "coffee", "price": 300, "qty": 1}
{"ts": "2025-09-17T12:00:00Z", "product": "tea", "price": 200, "qty": 3}
- 1行目 → coffee = 300 × 2 = 600
- 2行目 → sandwich = 600 × 1 = 600
- 3行目 → coffee = 300 × 1 = 300
- 4行目 → tea = 200 × 3 = 600
{
"date": "2025-09-17",
"revenue_by_product": {
"coffee": 900,
"sandwich": 600,
"tea": 600
},
"generated_at": "2025-09-17T07:00:49Z"
}
JSONL(JSON Lines)とは
1行に1レコード(1つの JSON オブジェクト)を書く形式
複数行で複数レコードを表現する
改行で区切るだけなのでシンプルでストリーム処理に向いている
Apache Airflowとは?
Amazon Managed Workflows for Apache Airflow (MWAA)とは?
公式マニュアル
ユーザガイド
サンプル