0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Amazon Managed Workflows for Apache Airflow (MWAA)って何だろう?

Last updated at Posted at 2025-09-17

はじめに

会社で使うかもという話があったので調べてみたがさっぱりわからない。以下のブログでハンズオン的なものがあったので、そのまま実施してみることにする

  • CloudFormationによる自動化はこちら

S3バケットの用意

  • バケットが必要らしい。以下の階層にする
    • airflow-firstairflow-handson-lab
      • dags/

別の一意になる名前にしてね

image.png

Airflowの環境を作成

マネジメントコンソールでMWAAにアクセスし、[環境を作成]ボタンをクリックします。

image.png

環境作成に約30分かかります

画面名:詳細を指定

大項目 中項目 選択肢 予備欄/備考
環境の詳細 名前 MyAirflowEnvironment デフォルト値です
Airflow バージョン ✅2.10.3(最新)
週 1 回のメンテナンスウィンドウ開始 (UTC) ✅Sunday
✅13:30
Amazon S3 の DAG コード 🔴S3 バケット s3://airflow-firstairflow-handson-lab
🔴DAG フォルダ s3://airflow-firstairflow-handson-lab/dags/
プラグインファイル - オプション
要件ファイル - オプション
スタートアップスクリプトファイル - オプション

画面名:詳細設定を構成

大項目 中項目 選択肢 予備欄/備考
ネットワーク 🔴Virtual Private Cloud (VPC) MWAA VPC を作成して選択します。
サブネット 1 MWAA VPCを選択すると自動入力されます
サブネット 2 MWAA VPCを選択すると自動入力されます
🔴ウェブサーバーのアクセス ⬜非公開ネットワーク (インターネットアクセスなし)
✅公開ネットワーク (インターネットにアクセス可能)
🔴セキュリティグループ ✅新しいセキュリティグループを作成
Existing security group(s)
エンドポイント管理 ✅サービスマネージドエンドポイント (推奨)
⬜カスタマーマネージドエンドポイント
環境クラス 🔴 ✅mw1.micro
最大ワーカー数 1 クラス毎にデフォルト値変動
最小ワーカー数 1 クラス毎にデフォルト値変動
Web サーバーの最大数 1 クラス毎にデフォルト値変動
最小 Web サーバー数 1 クラス毎にデフォルト値変動
スケジューラ数 1 クラス毎にデフォルト値変動
暗号化 ⬜暗号化設定をカスタマイズする (高度)
モニタリング Airflow ログ設定 ✅Airflow タスクログ
ログレベル ⬜重要
⬜エラー
⬜警告
✅情報
⬜Airflow ウェブサーバーのログ
⬜Airflow スケジューラのログ
⬜Airflow ワーカーログ
⬜Airflow DAG 処理ログ
Airflow 設定オプション - オプション
タグ - オプション
アクセス許可 実行ロール
✅新しいロールを作成
ロール名 AmazonMWAA-MyAirflowEnvironment-BUrDmV デフォルトのままでOK

画面名:スタックのクイック作成

大項目 中項目 選択肢 予備欄/備考
スタック名を提供 MWAA-VPC デフォルト値です
パラメータ EnvironmentName MWAAEnvironment デフォルト値です
PrivateSubnet1CIDR 10.192.20.0/24 デフォルト値です
PrivateSubnet2CIDR 10.192.21.0/24 デフォルト値です
PublicSubnet1CIDR 10.192.10.0/24 デフォルト値です
PublicSubnet2CIDR 10.192.11.0/24 デフォルト値です
VpcCIDR 10.192.0.0/16 デフォルト値です
タグ - オプション
アクセス許可 - オプション
スタックの失敗オプション
スタックポリシー - オプション
ロールバック設定 - オプション
通知オプション - オプション
スタックの作成オプション - オプション

こうなれば完了です。その後、Airflow UI を開くをクリックする
image.png

DAGをアップロードして実行してみた

DAG フォルダにrandom_sleep.pyを入れるとDAG作成されます。

最初、Pausedの画面にいて見つからなかったが、実行した影響なのか、ALLの画面で見れた

image.png

グラフ表示と実行ボタン
image.png

うーん。動いたけどこれのやりたいことが全然わからない。何かデータ集出しないとイメージできない。

AIにもう一つDAGを作ってもらった

dag_s3_sales_pipeline.py

実行すると、二日分のフォルダが生成し、動きが怪しいのですが、動けばOKです

権限追加

AmazonMWAA-MyAirflowEnvironment-XXXXロールに以下をインラインポリシーで与えること。DAGからS3への書き込み権限を付与します

