LoginSignup
0
0

MWAAでRedshiftにIAM認証で接続してクエリを実行する(同一アカウント)

Last updated at Posted at 2024-02-23

やりたいこと

Amazon Managed Workflows for Apache Airflow (MWAA) で同一アカウントにあるRedshiftクラスターに対してクエリを実行します。
また、Redshift内のDBユーザーに対しての認証は、ID/PASS認証ではなくIAM認証を使用します。つまり、MWAAの実行ロールに付与されているアクセス許可に基づいて一時的なDB認証を作成し、接続します。この方式だとDBユーザー名のみでパスワードは不要となります。

前置き

利用するOperator

今回は、Operatorとしてapache-airflow-providers-common-sqlSQLExecuteQueryOperatorを利用します。

以前まではRedshiftに対してクエリを実行する際はRedshiftSQLOperatorを用いていましたが、apache-airflow-providers-amazonの v8.0.0 でSQLExecuteQueryOperatorを利用するように修正されました。

Removed deprecated RedshiftSQLOperator in favor of the generic SQLExecuteQueryOperator. The parameter that was passed as redshift_conn_id needs to be changed to conn_id, and the behavior should stay the same.

apache-airflow-providers-XXXって何?

Airflowは外部サービスとの連携のためにそれぞれ独自のプロパイダーパッケージを開発して提供しています。その中のAWSとの連携のために開発されているパッケージがapache-airflow-providers-amazonであり、DBサービスに対して共通的にSQLを発行するためのパッケージがapache-airflow-providers-common-sqlです。

このプロパイダーパッケージは独立して開発されているため、それぞれのバージョンを持っています。自身でAirflowをホストした場合はそれぞれのプロパイダーパッケージを個別にバージョンアップできますが、MWAAではバージョンが固定されています。
MWAAの公式ドキュメントから一部抜粋すると、Airflowのバージョンとそれに対応するapache-airflow-providers-amazonapache-airflow-providers-common-sqlは以下の通りです。

  • Airflow v2.7.2
    • AWS Connection : apache-airflow-providers-amazon[aiobotocore]==8.7.1
    • Common SQL : apache-airflow-providers-common-sql==1.7.2
  • Airflow v2.6.3
    • AWS Connection : apache-airflow-providers-amazon[aiobotocore]==8.2.0
    • Common SQL : apache-airflow-providers-common-sql==1.5.2
  • Airflow v2.5.1
    • AWS Connection : apache-airflow-providers-amazon[aiobotocore]==7.1.0
    • Common SQL : apache-airflow-providers-common-sql==1.3.3

これを見ると、Redshiftに対してクエリを実行するOperatorがSQLExecuteQueryOperatorになったのは、apache-airflow-providers-amazonがv8.0.0の時なので、MWAAのAirflowで言うとv2.6.3からとなりますね。

SQLExecuteQueryOperatorの使い方

上記のドキュメントに記載されたParametersを指定します。
特にRedshiftに接続するためには、conn_idにRedshiftのConnectionを指定する必要があります。その他、sqlに実行するSQLを指定したり、databaseでConnectionに記載しているデータベース名をオーバーライトしたりできます。
今回の記事で、実際にこのOperatorを使ってみます。

前提条件

  • RedshiftのRA3クラスターをバージニア北部リージョン(us-east-1)に作成済み
    • クラスター名:redshift-cluster-demo
    • DB名:qs_test_db
    • ユーザー名:usera
    • パブリックアクセスを有効にしています
  • MWAAのv2.7.2の環境を同じくバージニア北部リージョン(us-east-1)に作成済み

やってみる

実行ロールに権限付与

以下の内容のポリシーを作成して、MWAAの実行ロールに追加します(アカウントIDは999999999999に変更してます)。
今回はGetClusterCredentials-useraという名前のポリシーを作成しました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "redshift:GetClusterCredentials",
            "Resource": [
                "arn:aws:redshift:us-east-1:999999999999:dbuser:redshift-cluster-demo/usera",
                "arn:aws:redshift:us-east-1:999999999999:dbname:redshift-cluster-demo/qs_test_db"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "redshift:DescribeClusters",
            "Resource": "*"
        }
    ]
}

以下のように、実行ロールに追加します。

image.png

Redshift Connection作成

Airflow UI上で、Redshiftの接続に使うConnectionを作成します。
上部のAdmin→Connectionsをクリックすると一覧が表示されますが、今回はその中のredshift_defaultを以下のように修正していきます。

image.png

Extraの中の"iam"をtrueに指定することで、ID/PASS認証ではなくIAM認証を利用します。

{
  "iam": true,
  "cluster_identifier": "redshift-cluster-demo",
  "db_user": "usera"
}

DAGコードをS3に格納

