LoginSignup
35
23

More than 5 years have passed since last update.

Airflowを使用してDWH向けデータパイプラインを作る

Posted at

前提

GCP内でのデータパイプラインで、DWHはBigQueryを使用

DWHバッチ処理のパターン

バッチ処理ベースのDWHデータパイプラインは
複雑なことをしない限り以下のパターンでほとんど網羅できると思われる。

  • GCSからCSV等のファイルロード(DROP-CREATE-INSERT)
  • GCSからCSV等のファイルを整形してロード(PREP-DROP-CREATE-INSERT)
  • 最新マスタ等のデータ更新(TRUNCATE-INSERT)
  • 蓄積テーブル、サマリのデータ更新(DELETE-INSERT)

image.png

あとはデータパイプラインとは直接関係ないが、運用系として以下の処理パターンもある

  • バッチ処理の開始と終了時のメール通知
  • GCS内でのファイル移動(退避)

Airflowでのデータパイプラインの作成方法

Airflowはpythonを使用して、DAGと呼ばれるジョブの固まりを定義したファイルを作成し、
そのファイルをconfigに定義したdagsディレクトリにアップロードすることでデータパイプラインを作成する

DAGの中の各ジョブはオペレータと呼ばれるAirflowのライブラリを使って作る
このオペレータが一通りのGCP操作を網羅しているので、パラメータの設定、テーブルのJSON定義、SQLファイルの作成だけで、ジョブを作成することができる

DAGファイルサンプル

大きな流れとしては、以下のイメージ

  1. 必要なライブラリ群の読み込み
  2. コンフィグを作ってる場合は、別ファイルから読み込み
  3. バッチ日付の設定
  4. DAGのデフォルトの設定値のセット(default_args)
  5. DAGの定義(with構文推奨)
  6. 依存関係の定義
sample.py
import configparser
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta, timezone

"""
set config
"""
config = configparser.ConfigParser()
config.read('/airflow/dags/daily.config')

bucket_nm = config.get('gcs', 'bucket_nm')
project_nm = config.get('bq', 'project_nm')
dataset_nm = config.get('bq', 'dataset_nm')
dataset_nm_wk = config.get('bq', 'dataset_nm_wk')

"""
set batch date
"""
JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)-timedelta(days=1)).strftime('%Y%m%d')

