Testing MWAA Locally with aws-mwaa-local-runner

Posted at 2021-06-01

What is aws-mwaa-local-runner?

It builds a Docker container image of the MWAA execution environment on your local machine. This lets you run a local Airflow environment to develop and test DAGs, custom plugins, and dependencies before deploying them to MWAA.

Local MWAA setup steps

aws-mwaa-local-runner README
This article essentially follows the same steps as the README.

Install Docker on your local machine

Installation steps are omitted here.
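If Docker is already installed, a quick check along these lines should work (the local runner also uses Docker Compose under the hood, so it is worth verifying both):

docker --version
docker-compose --version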

Clone the repository to any directory

git clone https://github.com/aws/aws-mwaa-local-runner.git
cd aws-mwaa-local-runner

Docker Image Build

./mwaa-local-env build-image

Start Airflow locally

./mwaa-local-env start

Log in to the Airflow UI

Username: admin
Password: test
http://localhost:8080/


Create an Airflow Connection

Log in to the Airflow UI and create a Connection. This defines the connection information that Airflow Operators use to connect to AWS; the DAG later in this article uses it.


aws_default is the Conn Id, i.e. the identifier of this connection. It is used later in the DAG.


  • Conn Id: aws_default
  • Conn Type: Amazon Web Services
  • Login: access key ID
  • Password: secret access key
  • Extra: region, as JSON below
{
  "region_name": "ap-northeast-1"
}
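As an alternative to clicking through the UI, the same connection can also be created with the Airflow CLI inside the local runner container. This is only a sketch: the container name and the placeholder keys below are assumptions, and if a default aws_default connection already exists you would need to remove it first with "airflow connections delete aws_default".

# Assumed container name; check the actual name with "docker ps"
docker exec -it aws-mwaa-local-runner_local-runner_1 \
  airflow connections add aws_default \
    --conn-type aws \
    --conn-login AKIAXXXXXXXXXXXXXXXX \
    --conn-password yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy \
    --conn-extra '{"region_name": "ap-northeast-1"}'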


Local MWAA usage steps

Adding DAGs

Put the DAG file in the dags folder of the cloned repository (a minimal example DAG follows the directory listing below).

pwd,ls
$ pwd 
/xxx/aws-mwaa-local-runner
aws-mwaa-local-runner $ ls
CODE_OF_CONDUCT.md	VERSION			mwaa-local-env
CONTRIBUTING.md		dags			plugins
LICENSE			db-data
README.md		docker
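
As a quick check that new files in dags/ are picked up, a minimal sketch DAG like the one below (the dag_id and file name are arbitrary) should appear in the UI shortly after being saved as, for example, dags/hello_local_mwaa.py:

import airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator


def say_hello():
    # Trivial task body just to confirm the scheduler picks up the file
    print("hello from the local MWAA runner")


with DAG(
    dag_id="hello_local_mwaa",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval=None,
    catchup=False,
) as dag:
    PythonOperator(task_id="say_hello", python_callable=say_hello)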

Adding requirements.txt

Place it directly under the dags folder of the cloned repository:
dags/requirements.txt
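
For example, to make boto3 1.17.54 available (the same package installed in the test output below), dags/requirements.txt would contain a single line:

boto3==1.17.54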

With mwaa-local-env, you can test requirements.txt without running Airflow.

./mwaa-local-env test-requirements

aws-mwaa-local-runner $ pwd
/xxx/mwaa/aws-mwaa-local-runner
aws-mwaa-local-runner $ ls
CODE_OF_CONDUCT.md	README.md		db-data			plugins
CONTRIBUTING.md		VERSION			docker			test-requirements
LICENSE			dags			mwaa-local-env
aws-mwaa-local-runner $ cat test-requirements 
boto3==1.17.54
aws-mwaa-local-runner $ ./mwaa-local-env test-requirements
Container amazon/mwaa-local:2.0 exists. Skipping build
Installing requirements.txt
Collecting boto3==1.17.54
  Downloading boto3-1.17.54-py2.py3-none-any.whl (131 kB)
     |████████████████████████████████| 131 kB 3.4 MB/s 
Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (0.10.0)
Requirement already satisfied: s3transfer<0.5.0,>=0.4.0 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (0.4.2)
Requirement already satisfied: botocore<1.21.0,>=1.20.54 in /usr/local/lib/python3.7/site-packages (from boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.20.84)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /usr/local/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (2.8.1)
Requirement already satisfied: urllib3<1.27,>=1.25.4 in /usr/local/lib/python3.7/site-packages (from botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.25.11)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.7/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.21.0,>=1.20.54->boto3==1.17.54->-r /usr/local/airflow/dags/requirements.txt (line 1)) (1.15.0)
Installing collected packages: boto3
Successfully installed boto3-1.17.54

Custom plugins

Put them in the aws-mwaa-local-runner/plugins folder.
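
As an illustration only (not part of the hands-on DAG), a minimal custom plugin saved as plugins/my_custom_plugin.py might look like the following sketch; it registers one template macro:

from airflow.plugins_manager import AirflowPlugin


def days_to_seconds(days):
    # Illustrative macro: convert days to seconds
    return days * 24 * 60 * 60


class MyCustomPlugin(AirflowPlugin):
    # The plugin name becomes the macro namespace in templates,
    # e.g. {{ macros.my_custom_plugin.days_to_seconds(2) }}
    name = "my_custom_plugin"
    macros = [days_to_seconds]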

Trying it out with a sample DAG file

The original DAG file

We modify this into a DAG file for aws-mwaa-local-runner. With the MWAA managed service, an IAM role is attached to the MWAA environment itself, but when aws-mwaa-local-runner runs locally, no IAM role can be attached. Therefore, to access AWS services, the main changes are adding and adjusting access-key handling.

Original DAG file

OriginalDag
import boto3
import airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.utils.task_group import TaskGroup

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
    "provide_context": False,
    "email_on_failure": True,
    "email_on_retry": True,
    "retries":1,
    "retry_delay":120
    }

s3_bucket_name = "test-airflow"

athena_results = f"s3://{s3_bucket_name}/results/"
output_path = "out0"
input_path = "in0"

athena_drop_output_table_query="DROP TABLE IF EXISTS default.handson_output_parquet"

athena_create_input_table_query=f"CREATE EXTERNAL TABLE IF NOT EXISTS default.handson_input_csv(  deviceid string,  uuid bigint,  appid bigint,  country string,  year bigint,  month bigint,  day bigint,  hour bigint)ROW FORMAT DELIMITED  FIELDS TERMINATED BY ','STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION  's3://{s3_bucket_name}/{input_path}/' TBLPROPERTIES (  'classification'='csv',  'delimiter'=',',  'skip.header.line.count'='1' )"

athena_ctas_new_table_query=f"CREATE TABLE \"default\".\"handson_output_parquet\"WITH (format = 'PARQUET',external_location='s3://{s3_bucket_name}/{output_path}/',parquet_compression = 'SNAPPY')AS SELECT *FROM \"default\".\"handson_input_csv\"WHERE deviceid = 'iphone' OR deviceid = 'android'"

def s3_bucket_cleaning_job():
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(s3_bucket_name)
    bucket.objects.filter(Prefix=output_path).delete()

with DAG(
    dag_id="etl_athena_job",
    description="etl athena DAG",
    default_args=args,
    schedule_interval="*/60 * * * *",
    catchup=False,
    tags=['handson']
) as dag:
    with TaskGroup("preprocess") as preprocess:
        t1 = AWSAthenaOperator(task_id="athena_drop_output_table",query=athena_drop_output_table_query, database="default", output_location=athena_results)
        t2 = PythonOperator(task_id="s3_bucket_cleaning", python_callable=s3_bucket_cleaning_job)
    t3 = AWSAthenaOperator(task_id="athena_create_input_table",query=athena_create_input_table_query, database="default", output_location=athena_results)
    t4 = AWSAthenaOperator(task_id="athena_ctas_new_table",query=athena_ctas_new_table_query, database="default", output_location=athena_results)

    preprocess >> t3 >> t4

Modifying the DAG file

Since the Python code executed by the PythonOperator accesses S3 (an AWS resource), import Session and set the access key ID and secret access key.
After making the changes, put the file in the dags folder; it is picked up fairly quickly.

import boto3
from boto3.session import Session
accesskey_id='XXXXXXXXXXXXXXXXXXXX'
secret_accesskey_id='yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
region='ap-northeast-1'
session = Session(aws_access_key_id=accesskey_id, aws_secret_access_key=secret_accesskey_id, region_name=region)

For operators other than PythonOperator, such as AWSAthenaOperator, set "aws_conn_id":

aws_conn_id="aws_default"

Modified DAG file

ModifyDag
import boto3
from boto3.session import Session
import airflow
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
from airflow.utils.task_group import TaskGroup

accesskey_id='XXXXXXXXXXXXXXXXXXXX'
secret_accesskey_id='yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy'
region='ap-northeast-1'

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(1),
    "provide_context": False,
    "email_on_failure": True,
    "email_on_retry": True,
    "retries":1,
    "retry_delay":120
    }

s3_bucket_name = "tmp-airflow"

athena_results = f"s3://{s3_bucket_name}/results/"
output_path = "out0"
input_path = "in0"

athena_drop_output_table_query="DROP TABLE IF EXISTS default.handson_output_parquet"

athena_create_input_table_query=f"CREATE EXTERNAL TABLE IF NOT EXISTS default.handson_input_csv(  deviceid string,  uuid bigint,  appid bigint,  country string,  year bigint,  month bigint,  day bigint,  hour bigint)ROW FORMAT DELIMITED  FIELDS TERMINATED BY ','STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION  's3://{s3_bucket_name}/{input_path}/' TBLPROPERTIES (  'classification'='csv',  'delimiter'=',',  'skip.header.line.count'='1' )"

athena_ctas_new_table_query=f"CREATE TABLE \"default\".\"handson_output_parquet\"WITH (format = 'PARQUET',external_location='s3://{s3_bucket_name}/{output_path}/',parquet_compression = 'SNAPPY')AS SELECT *FROM \"default\".\"handson_input_csv\"WHERE deviceid = 'iphone' OR deviceid = 'android'"

def s3_bucket_cleaning_job():
    session = Session(aws_access_key_id=accesskey_id, aws_secret_access_key=secret_accesskey_id, region_name=region)
    s3 = session.resource('s3')
    bucket = s3.Bucket(s3_bucket_name)
    bucket.objects.filter(Prefix=output_path).delete()

with DAG(
    dag_id="etl_athena_job",
    description="etl athena DAG",
    default_args=args,
    schedule_interval="*/60 * * * *",
    catchup=False,
    tags=['handson']
) as dag:
    with TaskGroup("preprocess") as preprocess:
        t1 = AWSAthenaOperator(task_id="athena_drop_output_table",query=athena_drop_output_table_query, aws_conn_id="aws_default",database="default", output_location=athena_results)
        t2 = PythonOperator(task_id="s3_bucket_cleaning", python_callable=s3_bucket_cleaning_job)
    t3 = AWSAthenaOperator(task_id="athena_create_input_table",query=athena_create_input_table_query, aws_conn_id="aws_default", database="default", output_location=athena_results)
    t4 = AWSAthenaOperator(task_id="athena_ctas_new_table",query=athena_ctas_new_table_query, aws_conn_id="aws_default", database="default", output_location=athena_results)

    preprocess >> t3 >> t4

Execution results

The DAG, which accesses S3 and Athena from the local MWAA Airflow environment, completed successfully.


Testing dependencies such as custom plugins

I will cover this when I have time.

Benefits

Trial and error through local testing!!!
