#前提
GCP内でのデータパイプラインで、DWHはBigQueryを使用
#DWHバッチ処理のパターン
バッチ処理ベースのDWHデータパイプラインは
複雑なことをしない限り以下のパターンでほとんど網羅できると思われる。
- GCSからCSV等のファイルロード(DROP-CREATE-INSERT)
- GCSからCSV等のファイルを整形してロード(PREP-DROP-CREATE-INSERT)
- 最新マスタ等のデータ更新(TRUNCATE-INSERT)
- 蓄積テーブル、サマリのデータ更新(DELETE-INSERT)
あとはデータパイプラインとは直接関係ないが、運用系として以下の処理パターンもある
- バッチ処理の開始と終了時のメール通知
- GCS内でのファイル移動(退避)
#Airflowでのデータパイプラインの作成方法
Airflowはpythonを使用して、DAGと呼ばれるジョブの固まりを定義したファイルを作成し、
そのファイルをconfigに定義したdagsディレクトリにアップロードすることでデータパイプラインを作成する
DAGの中の各ジョブはオペレータと呼ばれるAirflowのライブラリを使って作る
このオペレータが一通りのGCP操作を網羅しているので、パラメータの設定、テーブルのJSON定義、SQLファイルの作成だけで、ジョブを作成することができる
##DAGファイルサンプル
大きな流れとしては、以下のイメージ
- 必要なライブラリ群の読み込み
- コンフィグを作ってる場合は、別ファイルから読み込み
- バッチ日付の設定
- DAGのデフォルトの設定値のセット(default_args)
- DAGの定義(with構文推奨)
- 依存関係の定義
import configparser
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta, timezone
"""
set config
"""
config = configparser.ConfigParser()
config.read('/airflow/dags/daily.config')
bucket_nm = config.get('gcs', 'bucket_nm')
project_nm = config.get('bq', 'project_nm')
dataset_nm = config.get('bq', 'dataset_nm')
dataset_nm_wk = config.get('bq', 'dataset_nm_wk')
"""
set batch date
"""
JST = timezone(timedelta(hours=+9), 'JST')
batch_date = (datetime.now(JST)-timedelta(days=1)).strftime('%Y%m%d')
"""
set dag
"""
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 5, 13, 20, 0),
'email': ['メールアドレス'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
#dag
with DAG('daily_job', default_args=default_args, schedule_interval='20 20 * * *', catchup=False) as dag:
t1 = BashOperator(
task_id='sleep',
bash_command='sleep 10',
retries=1
)
t2 = BashOperator(
task_id='sleep2',
bash_command='slee 10',
retries=1
)
"""
set dependency
"""
t1 >> t2
#各処理パターンで使用するオペレータ
##GCSからDROP-CREATE-LOAD
GCSの特定のファイルからBigQueryへロードするオペレータ
WRITE_TRUNCATEにしている場合、
ロード元ファイルにカラム名まで含めている場合はautodetect=True にすれば
テーブル定義のJSON不要でファイルからスキーマ判断してロードされる
ファイルにカラム名ない場合は、GCSバケットにテーブル定義のJSONファイル(スキーマオブジェクト)必要
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
gcsload = GoogleCloudStorageToBigQueryOperator(
task_id='タスク名',
bucket=バケット名,
source_objects=['ソースファイル名'],
destination_project_dataset_table=プロジェクト名.データセット名.テーブル名,
schema_object='スキーマオブジェクト名',
source_format='CSV',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='airflow_gcp',
#autodetect=True,
retries=3
)
[
{"name" : "test_cd","type" : "STRING","mode" : "REQUIRED","description" : "テストコード"},
{"name" : "eda_ban","type" : "STRING","mode" : "NULLABLE","description" : "枝番"}
]
##GCSファイルのPREP-DROP-CREATE-LOAD
gcpのDataprepは裏側ではDataflowが動くので、
実行はDataflowのテンプレートを実行するイメージになる。
が、DataflowTemplateOperatorを使うとかなりハマる。
というか私では実行できなかった。(出力をBigQueryにしている場合)
(パラメータ指定時に複数の辞書変数を使うときにカンマ区切りになってしまうから、パラメータのデリミタをカンマ以外に指定しないといけないが、DataflowTemplateOperatorのパラメータでデリミタを変更することができなそうだった)
なので、オペレータはBashOperatorを使用し、gcloudコマンドでテンプレート実行する方式になる
from airflow.operators.bash_operator import BashOperator
bash_cmd='gcloud dataflow jobs run ジョブ名 \
--gcs-location gs://テンプレートの場所 \
--parameters ^*^\
inputLocations="{\\"location1\\":\\"gs://インプットファイル\\"}"*\
outputLocations="{\\"location1\\":\\"アウトプットテーブル\\", \\"location2\\":\\"ロケーション2のアウトプット\\",\\"location3\\":\\"ロケーション3のアウトプット\\"}"*\
customGcsTempLocation="gs://テンプレートファイルのディレクトリ"'
prep = BashOperator(
task_id='タスク名',
bash_command=bash_cmd,
retries=3
)
##WKテーブルやマスタ(最新データのみ保持)へのTRUNCATE-INSERT
BigQueryOperatorでdestination_dataset_tableを指定する場合は、
SQLファイルのSELECT結果をdestination_dataset_tableにINSERTすることになる
destination_dataset_tableを指定せずに、INSERT SELECTのSQL文を使うこともできるが、
WRITE_TRUNCATEを効かせる場合にはdestination_dataset_tableを指定する必要がある
destination_dataset_tableを指定する場合に、SELECT句で計算式を使う場合、
ASでカラム名を指定しないと「f0_」みたいなカラム名になってしまう
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
trun_ins = BigQueryOperator(
task_id='タスク名',
sql='SQLファイル名(dagsディレクトリからの相対パス)',
destination_dataset_table=プロジェクト名.データセット名.テーブル名,
params={"project_nm": project_nm,
"dataset_nm": dataset_nm,
"dataset_nm_wk": dataset_nm_wk,
"in_table_1": "テーブル名"},
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='airflow_gcp',
use_legacy_sql=False
)
SQLファイルの中にタスク定義の中の、paramsパラメータの値を使用できる。
select
column_a,
column_b
from
`{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`
##蓄積テーブルへの条件付DELETE
BigQueryOperatorを使用し、特定のカラムを条件にテーブルを削除するジョブ
テーブルの日付項目や、キー項目でのDELETEに使用
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
del = BigQueryOperator(
task_id='タスク名',
sql='SQLファイル名(dagsディレクトリからの相対パス)',
params={"project_nm": project_nm,
"dataset_nm": dataset_nm,
"dataset_nm_wk": dataset_nm_wk,
"delete_column_where": "column_a",
"delete_column_select": " column_b",
"dest_table": "削除先テーブル名",
"in_table_1": "入力テーブル"},
write_disposition='WRITE_APPEND',
bigquery_conn_id='airflow_gcp',
use_legacy_sql=False
)
delete from
`{{ params.project_nm }}.{{ params.dataset_nm }}.{{ params.dest_table }}`
where
({{ params.delete_column_where }})
in
(
select
{{ params.delete_column_select }}
from
`{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`
)
##蓄積テーブルへの単純INSERT
TRUNCATE-INSERTとの違いは、write_dispositionがWRITE_APPENDになっていること
destination_dataset_tableを指定する場合に、SELECT句で計算式を使う場合、
ASでカラム名を指定しないとこの場合はエラーになってしまう
(既存のテーブルと項目名がずれてしまうとエラーになる)
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
ins = BigQueryOperator(
task_id='タスク名',
sql='SQLファイル名(dagsディレクトリからの相対パス)',
destination_dataset_table=プロジェクト名.データセット名.テーブル名,
params={"project_nm": project_nm,
"dataset_nm": dataset_nm,
"dataset_nm_wk": dataset_nm_wk,
"in_table_1": "テーブル名"},
write_disposition='WRITE_APPEND',
bigquery_conn_id='airflow_gcp',
use_legacy_sql=False
)
select
column_a,
column_b
from
`{{ params.project_nm }}.{{ params.dataset_nm_wk }}.{{ params.in_table_1 }}`
##メール配信
バッチ処理の開始と終了時にメール通知を行うジョブ
from airflow.operators.email_operator import EmailOperator
job_start = EmailOperator(
task_id='タスク名',
to='送り先メールアドレス',
subject='メールタイトル',
html_content='メール本文'
)
##GCS内でのファイル移動
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
move_objectをFalseにすると移動ではなくコピーになる
move_GCS = GoogleCloudStorageToGoogleCloudStorageOperator(
task_id='タスク名',
source_bucket='移動元のバケット',
source_object='移動元のオブジェクト名',
destination_bucket='移動先のバケット',
destination_object='移動先のオブジェクト名',
move_object=True,
google_cloud_storage_conn_id='airflow_gcp'
)
#参考情報
(2019年5月時点)何か困ったときに日本語の情報はほぼない。。
公式ドキュメントや、「使いたいオペレーター example」みたいな感じでググりながら動かしてみるしかない。。
■bigqueryオペレータでパラメータ使うには
https://stackoverflow.com/questions/52103717/passing-arguments-to-sql-template-from-airflow-operator
■Composer FAQ
https://cloud-textbook.com/69/#Q_JST
■gcloudでのdataflow template実行
https://stackoverflow.com/questions/51751763/executing-a-dataflow-job-with-multiple-inputs-outputs-using-gcloud-cli