s3-put-handson
{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "ListBucket",
			"Effect": "Allow",
			"Action": [
				"s3:ListBucket"
			],
			"Resource": "arn:aws:s3:::airflow-firstairflow-handson-lab"
		},
		{
			"Sid": "RWObjects",
			"Effect": "Allow",
			"Action": [
				"s3:GetObject",
				"s3:PutObject",
				"s3:DeleteObject"
			],
			"Resource": "arn:aws:s3:::airflow-firstairflow-handson-lab/*"
		}
	]
}

処理概要(s3_sales_pipeline)

  1. サンプルデータ生成(ensure_sample)

    • S3 バケット airflow-firstairflow-handson-lab
      landing/date=YYYY-MM-DD/ に入力ファイルが無ければ、自動でサンプル JSONL を生成して保存。
    • データ内容:coffee / sandwich / tea の売上(price × qty)。
  2. バリデーション & 集計(validate_and_aggregate)

    • landing/date=YYYY-MM-DD/ の JSONL を読み込み。
    • 必須キー(ts, product, price, qty)をチェック。
    • product ごとに 売上合計(price × qty) を算出。
    • 集計結果と処理済みファイル一覧を XCom に保存。
  3. 集計結果の保存(write_result)

    • 集計結果を JSON にまとめて保存。
    • 出力先:processed/date=YYYY-MM-DD/summary.json
    • 例:
      {
        "date": "2025-09-17",
        "revenue_by_product": {
          "coffee": 900,
          "sandwich": 600,
          "tea": 600
        },
        "generated_at": "2025-09-17T07:00:49Z"
      }
      
  4. 入力データの退避(archive_inputs)

    • landing/ 配下の処理済みファイルを archive/ にコピー。
    • コピー完了後、landing/ 側を削除。

✅DAGの流れ
ensure_sample → validate_and_aggregate → write_result → archive_inputs

✅ つまり、この DAG は
「毎日 S3 に置かれた売上データを集計 → 結果を保存 → 元データをアーカイブ」するパイプライン です。
これだけ見ると「S3トリガーで Lambda を呼べば十分では?」と思うかもしれません。
しかし Airflow(MWAA)の強みは ワークフロー全体を段取り管理できること にあります。

  • 複数ステップの依存関係管理

    • 今回の例でも「サンプル生成 → バリデーション → 集計 → アーカイブ」と順番が大事。
    • Lambda単体だと依存管理を自前で書く必要があるが、Airflowなら DAG の矢印で可視化&自動制御できる。
  • 失敗時のリトライや再実行が簡単

    • Lambdaは失敗すれば再実行を手で呼ぶ必要がある。
    • Airflowは「失敗したタスクだけ再実行」や「自動リトライ」設定が可能。
  • スケジューリングと運用監視

    • 「毎日 9時に実行」「週末はスキップ」など柔軟に設定可能。
    • 実行履歴や処理時間をUIで確認でき、可視化された依存グラフで監視しやすい。
  • 拡張性

    • 今はS3処理だけでも、後から Glue / Redshift / SageMaker / ECS など他サービスを組み合わせる際に、Airflowが“指揮者”として全体を統括できる。

👉 まとめると、
Lambdaは「単発の自動処理」に最適ですが、Airflowは「段取りが複雑なデータパイプラインを可視化しながら運用する」のに向いています。

少し理解できた。今回はS3しかリソースを使ってないが、いろいろなデータを自動で処理する仕組みがつくれて、便利な機能がたくさんあるものという理解をしておく。他のリソースでも同じことができるが楽に実装できるのであろう

入力と出力の関係

入力(landing/ の JSONL)
{"ts": "2025-09-17T09:00:00Z", "product": "coffee", "price": 300, "qty": 2}
{"ts": "2025-09-17T10:00:00Z", "product": "sandwich", "price": 600, "qty": 1}
{"ts": "2025-09-17T11:00:00Z", "product": "coffee", "price": 300, "qty": 1}
{"ts": "2025-09-17T12:00:00Z", "product": "tea", "price": 200, "qty": 3}
  • 1行目 → coffee = 300 × 2 = 600
  • 2行目 → sandwich = 600 × 1 = 600
  • 3行目 → coffee = 300 × 1 = 300
  • 4行目 → tea = 200 × 3 = 600
出力(processed/ の summary.json)
{
  "date": "2025-09-17",
  "revenue_by_product": {
    "coffee": 900,
    "sandwich": 600,
    "tea": 600
  },
  "generated_at": "2025-09-17T07:00:49Z"
}

JSONL(JSON Lines)とは
1行に1レコード(1つの JSON オブジェクト)を書く形式
複数行で複数レコードを表現する
改行で区切るだけなのでシンプルでストリーム処理に向いている

Apache Airflowとは?

Amazon Managed Workflows for Apache Airflow (MWAA)とは?

公式マニュアル

ユーザガイド

サンプル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?