やりたいこと
Amazon Managed Workflows for Apache Airflow (MWAA) で同一アカウントにあるRedshiftクラスターに対してクエリを実行します。
また、Redshift内のDBユーザーに対しての認証は、ID/PASS認証ではなくIAM認証を使用します。つまり、MWAAの実行ロールに付与されているアクセス許可に基づいて一時的なDB認証を作成し、接続します。この方式だとDBユーザー名のみでパスワードは不要となります。
前置き
利用するOperator
今回は、Operatorとしてapache-airflow-providers-common-sqlのSQLExecuteQueryOperator
を利用します。
以前まではRedshiftに対してクエリを実行する際はRedshiftSQLOperator
を用いていましたが、apache-airflow-providers-amazonの v8.0.0 でSQLExecuteQueryOperator
を利用するように修正されました。
Removed deprecated RedshiftSQLOperator in favor of the generic SQLExecuteQueryOperator. The parameter that was passed as redshift_conn_id needs to be changed to conn_id, and the behavior should stay the same.
apache-airflow-providers-XXXって何?
Airflowは外部サービスとの連携のためにそれぞれ独自のプロパイダーパッケージを開発して提供しています。その中のAWSとの連携のために開発されているパッケージがapache-airflow-providers-amazonであり、DBサービスに対して共通的にSQLを発行するためのパッケージがapache-airflow-providers-common-sqlです。
このプロパイダーパッケージは独立して開発されているため、それぞれのバージョンを持っています。自身でAirflowをホストした場合はそれぞれのプロパイダーパッケージを個別にバージョンアップできますが、MWAAではバージョンが固定されています。
MWAAの公式ドキュメントから一部抜粋すると、Airflowのバージョンとそれに対応するapache-airflow-providers-amazon、apache-airflow-providers-common-sqlは以下の通りです。
- Airflow v2.7.2
- AWS Connection : apache-airflow-providers-amazon[aiobotocore]==8.7.1
- Common SQL : apache-airflow-providers-common-sql==1.7.2
- Airflow v2.6.3
- AWS Connection : apache-airflow-providers-amazon[aiobotocore]==8.2.0
- Common SQL : apache-airflow-providers-common-sql==1.5.2
- Airflow v2.5.1
- AWS Connection : apache-airflow-providers-amazon[aiobotocore]==7.1.0
- Common SQL : apache-airflow-providers-common-sql==1.3.3
これを見ると、Redshiftに対してクエリを実行するOperatorがSQLExecuteQueryOperator
になったのは、apache-airflow-providers-amazonがv8.0.0の時なので、MWAAのAirflowで言うとv2.6.3からとなりますね。
SQLExecuteQueryOperatorの使い方
上記のドキュメントに記載されたParametersを指定します。
特にRedshiftに接続するためには、conn_id
にRedshiftのConnectionを指定する必要があります。その他、sql
に実行するSQLを指定したり、database
でConnectionに記載しているデータベース名をオーバーライトしたりできます。
今回の記事で、実際にこのOperatorを使ってみます。
前提条件
- RedshiftのRA3クラスターをバージニア北部リージョン(us-east-1)に作成済み
- クラスター名:redshift-cluster-demo
- DB名:qs_test_db
- ユーザー名:usera
- パブリックアクセスを有効にしています
- MWAAのv2.7.2の環境を同じくバージニア北部リージョン(us-east-1)に作成済み
やってみる
実行ロールに権限付与
以下の内容のポリシーを作成して、MWAAの実行ロールに追加します(アカウントIDは999999999999に変更してます)。
今回はGetClusterCredentials-usera
という名前のポリシーを作成しました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "redshift:GetClusterCredentials",
"Resource": [
"arn:aws:redshift:us-east-1:999999999999:dbuser:redshift-cluster-demo/usera",
"arn:aws:redshift:us-east-1:999999999999:dbname:redshift-cluster-demo/qs_test_db"
]
},
{
"Effect": "Allow",
"Action": "redshift:DescribeClusters",
"Resource": "*"
}
]
}
以下のように、実行ロールに追加します。
Redshift Connection作成
Airflow UI上で、Redshiftの接続に使うConnectionを作成します。
上部のAdmin→Connectionsをクリックすると一覧が表示されますが、今回はその中のredshift_default
を以下のように修正していきます。
Extraの中の"iam"をtrueに指定することで、ID/PASS認証ではなくIAM認証を利用します。
{
"iam": true,
"cluster_identifier": "redshift-cluster-demo",
"db_user": "usera"
}
DAGコードをS3に格納
以下の.pyファイルをS3のdagsフォルダにアップロードします。
SQLExecuteQueryOperator
を2回使ってDAGを作成しています。2つ目のSQLで先ほどのConnectionで指定したRedshiftのクラスター、DBに同じくConnectionで指定したDBユーザーでIAM認証してSelect文を実行しています。
このままだとSELECT文を実行するだけで終わってしまうので、実際はSELECTしたデータを他のテーブルにINSERTしたり、新たにテーブルをCREATEしたりするケースが多いと思います。
from airflow import DAG
from airflow.utils.dates import days_ago
from datetime import datetime
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'depends_on_past': False
}
dag = DAG(
'Redshift_Query_IAM',
description='testDAG',
schedule_interval=None,
default_args=default_args,
start_date=datetime(2023, 10, 11),
catchup=False
)
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1;",
split_statements=True,
return_last=False,
dag=dag,
conn_id="redshift_default",
)
execute_query_2 = SQLExecuteQueryOperator(
task_id="execute_query_2",
sql=f"SELECT * FROM schema_a.table_a",
split_statements=True,
return_last=False,
dag=dag,
conn_id ="redshift_default",
)
execute_query >> execute_query_2
上記のDAGは、以下のページを参考にしました。
Redshiftのセキュリティグループ更新
今回は、MWAAのVPC内に作成されている2つのNAT GatewayのグローバルIPを、Redshift側のセキュリティグループで許可します。これでネットワーク的にも接続されますね。
DAG実行
作成されたDAGを実行したところ、以下のように2つのTaskとも成功となりました(色々と実験していたので、何回か失敗の赤い棒グラフが表示されています)。
参考に、2つ目のSELECT文実行したTaskのログは以下のような感じでした。実行したSQL文はログにも出るんですね。
ip-10-192-20-231.ec2.internal
*** Reading remote log from Cloudwatch log_group: airflow-MyAirflowEnvironment-public-272-Task log_stream: dag_id=Redshift_Query_IAM/run_id=manual__2024-02-23T14_27_32.016961+00_00/task_id=execute_query_2/attempt=1.log.
[2024-02-23 14:27:39,775] Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: Redshift_Query_IAM.execute_query_2 manual__2024-02-23T14:27:32.016961+00:00 [queued]>
[2024-02-23 14:27:39,791] Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: Redshift_Query_IAM.execute_query_2 manual__2024-02-23T14:27:32.016961+00:00 [queued]>
[2024-02-23 14:27:39,792] Starting attempt 1 of 1
[2024-02-23 14:27:39,820] Executing <Task(SQLExecuteQueryOperator): execute_query_2> on 2024-02-23 14:27:32.016961+00:00
[2024-02-23 14:27:39,823] Started process 15173 to run task
[2024-02-23 14:27:39,825] Running: ['airflow', 'tasks', 'run', 'Redshift_Query_IAM', 'execute_query_2', 'manual__2024-02-23T14:27:32.016961+00:00', '--job-id', '25', '--raw', '--subdir', 'DAGS_FOLDER/redshift_query_dag_IAM.py', '--cfg-path', '/tmp/tmpdn9jl8e6']
[2024-02-23 14:27:39,826] Job 25: Subtask execute_query_2
[2024-02-23 14:27:40,120] Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='Redshift_Query_IAM' AIRFLOW_CTX_TASK_ID='execute_query_2' AIRFLOW_CTX_EXECUTION_DATE='2024-02-23T14:27:32.016961+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-23T14:27:32.016961+00:00'
[2024-02-23 14:27:40,120] Executing: SELECT * FROM schema_a.table_a
[2024-02-23 14:27:40,645] Running statement: SELECT * FROM schema_a.table_a, parameters: None
[2024-02-23 14:27:40,751] Marking task as SUCCESS. dag_id=Redshift_Query_IAM, task_id=execute_query_2, execution_date=20240223T142732, start_date=20240223T142739, end_date=20240223T142740
[2024-02-23 14:27:40,920] Task exited with return code 0
[2024-02-23 14:27:40,945] 0 downstream tasks scheduled from follow-on schedule check
Redshift側の確認
念のため、Redshiftクラスターの「クエリのモニタリング」タブから、クエリ履歴を見てみます。
意図したユーザーで実行されていますね。
注意
どこかのドキュメントに、RedshiftのConnectionのExtraの欄にprofile:default
があったのでそれをそのまま実行していたら、以下のようなエラーが出てしまいました。profileを指定すると、MWAAの実行ロールではなく指定したprofileが利用されるようです(当たり前と言えば当たり前なのですが…)。
redshift_connector.error.InterfaceError: The config profile (default) could not be found
おわりに
今回は、MWAAからRedshiftにIAM認証を使ってクエリを実行してみました。AWS内で完結しているのなら、認証は全てIAMにしたいというユースケースは結構あるのではないでしょうか。
実は別アカウントでも色々試してみて成功したので、次の記事に書ければと思ってます。