aws-mwaa-local-runnerとは
MWAA実行環境のDockerコンテナイメージをローカルでビルドします。これにより、MWAAにデプロイする前に、ローカルのAirflow環境を実行して、DAG、カスタム プラグイン、および依存関係を開発およびテストできます。
ローカルMWAAセットアップ手順
aws-mwaa-local-runnerのREADME
本Qiitaは基本的にこのREADMEと同じ内容です
手元のパソコンにDockerをインストールしておく
手順割愛
任意の場所にgit clone
git clone https://github.com/aws/aws-mwaa-local-runner.git
cd aws-mwaa-local-runner
Docker Image Build
./mwaa-local-env build-image
ローカルAirflow起動
./mwaa-local-env start
Airflow UIログイン
Username: admin
Password: test
http://localhost:8080/
AirflowのConnection作成
Airflow UIにログインし、Connectionを作成する。これはAirflow Operatorから接続する際の接続情報の定義。このあとのDAGで使います。
aws_defaultがConn idという接続情報のID。これをこのあと使う。
- Conn id:aws_default
- Conn Type:Amazon Web Services
- Login:アクセスキーID
- Password:シークレットアクセスキーID
- Extra:リージョン↓
{
"region_name": "ap-northeast-1"
}
ローカルMWAA操作手順
DAG追加は
Cloneしたフォルダのdagsフォルダに入れる
$ pwd
/xxx/aws-mwaa-local-runner
aws-mwaa-local-runner $ ls
CODE_OF_CONDUCT.md VERSION mwaa-local-env
CONTRIBUTING.md dags plugins
LICENSE db-data
README.md docker
Requirements.txtの追加は
Cloneしたフォルダの中のdagsフォルダの直下
dags/requirements.txt
mwaa-local-envを使うと、Airflowを実行せずにrequirements.txtを試すことが出来る。
./mwaa-local-env test-requirements
aws-mwaa-local-runner $ pwd
/xxx/mwaa/aws-mwaa-local-runner
aws-mwaa-local-runner $ ls
CODE_OF_CONDUCT.md README.md db-data plugins
CONTRIBUTING.md VERSION docker test-requirements
LICENSE dags mwaa-local-env
aws-mwaa-local-runner $ cat test-requirements
boto3==1.17.54
aws-mwaa-local-runner $ ./mwaa-local-env test-requirements
Container amazon/mwaa-local:2.0 exists. Skipping build
Installing requirements.txt
Collecting boto3==1.17.54
Downloading boto3-1.17.54-py2.py3-none-any.whl (131 kB)
|████████████████████████████████| 131 kB 3.4 MB/s
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (0.10.0)
Requirement already satisfied: s3transfer<0.5.0,>=0.4.0 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (0.4.2)
Requirement already satisfied: botocore<1.21.0,>=1.20.54 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.20.84)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /usr/local/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in /usr/local/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.25.11)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.15.0)
Installing collected packages: boto3
Successfully installed boto3-1.17.54
Custom plugins
aws-mwaa-local-runner/pluginsフォルダに入れる
サンプルDAGファイルで試してみる
元になるDagファイル
これをaws-mwaa-local-runner用のDAGファイルに修正する。AWSサービスのMWAAではMWAA自体にIAMロールがアタッチされるが、aws-mwaa-local-runnerをローカル実行した場合はIAMロールはアタッチ出来ない。そのため、AWSサービスにアクセスするのに、主にアクセスキー周りを追加や修正する。
元になるDAGファイル
import boto3
import airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.utils.task_group import TaskGroup
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
"provide_context": False,
"email_on_failure": True,
"email_on_retry": True,
"retries":1,
"retry_delay":120
}
s3_bucket_name = "test-airflow"
athena_results = f"s3://{s3_bucket_name}/results/"
output_path = "out0"
input_path = "in0"
athena_drop_output_table_query="DROP TABLE IF EXISTS default.handson_output_parquet"
athena_create_input_table_query=f"CREATE EXTERNAL TABLE IF NOT EXISTS default.handson_input_csv( deviceid string, uuid bigint, appid bigint, country string, year bigint, month bigint, day bigint, hour bigint)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION 's3://{s3_bucket_name}/{input_path}/' TBLPROPERTIES ( 'classification'='csv', 'delimiter'=',', 'skip.header.line.count'='1' )"
athena_ctas_new_table_query=f"CREATE TABLE \"default\".\"handson_output_parquet\"WITH (format = 'PARQUET',external_location='s3://{s3_bucket_name}/{output_path}/',parquet_compression = 'SNAPPY')AS SELECT *FROM \"default\".\"handson_input_csv\"WHERE deviceid = 'iphone' OR deviceid = 'android'"
def s3_bucket_cleaning_job():
s3 = boto3.resource('s3')
bucket = s3.Bucket(s3_bucket_name)
bucket.objects.filter(Prefix=output_path).delete()
with DAG(
dag_id="etl_athena_job",
description="etl athena DAG",
default_args=args,
schedule_interval="*/60 * * * *",
catchup=False,
tags=['handson']
) as dag:
with TaskGroup("preprocess") as preprocess:
t1 = AWSAthenaOperator(task_id="athena_drop_output_table",query=athena_drop_output_table_query, database="default", output_location=athena_results)
t2 = PythonOperator(task_id="s3_bucket_cleaning", python_callable=s3_bucket_cleaning_job)
t3 = AWSAthenaOperator(task_id="athena_create_input_table",query=athena_create_input_table_query, database="default", output_location=athena_results)
t4 = AWSAthenaOperator(task_id="athena_ctas_new_table",query=athena_ctas_new_table_query, database="default", output_location=athena_results)
preprocess >> t3 >> t4
DAGファイル修正
PythonOperatorで実行するPythonコードがS3(AWSリソース)にアクセスするため、Sessionをインポートし、アクセスキーIDとシークレットアクセスキーIDを設定する。
修正したらdagsフォルダに入れる。結構すぐ反映される
import boto3
from boto3.session import Session
accesskey_id='XXXXXXXXXXXXXXXXXXXX'
secret_accesskey_id='yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
region='ap-northeast-1'
session = Session(aws_access_key_id=accesskey_id, aws_secret_access_key=secret_accesskey_id, region_name=region)
AthenaOperatorなどPythonOperator以外のオペレータに、"aws_conn_id"を設定する
aws_conn_id="aws_default"
修正したDAGファイル
import boto3
from boto3.session import Session
import airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.utils.task_group import TaskGroup
accesskey_id='XXXXXXXXXXXXXXXXXXXX'
secret_accesskey_id='yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
region='ap-northeast-1'
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
"provide_context": False,
"email_on_failure": True,
"email_on_retry": True,
"retries":1,
"retry_delay":120
}
s3_bucket_name = "tmp-airflow"
athena_results = f"s3://{s3_bucket_name}/results/"
output_path = "out0"
input_path = "in0"
athena_drop_output_table_query="DROP TABLE IF EXISTS default.handson_output_parquet"
athena_create_input_table_query=f"CREATE EXTERNAL TABLE IF NOT EXISTS default.handson_input_csv( deviceid string, uuid bigint, appid bigint, country string, year bigint, month bigint, day bigint, hour bigint)ROW FORMAT DELIMITED FIELDS TERMINATED BY ','STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION 's3://{s3_bucket_name}/{input_path}/' TBLPROPERTIES ( 'classification'='csv', 'delimiter'=',', 'skip.header.line.count'='1' )"
athena_ctas_new_table_query=f"CREATE TABLE \"default\".\"handson_output_parquet\"WITH (format = 'PARQUET',external_location='s3://{s3_bucket_name}/{output_path}/',parquet_compression = 'SNAPPY')AS SELECT *FROM \"default\".\"handson_input_csv\"WHERE deviceid = 'iphone' OR deviceid = 'android'"
def s3_bucket_cleaning_job():
session = Session(aws_access_key_id=accesskey_id, aws_secret_access_key=secret_accesskey_id, region_name=region)
s3 = session.resource('s3')
bucket = s3.Bucket(s3_bucket_name)
bucket.objects.filter(Prefix=output_path).delete()
with DAG(
dag_id="etl_athena_job",
description="etl athena DAG",
default_args=args,
schedule_interval="*/60 * * * *",
catchup=False,
tags=['handson']
) as dag:
with TaskGroup("preprocess") as preprocess:
t1 = AWSAthenaOperator(task_id="athena_drop_output_table",query=athena_drop_output_table_query, aws_conn_id="aws_default",database="default", output_location=athena_results)
t2 = PythonOperator(task_id="s3_bucket_cleaning", python_callable=s3_bucket_cleaning_job)
t3 = AWSAthenaOperator(task_id="athena_create_input_table",query=athena_create_input_table_query, aws_conn_id="aws_default", database="default", output_location=athena_results)
t4 = AWSAthenaOperator(task_id="athena_ctas_new_table",query=athena_ctas_new_table_query, aws_conn_id="aws_default", database="default", output_location=athena_results)
preprocess >> t3 >> t4
実行結果
ローカルのMWAA Airflow環境からS3やAthenaへのアクセスがあるDAGの実行が正常完了している
カスタムプラグインなどの依存関係テスト
時間が出来たらやります。
メリット
ローカルテストでトライアンドエラー!!!