4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Terraform から入門する Apache Airflow (Cloud Composer)【Python】

Last updated at Posted at 2020-09-02

はじめに

この記事では、BigQuery 上での分析を定期実行するワークフローを Apache Airflow を用いて作成します。

<モチベーション>
業務を開始する頃には既に分析を完了しているシステムがあれば時間的コストを削減できそうです。
マネージドサービスを用いれば互換性のないアップデートに対応するコストを削減することもできます。
メリットはありますが BigQuery のワークフローのために GKE 上で Cloud Composer を動かすことは避けたい思いもあります。
(Cloud Composer の料金の詳細)

Terraform

Terraform は HashiCorp により提供されているインフラストラクチャの構築管理を行うツールです。 インフラストラクチャ の設定を HCL(HashiCorp Configuration Language : DSL)を使用してコード化することによりマルチクラウド環境で同じコードを利用することができます。 このセクションでは、 Terraform を用いて Cloud Composer 環境を立ち上げる方法を示します。

<サービスアカウントの作成>
GCP のコンソールから Cloud Shell を開きサービスアカウントを作成します。
Terrafrom から Cloud Composer を立ち上げるために 「編集」と「Composer ワーカー」のロールをサービスアカウントに追加します。

$ gcloud version
$ export GCP_PROJECT_ID=${<project id>}
$ gcloud iam service-accounts create terraform
$ gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} --member serviceAccount:terraform@${GCP_PROJECT_ID}.iam.gserviceaccount.com --role roles/editor
$ gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} --member serviceAccount:terraform@${GCP_PROJECT_ID}.iam.gserviceaccount.com --role roles/composer.worker

<インスタンスの立ち上げ>
Compute Engine 上で VM インスタンス を立ち上げます。
その際に先ほど作成したサービスアカウントを選択します。
作成したインスタンスに SSH 接続しそのコマンドライン上で Terraform を用いた Cloud Composer の立ち上げを行います。

<Python 3 環境の構築>
(Python 3 環境で作業する習慣があるため Pythron 3 の環境を構築します。)
デフォルトでは Python 2 系 なので Python 3 系 の環境を構築します。
具体的には airflow というディレクトリを作りそのディレクトリに Python 3 環境を構築します。

$ sudo apt-get update
$ sudo apt-get install -y make build-essential libssl-dev zlib1g-dev \ 
libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev \ 
libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl git

$ git clone https://github.com/yyuu/pyenv.git ~/.pyenv

$ echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc
$ echo 'export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc
$ echo -e 'if command -v pyenv 1>/dev/null 2>&1; then\n  eval "$(pyenv init -)"\nfi' >> ~/bashrc
$ source ~/.bashrc

$ pyenv install 3.7.4

$ mkdir airflow
$ cd airflow
$ echo 'export PATH="$HOME/.pyenv/bin:$PATH"' >> ~/.bashrc
$ echo 'eval "$(pyenv init -)"' >> ~/.bashrc
$ source ~/.bashrc

$ pyenv local 3.7.4
$ python -V

<サービスアカウントの鍵の作成>
後の過程で使用するため ~/.config/gcloud の直下に airflow-te-terraform-credential.json を作成します。

$ gcloud iam service-accounts keys create ~/.config/gcloud/airflow-te-terraform-credential.json --iam-account terraform@${GCP_PROJECT_ID}.iam.gserviceaccount.com

<Terraform の導入>
wget を用いてダウンロードした Terraform のファイル(Linux-64bit)を解凍し /usr/local/bin の直下に配置します。
ダウンロードを行う最新のバージョンは公式のダウンロードページをご参照ください。

$ sudo apt-get update
$ sudo apt install unzip
$ wget https://releases.hashicorp.com/terraform/0.13.1/terraform_0.13.1_linux_amd64.zip
$ sudo unzip terraform_0.13.1_linux_amd64.zip -d /usr/local/bin/
$ terrraform version

<Cloud Composer 環境の立ち上げ>
立ち上げる Cloud Composer の環境は GKE により管理されますが Cloud Composer 以外のリソースは GKE により管理されないことに注意が必要です。
Cloud Composer 環境の設定は HCL(HashiCorp Configuration Language : DSL)を用いて tf ファイルに記述します。

$ vi composer-te.tf

tf ファイルの書き方については公式ドキュメントをご参照ください。
今回はコードをまとめるためにサービスアカウントの鍵のパスを credentials に指定していますがこの方法は推奨されていません。
export により鍵のパスの変数を作りその変数を credentials に指定する方法を公式も推奨しています。

provider "google" {
  credentials = "${file("~/.config/gcloud/airflow-te-terraform-credential.json")}"
  project     = "<project id>"
  region      = "<region>"
}