以下の.pyファイルをS3のdagsフォルダにアップロードします。
SQLExecuteQueryOperatorを2回使ってDAGを作成しています。2つ目のSQLで先ほどのConnectionで指定したRedshiftのクラスター、DBに同じくConnectionで指定したDBユーザーでIAM認証してSelect文を実行しています。

このままだとSELECT文を実行するだけで終わってしまうので、実際はSELECTしたデータを他のテーブルにINSERTしたり、新たにテーブルをCREATEしたりするケースが多いと思います。

from airflow import DAG
from airflow.utils.dates import days_ago

from datetime import datetime

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False
}

dag = DAG(
    'Redshift_Query_IAM',
    description='testDAG',
    schedule_interval=None,
    default_args=default_args,
    start_date=datetime(2023, 10, 11),
    catchup=False
)

execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1;",
split_statements=True,
return_last=False,
dag=dag,
conn_id="redshift_default",
)

execute_query_2 = SQLExecuteQueryOperator(
task_id="execute_query_2",
sql=f"SELECT * FROM schema_a.table_a",
split_statements=True,
return_last=False,
dag=dag,
conn_id ="redshift_default",
)

execute_query >> execute_query_2

上記のDAGは、以下のページを参考にしました。

Redshiftのセキュリティグループ更新

今回は、MWAAのVPC内に作成されている2つのNAT GatewayのグローバルIPを、Redshift側のセキュリティグループで許可します。これでネットワーク的にも接続されますね。

image.png

DAG実行

作成されたDAGを実行したところ、以下のように2つのTaskとも成功となりました(色々と実験していたので、何回か失敗の赤い棒グラフが表示されています)。

image.png

参考に、2つ目のSELECT文実行したTaskのログは以下のような感じでした。実行したSQL文はログにも出るんですね。

ip-10-192-20-231.ec2.internal
*** Reading remote log from Cloudwatch log_group: airflow-MyAirflowEnvironment-public-272-Task log_stream: dag_id=Redshift_Query_IAM/run_id=manual__2024-02-23T14_27_32.016961+00_00/task_id=execute_query_2/attempt=1.log.
[2024-02-23 14:27:39,775] Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: Redshift_Query_IAM.execute_query_2 manual__2024-02-23T14:27:32.016961+00:00 [queued]>
[2024-02-23 14:27:39,791] Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: Redshift_Query_IAM.execute_query_2 manual__2024-02-23T14:27:32.016961+00:00 [queued]>
[2024-02-23 14:27:39,792] Starting attempt 1 of 1
[2024-02-23 14:27:39,820] Executing <Task(SQLExecuteQueryOperator): execute_query_2> on 2024-02-23 14:27:32.016961+00:00
[2024-02-23 14:27:39,823] Started process 15173 to run task
[2024-02-23 14:27:39,825] Running: ['airflow', 'tasks', 'run', 'Redshift_Query_IAM', 'execute_query_2', 'manual__2024-02-23T14:27:32.016961+00:00', '--job-id', '25', '--raw', '--subdir', 'DAGS_FOLDER/redshift_query_dag_IAM.py', '--cfg-path', '/tmp/tmpdn9jl8e6']
[2024-02-23 14:27:39,826] Job 25: Subtask execute_query_2
[2024-02-23 14:27:40,120] Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='Redshift_Query_IAM' AIRFLOW_CTX_TASK_ID='execute_query_2' AIRFLOW_CTX_EXECUTION_DATE='2024-02-23T14:27:32.016961+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-02-23T14:27:32.016961+00:00'
[2024-02-23 14:27:40,120] Executing: SELECT * FROM schema_a.table_a
[2024-02-23 14:27:40,645] Running statement: SELECT * FROM schema_a.table_a, parameters: None
[2024-02-23 14:27:40,751] Marking task as SUCCESS. dag_id=Redshift_Query_IAM, task_id=execute_query_2, execution_date=20240223T142732, start_date=20240223T142739, end_date=20240223T142740
[2024-02-23 14:27:40,920] Task exited with return code 0
[2024-02-23 14:27:40,945] 0 downstream tasks scheduled from follow-on schedule check

Redshift側の確認

念のため、Redshiftクラスターの「クエリのモニタリング」タブから、クエリ履歴を見てみます。

image.png

意図したユーザーで実行されていますね。

注意

どこかのドキュメントに、RedshiftのConnectionのExtraの欄にprofile:defaultがあったのでそれをそのまま実行していたら、以下のようなエラーが出てしまいました。profileを指定すると、MWAAの実行ロールではなく指定したprofileが利用されるようです(当たり前と言えば当たり前なのですが…)。

redshift_connector.error.InterfaceError: The config profile (default) could not be found

おわりに

今回は、MWAAからRedshiftにIAM認証を使ってクエリを実行してみました。AWS内で完結しているのなら、認証は全てIAMにしたいというユースケースは結構あるのではないでしょうか。
実は別アカウントでも色々試してみて成功したので、次の記事に書ければと思ってます。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0