"""
set dag
"""
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 13, 20, 0),
    'email': ['メールアドレス'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


#dag
with DAG('daily_job', default_args=default_args, schedule_interval='20 20 * * *', catchup=False) as dag:

    t1 = BashOperator(
       task_id='sleep',
       bash_command='sleep 10',
       retries=1
       )
    t2 = BashOperator(
       task_id='sleep2',
       bash_command='slee 10',
       retries=1
       )

  """
  set dependency
  """
  t1 >> t2 

各処理パターンで使用するオペレータ

GCSからDROP-CREATE-LOAD

GCSの特定のファイルからBigQueryへロードするオペレータ

WRITE_TRUNCATEにしている場合、
ロード元ファイルにカラム名まで含めている場合はautodetect=True にすれば
テーブル定義のJSON不要でファイルからスキーマ判断してロードされる
ファイルにカラム名ない場合は、GCSバケットにテーブル定義のJSONファイル(スキーマオブジェクト)必要

ライブラリ
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
タスク定義
gcsload = GoogleCloudStorageToBigQueryOperator(
       task_id='タスク名',
       bucket=バケット名,
       source_objects=['ソースファイル名'],
       destination_project_dataset_table=プロジェクト名.データセット名.テーブル名,
       schema_object='スキーマオブジェクト名',
       source_format='CSV',
       write_disposition='WRITE_TRUNCATE',
       bigquery_conn_id='airflow_gcp',
       #autodetect=True,
       retries=3
       )
スキーマオブジェクト
[
{"name" : "test_cd","type" : "STRING","mode" : "REQUIRED","description" : "テストコード"},
{"name" : "eda_ban","type" : "STRING","mode" : "NULLABLE","description" : "枝番"}
]

GCSファイルのPREP-DROP-CREATE-LOAD

gcpのDataprepは裏側ではDataflowが動くので、
実行はDataflowのテンプレートを実行するイメージになる。
が、DataflowTemplateOperatorを使うとかなりハマる。
というか私では実行できなかった。(出力をBigQueryにしている場合)
(パラメータ指定時に複数の辞書変数を使うときにカンマ区切りになってしまうから、パラメータのデリミタをカンマ以外に指定しないといけないが、DataflowTemplateOperatorのパラメータでデリミタを変更することができなそうだった)

なので、オペレータはBashOperatorを使用し、gcloudコマンドでテンプレート実行する方式になる

ライブラリ
from airflow.operators.bash_operator import BashOperator
タスク定義
bash_cmd='gcloud dataflow jobs run ジョブ名 \
    --gcs-location gs://テンプレートの場所 \
    --parameters ^*^\
inputLocations="{\\"location1\\":\\"gs://インプットファイル\\"}"*\
outputLocations="{\\"location1\\":\\"アウトプットテーブル\\", \\"location2\\":\\"ロケーション2のアウトプット\\",\\"location3\\":\\"ロケーション3のアウトプット\\"}"*\
customGcsTempLocation="gs://テンプレートファイルのディレクトリ"'

prep = BashOperator(
       task_id='タスク名',
       bash_command=bash_cmd,
       retries=3
       )

WKテーブルやマスタ(最新データのみ保持)へのTRUNCATE-INSERT

BigQueryOperatorでdestination_dataset_tableを指定する場合は、
SQLファイルのSELECT結果をdestination_dataset_tableにINSERTすることになる

destination_dataset_tableを指定せずに、INSERT SELECTのSQL文を使うこともできるが、
WRITE_TRUNCATEを効かせる場合にはdestination_dataset_tableを指定する必要がある

destination_dataset_tableを指定する場合に、SELECT句で計算式を使う場合、
ASでカラム名を指定しないと「f0_」みたいなカラム名になってしまう

ライブラリ
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
タスク定義
trun_ins = BigQueryOperator(
        task_id='タスク名',
        sql='SQLファイル名(dagsディレクトリからの相対パス)',
        destination_dataset_table=プロジェクト名.データセット名.テーブル名,
        params={"project_nm": project_nm,
                "dataset_nm": dataset_nm,
                "dataset_nm_wk": dataset_nm_wk,
                "in_table_1": "テーブル名"},
        write_disposition='WRITE_TRUNCATE',
        bigquery_conn_id='airflow_gcp',
        use_legacy_sql=False
    )

SQLファイルの中にタスク定義の中の、paramsパラメータの値を使用できる。

SQLファイル
select
  column_a,
  column_b
from 
  `{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`

蓄積テーブルへの条件付DELETE

BigQueryOperatorを使用し、特定のカラムを条件にテーブルを削除するジョブ
テーブルの日付項目や、キー項目でのDELETEに使用

ライブラリ
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
タスク定義
del = BigQueryOperator(
        task_id='タスク名',
        sql='SQLファイル名(dagsディレクトリからの相対パス)',
        params={"project_nm": project_nm,
                "dataset_nm": dataset_nm,
                "dataset_nm_wk": dataset_nm_wk,
                "delete_column_where": "column_a",
                "delete_column_select": " column_b",
                "dest_table": "削除先テーブル名",
                "in_table_1": "入力テーブル"},
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='airflow_gcp',
        use_legacy_sql=False
    )
SQLファイル
delete from
  `{{ params.project_nm }}.{{ params.dataset_nm }}.{{ params.dest_table }}`
where
  ({{ params.delete_column_where }})
 in
  (
    select
      {{ params.delete_column_select }}
    from
      `{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`    
  )

蓄積テーブルへの単純INSERT

TRUNCATE-INSERTとの違いは、write_dispositionがWRITE_APPENDになっていること

destination_dataset_tableを指定する場合に、SELECT句で計算式を使う場合、
ASでカラム名を指定しないとこの場合はエラーになってしまう
(既存のテーブルと項目名がずれてしまうとエラーになる)

ライブラリ
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
タスク定義
ins = BigQueryOperator(
        task_id='タスク名',
        sql='SQLファイル名(dagsディレクトリからの相対パス)',
        destination_dataset_table=プロジェクト名.データセット名.テーブル名,
        params={"project_nm": project_nm,
                "dataset_nm": dataset_nm,
                "dataset_nm_wk": dataset_nm_wk,
                "in_table_1": "テーブル名"},
        write_disposition='WRITE_APPEND',
        bigquery_conn_id='airflow_gcp',
        use_legacy_sql=False
    )
SQLファイル
select
  column_a,
  column_b
from 
  `{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`

メール配信

バッチ処理の開始と終了時にメール通知を行うジョブ

ライブラリ
from airflow.operators.email_operator import EmailOperator
タスク定義
job_start = EmailOperator(
        task_id='タスク名',
        to='送り先メールアドレス',
        subject='メールタイトル',
        html_content='メール本文'
  )

GCS内でのファイル移動

ライブラリ
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator

move_objectをFalseにすると移動ではなくコピーになる

タスク定義
move_GCS = GoogleCloudStorageToGoogleCloudStorageOperator(
        task_id='タスク名',
        source_bucket='移動元のバケット',
        source_object='移動元のオブジェクト名',
        destination_bucket='移動先のバケット',
        destination_object='移動先のオブジェクト名',
        move_object=True,
        google_cloud_storage_conn_id='airflow_gcp'
  )

参考情報

(2019年5月時点)何か困ったときに日本語の情報はほぼない。。
公式ドキュメントや、「使いたいオペレーター example」みたいな感じでググりながら動かしてみるしかない。。

■bigqueryオペレータでパラメータ使うには
https://stackoverflow.com/questions/52103717/passing-arguments-to-sql-template-from-airflow-operator

■Composer FAQ
https://cloud-textbook.com/69/#Q_JST

■gcloudでのdataflow template実行
https://stackoverflow.com/questions/51751763/executing-a-dataflow-job-with-multiple-inputs-outputs-using-gcloud-cli

35
23
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
35
23