resource "google_composer_environment" "test-environment" {
  name    = "<arbitary composer name>"
  project = "<project id>"
  region  = "<arbitary region>"

  config {
    node_count = 4

    node_config {
      zone            = "<arbitary zone>"
      machine_type    = "<arbitary machine_type>"
      disk_size_gb    = 30 # default: 100 GB
      service_account = "terraform@${GCP_PROJECT_ID}.iam.gserviceaccount.com"
    }

    software_config {
      
      pypi_packages = {
        pandas = ""
        numpy  = ""
      }

      env_variables = {
        FOO = "bar"
      }
      image_version = "composer-1.11.3-airflow-1.10.6"
      python_version = 3
    }
  }
}
command:
terraform fmt 作成した tf ファイル のインデントを調整
terraform validate HCL の構文をチェック
terraform plan クラウドの API にアクセスし tf ファイルをシステムに影響を与えることなく実行。表示されるクラウド上のリソース情報を元に tf ファイルを apply により実行するかを判断します。
terraform init Terrafrom の設定ファイル (tf ファイル)を含むワークディレクトリを初期化
terraform apply Cloud Composer 環境の立ち上げ
$ terraform
$ terraform validate
$ terraform plan
$ terraform init
$ terraform apply

GCP の Composer を確認すると環境ができているはずです。
これで Airflow を実行する環境が整いました。

Apache Airflow

Apache Airflow は複数のタスクの実行順序や実行のスケジューリングを管理することができるツールです。 各タスクの依存関係は Python により記述され、ワークフローでの各タスクの依存関係と実行状況を有向非巡回グラフとして Web UI からモニタリングすることができます。 Cloud Composer 環境が立ち上がるのと同時に Cloud Storage にバケットが作成されます。 そのバケットの dags フォルダに ワークフロー(DAG)の設定を記述した Python ファイルをアップデートすることでワークフローを実行します。

<必要なライブラリの読み込み>

import re
import json
import requests
from datetime import datetime, timedelta
import pendulum
import codecs
import pandas_gbq
from airflow.models import Variable
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator

<Slack への通知>
タスクの 成功/失敗 を Slack に通知する関数を作ります。
Slack に通知する方法はいくつかありますが以下では Incoming Webhook を用いた方法を採用しています。

Slack への通知方法に関してはこちらの記事(AirflowのDAGの実行状況をSlackに通知して監視する)にシンプルかつ分かりやすいコードが記述されていたので拝借しました。
Incoming Webhook の機能についての記事(slackのIncoming webhookが新しくなっていたのでまとめてみた)も参考にしています。

# webhook URLやチャンネル名などを指定
webhook_url = '<webhook url>' # Variable.get('slack_webhook_url')
webhook_name = '<slack webhook name>' # webhook_name = Variable.get('slack_webhook_name')
channel_name = '<channel name>' # channel_name = Variable.get('slack_channel_name')

