1. 前提
-
定常バッチなどで作成されたテーブルに対して、テストを行い演算結果に異常がある場合は検知したい。
-
Google Cloud Composer(airflow) + BigQuery (+slack) 環境を想定。
- (注意:テストを定常実行する場合、対象テーブルのスキャン量を気にしないと、BQ課金額が高額に!)
-
テーブルに対するテストの自動化について、様々な比較軸でアーキテクチャを検討したわけではなく、今回の構成は一例。
- テストについては、pytestと組み合わせたテストの例が多いが、別途メリデリの比較検討は必要。
2. モチベーション
-
テーブルに対するテスト環境をお手軽に構築して、自動的にテストを行いたい。
- そのために、Google Cloud Composer(airflow)のデフォルトパッケージだけで完結したい。
-
複数のテーブルに対して同じテストを実行する仕組みが良いので、設定ファイルの形で管理できるようにする。
- 例:XXXテーブルのAAAカラムと、YYYテーブルのBBBカラムに対して、NULL値?のチェックを行う。
- 設定ファイルの記述(テーブルに対して何をテストするのか?を情報集約をする場合):
- "XXX": { "test_check_null": {"columns": ["AAA"]}}, "YYY": { "test_check_null": {"columns": ["BBB"]}}
- 設定ファイルの記述(テーブルに対して何をテストするのか?を情報集約をする場合):
- 例:XXXテーブルのAAAカラムと、YYYテーブルのBBBカラムに対して、NULL値?のチェックを行う。
-
我流のSQLに関わるテスト方法について書くことで、より良いテスト方法のツッコミを期待!
3. まずは、テストの観点を整理
テストしたいタイミングと箇所
- 演算ロジック作成時
- DWH上の中間テーブル
- DataMart上の最終テーブル
- 定常バッチ演算時
- DataLake上のデータ取り込み箇所
- データ演算終了後のDataMart上の最終テーブル**(← 今回はここを想定。)**
テストの種類
- ブラックボックス的なテスト(主に、取込時点のテーブルや最終テーブルなどを対象)(← 今回はここを想定。)
- 計算結果の件数
- 例:0件ではないこと
- カラムがNULL値
- カラムの属性値が範囲外
- 例:'Blue'/'Red'のみ(カテゴリ値)、0 <= Rate <= 1(連続値)
- 最小粒度のチェック
- 例:Nation/Prefecture/State毎の粒度になっているか?
- etc
- 計算結果の件数
- ホワイトボックス的なテスト(主に、中間テーブルや最終テーブルを対象)
- 条件分岐
- etc
4. そして、テーブルに対するテストのための構成例
4.1. 構成の概要
- Google Cloud Composer(airflow)は以下を実行する。
- テーブル作成のDAGファイル/生成用のSQLファイル
- (スケジュールを実行して、SQLを叩いてBigQuery上にテーブルを生成するだけ。)
- 上記で生成されたテーブルに対する、テスト用DAGファイル/テンプレートSQLファイル
- (設定ファイルの情報を解釈して、適切にテーブルテスト用taskを生成して、テスト用のSQLをスケジュール実行し、結果をaiflow UI上に表示。必要に応じてslackに通知する。)
- テーブル作成のDAGファイル/生成用のSQLファイル
4.2. Google Cloud Composer(airflow)のフォルダ構成例
- DAGファイルの直下に、dag用Pythonスクリプトと、スクリプトが呼び出すSQLテンプレート
- test_sqlの直下に、テストのための設定ファイルを設置
-- airflow
|-- dags
| |-- sql 「バッチ用SQLフォルダ」
| | |-- aaa1.sql (DAG"aaa"で作成するテーブルaaa1に対応するSQL)
| | |-- ・・・
| |-- test_sql 「バッチ用SQLで作成されたテーブルに対するテストフォルダ」
| |-- template 「テスト用テンプレートSQLのフォルダ」
| |-- test_check_null.sql (テーブルのテストSQLをここに追加(参照4.3))
| |-- ・・・
| |-- test_aaa.json (DAG"aaa"で作成したテーブルのテスト用設定ファイル(参照4.4))
|-- aaa.py (DAG"aaa"のスクリプト)
|-- test_aaa.py (DAG"aaa"で作成されたテーブルに対するテスト用のDAG(参照4.5))
実行順番
-
”test_aaa.py” <ーー(1.読み込み)ーー ”test_aaa.json”
-
”test_aaa.py” <ーー(2.jsonを元に読み込み、テスト実行)ーー ”test_check_null.sqlなど”
4.3. テーブルに対するテストのためのテンプレートSQLについて
- テストSQLは色々作れるので、ここでは一つのテストSQL例を取り上げる。
- 自分は、テストSQLについては、以下の4つの情報を吐き出している。
-
- table_name: テスト対象テーブル
-
- check_content: 何をチェックしたのか?などのLog情報
-
- has_error: テストの成功有無
-
- error_cnt: エラーのレコード数
-
- テンプレート内の、$xxxは置換対象
- 置換方法は、DAGファイル内で実行。自分は、ファイルにSQLファイルを読み込み、replace関数で置換。
-- 例:NULL値のチェックをするSQL(test_check_null.sql)
-- $TARGET_TABLE: テスト対象のテーブル名
-- $WHERE_PARTITION_KEY: テスト対象のテーブルのパーティションを指定
-- $TARGET_COLUMN: テスト対象のテーブルのカラムを指定
WITH
base_table AS (
SELECT
*
FROM
`$TARGET_TABLE`
$WHERE_PARTITION_KEY
),
-- 今回はNULL値をチェックするWith句だが、ここを変更をすれば属性値の逸脱チェックなども可能。
check_null_column AS (
SELECT
COUNT(1) AS error_cnt
FROM
base_table
WHERE
$TARGET_COLUMN IS NULL
),
log_format AS (
SELECT
'$TARGET_TABLE' AS table_name
, 'check null about $TARGET_COLUMN' AS check_content
)
SELECT
table_name
, check_content
, error_cnt <> 0 AS has_error
, error_cnt
FROM
log_format
CROSS JOIN
check_null_column
;
4.4. テスト用設定ファイルについて
- 自分の設定ファイルでは、
- dataset名(例:xxx_data_mart) > table名(例:dm_xxx)の順番
- table名の下に、テスト項目を並べています。(例:test_check_number_of_lines, test_check_nullなど)
- テストを"test_check_null": {}のdict構造にしているのは、テストの種類によって、変数が増える事を考慮してます。
- テストについては、table内の複数のcolumnに対して実行したいので、list構造で併記できるようにしました。
4.5. Google Cloud Composer(airflow)のテスト用DAGファイルのポイント
- 自分のDAGファイルでは、
- 要望1:
- with airflow.DAG()句内をすっきりさせたい。
- 対応策1:
- 設定ファイル(例:test_aaa.json)を読み込んで、for文でdagを作る。
- 要望2:
- BigQueryOperator.execute()を実行し、BigQueryの実行結果の情報を受け取り、テストのエラー時にはairflowのtaskがエラーなるようにして、テストの失敗をairflow UIで確認できるようにしたい。
- 対応策2:
- BigQueryOperator.execute()のために、context情報を渡す必要があるため、with airflow.DAG()句内PythonOperatorでprovide_context=Trueを行う。
- 要望3:
- airflow UIのtask数の増加を抑えたいため、同じテストを複数のカラムに行うのを1task内で完結させる。
- 対応策3:
- テーブルとタスク毎にPythonOperatorタスクを生成して、その中でfor文でカラム毎のテストクエリを生成する。
- 要望1:
test_aaa.pyのwith句部分
with airflow.DAG(
・・・
default_args=default_dag_args) as dag:
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
# Todo: test_json_pathは、テスト設定ファイルのjsonのパスを指定。
with open(test_json_path, "r") as f:
load_json = json.load(f)
test_tables = load_json[dataset]
# Todo: target_tablesは、テスト対象のテーブル名のlistです。
for target_table in target_tables:
obj_table_start_task = DummyOperator(task_id=f"start_{target_table}")
start_task >> obj_table_start_task
for test_name, test_info in test_tables[target_table]['test_list'].items():
main_task_name = f"{test_name}_for_{target_table}"
test_info['dataset'] = dataset
test_info['table'] = target_table
# PythonOperatorのため、Cloud Composerにデプロイした時点では、SQLは展開されていない。
test_task = PythonOperator(
task_id=main_task_name,
python_callable=test_bigquery_table,
provide_context=True,
op_kwargs=test_info
)
obj_table_start_task >> test_task >> finish_task
test_aaa.pyのtest_bigquery_task句部分
def set_test_setting(test_name, table, column):
test_setting = {
'bql_template_base_path': '',
'error_template': ''
}
if test_name == 'test_check_null':
test_setting['bql_template_base_path'] = 'test_results/template/test_check_null.sql'
test_setting['error_template'] = 'Found Null value'
# Todo: 以下略
def make_bql(target):
# Todo: 冗長なので直したい。
dataset = target['dataset']
table = target['table']
partition = target['partition']
bql_template_base_path = target['bql_template_base_path']
target_column = target['target_column']
#####
target_table = ''
if partition is True:
target_table = f'{PARAMS["DESTINATION_PROJECT"]}.{dataset}.{table}${{{{ ds_nodash }}}}'
else:
target_table = f'{PARAMS["DESTINATION_PROJECT"]}.{dataset}.{table}'
bql_template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), bql_template_base_path)
bql = open(bql_template_path).read()
# Todo: テストテンプレートを追加した場合は、パラメータ変更ルールも追加
params = {
"$TARGET_TABLE": target_table,
"$TARGET_COLUMN": target_column,
"$WHERE_PARTITION_KEY": f'WHERE {target["partition_key"]} = "${{{{ ds_nodash }}}}"' if partition is True else ''
}
for key, value in params.items():
bql = bql.replace(key, value)
return bql
def test_bigquery_table(**kwargs):
kwargs.update(set_table_info(kwargs['dataset'], kwargs['table']))
for column in kwargs['columns']:
# Todo: set_test_settingは、テスト用SQLのパスとテスト種類毎のエラーメッセージを呼び出す関数
kwargs.update(set_test_setting(kwargs['test_name'], kwargs['table'], column))
task_name = f"bq_test-{kwargs['table']}"
kwargs['target_column'] = column
# Todo: make_sqlは、テスト用SQLテンプレートから実際のSQLを生成する関数
bql = make_bql(kwargs)
bq_task = BigQueryOperator(
task_id=task_name,
sql=bql,
use_legacy_sql=False,
)
# BigQueryにSQL発行し、実行結果を取得
bq_task.execute(context=kwargs)
bq_task.bq_cursor.job_id = bq_task.bq_cursor.running_job_id
result = bq_task.bq_cursor.next()
logging.info(result)
# result[2]は、テストクエリの成功有無、Falseなら、例外処理でエラーを返して、taskを失敗させる。
error_status_code = result[2]
if error_status_code:
raise ValueError(f"{kwargs['error_template']} in {column} of {kwargs['table']}")
参考:実行結果
test用DAGのairflow UI
- テストが増えるとさすがに見辛いので、人にとってPull型の通知にした方が良い。
slackによるエラー通知
- Errorをキャッチして、通知するようにしておくとわかりやすい。
以上です。