# タスク失敗を通知する関数
def failured(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': f'{dag_name}:{task_name}',
                'color': '#e53935',
                'title': f'{dag_name}:{task_name}',
                'text': f'{task_name} was failed...'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

# タスク成功を通知する関数
def successed(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': dag_name,
                'color': '#1e88e5',
                'title': dag_name,
                'text': f'{dag_name} was successed!'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

webhook_url, webhook_name, channel_name それぞれの変数を作り Variable.get('') を用いて変数を読み込む方法もあります。
Terraform を用いて Cloud Composer 環境を立ち上げる際には以下のようにして Airflow に用いる変数を設定できます。

resource {
  config {
    software_config {
      env_variables = {
        SLACK_WEBHOOK_URL = "<webhook url>"
        SLACK_WEBHOOK_NAME = "<slack webhook name>"
        CHANNEL_NAME = "<channel name>"
      }
    }
  }
}

<SQL ファイルのインポート>
SQL 構文を DAG の設定ファイルに記述するとクエリやワークフローが複雑になった場合に DAG の設定ファイルのコードが SQL 構文のために長くなります。
回避するために SQL 構文を別のファイルに記述し Cloud Composer 環境にインポートします。
Cloud Composer 環境にインポートした SQL ファイルを DAG の設定ファイルに読み込みます。

$ mkdir sql
$ cd sql
$ vi airflow-te.sql
with with_1 as (
select
count(distinct purchase_id) as Z_count
from `<project id>.<dataset>.<table>`
), 
with_2 as (
select 
a.purchase_id, 
a.product_id, 
count(*) over(partition by a.product_id) as product_count, 
b.Z_count
from `<project id>.<dataset>.<table>` as a cross join (select Z_count from  with_1) as b
), 
with_3 as ( 
select 
a.product_id as A, 
b.product_id as B, 
a.product_count as A_count, 
b.product_count as B_count, 
count(*) as A_B_count, 
a.Z_count 
from with_2 as a 
inner join with_2 as b on a.purchase_id = b.purchase_id 
where A <> B 
group by A, B, A_count, B_count, Z_count
)
select 
c.A, 
c.B,
round(100.0 * (c.A_B_count / c.Z_count) , 2) as support,
round(100.0 * (c.A_B_count / c.A_count) , 2) as confidence,
round(100.0 * ((c.A_B_count / c.A_count) / (c.B_count / c.Z_count)) , 2) as lift
from with_3 as c
order by A, B
;

以下の gcloud コマンドで Cloud Composer にインポートされた フィルは /home/airflow/gcs/plugins に配置されます。

$ cd ..
$ gcloud composer environments storage plugins import --project <project id> --environment <environment name> --location <region in which to create the environment> --source sql

Reference:
gcloud composer environments storage plugins import
Airflow コマンドライン インターフェース
Airflow 構成をオーバーライドする

<ワークフロー(DAG)の設定>
以下の 3 ステップを順に実行するワークフローを作成します。

  1. 前回の分析結果を持つテーブルを削除。
  2. 削除したテーブルと同じスキーマを持つ空のテーブルを作成。
  3. 作成した空のテーブルにクエリの結果を送る。

BigQuery を操作するためには Google BigQuery operators を使用します。

Operator:
BigQueryTableDeleteOperator テーブルを削除
BigQueryCreateEmptyTableOperator 空のテーブルを作成
BigQueryOperator クエリの結果を INSERT
default_args = {
    'owner': 'airflow', 
    'depends_on_past': True, 
    'start_date': datetime(2020, 8, 31, tzinfo = pendulum.timezone('Asia/Tokyo')) - timedelta(days = 2),  # 2 日前のワークフローから実行
    'retries': 5, 
    'retry_delay': timedelta(minutes = 5), 
    'on_failure_callback': failured # 1 つでもタスクが失敗した場合に通知

GCP_PROJECT_ID = '<project id>'
DATASET_ID = '<arbitary dataset name>'
table_name = '<arbitary table name>'

schedule_interval = '0 9 * * *' # ワークフローを 9:00 に実行

with codecs.open('/home/airflow/gcs/plugins/sql/s-c-l.sql', 'r', 'utf-8') as q:
    query = q.read()
    
with DAG(dag_id = 'airflow-te', 
         default_args = default_args, 
         schedule_interval = schedule_interval
        ) as dag:
    
    delete_table = BigQueryTableDeleteOperator(
        task_id = 'delete_existing_table',
        deletion_dataset_table = '{}.{}.{}'.format(GCP_PROJECT_ID, DATASET_ID, table_name),
        dag = dag
    )
    
    create_empty_table = BigQueryCreateEmptyTableOperator(
        task_id = 'create_empty_table',
        project_id = GCP_PROJECT_ID,
        dataset_id = DATASET_ID,
        table_id = table_name,
        schema_fields =  [{"name": "A", "type": "STRING", "mode": "NULLABLE"},
                          {"name": "B", "type": "STRING", "mode": "NULLABLE"},
                          {"name": "support", "type": "FLOAT", "mode": "NULLABLE"},
                          {"name": "confidence", "type": "FLOAT", "mode": "NULLABLE"}, 
                          {"name": "lift", "type": "FLOAT", "mode": "NULLABLE" }], # '/schema/{}-schema.json'.format(table_name),
        dag = dag
    )
    
    write_to_bq = BigQueryOperator(
        task_id = 'write_to_bq', 
        use_legacy_sql = False, 
        write_disposition = 'WRITE_TRUNCATE', 
        allow_large_results = True, 
        sql = query, 
        destination_dataset_table = '{}.{}.{}'.format(GCP_PROJECT_ID, DATASET_ID, table_name),
        dag = dag, 
        on_success_callback = successed # すべてのタスクが成功した場合に通知
    )
    
    delete_table >> create_empty_table >> write_to_bq
    
    # delete_table.set_upstream(create_empty_table)
    # create_empty_table.set_upstream(write_to_bq)
argument:
ower ワークフロー(DAG) のタスクのオーナー
depends_on_past それぞれのタスクは前回のワークフローの同じタスクの 成功/失敗 に依存する
wait_for_downstream それぞれのタスクは前回のワークフローの同じタスクの下流にあるすべてのタスクの 成功/失敗 に依存する
start_date ワークフローの開始日
retries タスクが失敗した場合のリトライ数
retry_delay タスクが失敗した場合にリトライを行う時間の間隔
on_failure_callback タスクが失敗した場合の Slack 通知
on_success_callback タスクが成功した場合の Slack 通知
dag_id ワークフロー(DAG) の一意の名前
schedule_interval ワークフローを実行する時間や曜日
task_id タスクの名前
deletion_dataset_table 削除テーブル
use_legacy_sql 標準とレガシーどちらの SQL を使用するのか
write_disposition 'WRITE_TRUNCATE' を設定した場合は書き込みの際のバッファを保持
allow_large_results SQL クエリで大きいサイズの宛先テーブルが有効
destination_dataset_table 宛先テーブル
A >> B タスク A の後に タスク B を実行
A.set_upstream(B) タスク A の後に タスク B を実行

Reference: airflow.contrib.operators.bigquery_operator

<ワークフロー(DAG)の実行>
DAG の設定を記述した Python ファイルを dags フォルダにアップロードすることでワークフローを実行します。
ワークフロー(DAG)が最初に実行される時刻は start_date ではなく start_date + schedule_interval であることにも注意が必要です。 Reference: Airflow の流れを制す

以下のコードでは Timezone に明示的に JST を指定していますが Cloud Shell や Web UI から core に "default_timezone = Asia/Tokyo" を設定できます。

import re
import json
import requests
from datetime import datetime, timedelta
import pendulum
import codecs
import pandas_gbq
from airflow.models import Variable
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator

webhook_url = '<webhook url>'
webhook_name = '<slack webhook name>'
channel_name = '<channel name>'

def failured(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    task_name = re.findall(r'.*\:\s(.*)\>', str(status['task']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': f'{dag_name}:{task_name}',
                'color': '#e53935',
                'title': f'{dag_name}:{task_name}',
                'text': f'{task_name} was failed...'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

def successed(status):
    dag_name = re.findall(r'.*\:\s(.*)\>', str(status['dag']))[0]
    data = {
            'username': webhook_name,
            'channel': channel_name,
            'attachments': [{
                'fallback': dag_name,
                'color': '#1e88e5',
                'title': dag_name,
                'text': f'{dag_name} was successed!'
                }]
            }
    requests.post(webhook_url, json.dumps(data))

default_args = {
    'owner': 'airflow', 
    'depends_on_past': True, 
    'start_date': datetime(2020, 8, 31, tzinfo = pendulum.timezone('Asia/Tokyo')) - timedelta(days = 2),
    'retries': 5, 
    'retry_delay': timedelta(minutes = 5), 
    'on_failure_callback': failured

GCP_PROJECT_ID = '<project id>'
DATASET_ID = '<arbitary dataset name>'
table_name = '<arbitary table name>'

schedule_interval = '0 9 * * *'

with codecs.open('/home/airflow/gcs/plugins/sql/s-c-l.sql', 'r', 'utf-8') as q:
    query = q.read()
    
with DAG(dag_id = 'airflow-te', 
         default_args = default_args, 
         schedule_interval = schedule_interval
        ) as dag:
    
    delete_table = BigQueryTableDeleteOperator(
        task_id = 'delete_existing_table',
        deletion_dataset_table = '{}.{}.{}'.format(GCP_PROJECT_ID, DATASET_ID, table_name),
        dag = dag
    )
    
    create_empty_table = BigQueryCreateEmptyTableOperator(
        task_id = 'create_empty_table',
        project_id = GCP_PROJECT_ID,
        dataset_id = DATASET_ID,
        table_id = table_name,
        schema_fields =  [{"name": "A", "type": "STRING", "mode": "NULLABLE"},
                          {"name": "B", "type": "STRING", "mode": "NULLABLE"},
                          {"name": "support", "type": "FLOAT", "mode": "NULLABLE"},
                          {"name": "confidence", "type": "FLOAT", "mode": "NULLABLE"}, 
                          {"name": "lift", "type": "FLOAT", "mode": "NULLABLE" }], # '/schema/{}-schema.json'.format(table_name),
        dag = dag
    )
    
    write_to_bq = BigQueryOperator(
        task_id = 'write_to_bq', 
        use_legacy_sql = False, 
        write_disposition = 'WRITE_TRUNCATE', 
        allow_large_results = True, 
        sql = query, 
        destination_dataset_table = '{}.{}.{}'.format(GCP_PROJECT_ID, DATASET_ID, table_name),
        dag = dag, 
        on_success_callback = successed
    )
    
    delete_table >> create_empty_table >> write_to_bq

成功すれば Web UI の表示と Slack への通知は以下のようになります。
スクリーンショット (738).png
スクリーンショット (735).png
スクリーンショット (736).png

まとめ

Apache Airflow を用いて BigQuery 上での分析を効率化する簡単なワークフローを構築してみました。
より効率的な方法や理解不足なところなどありましたらご指摘のほどよろしくお願いいたします。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?