12
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[Case]Google Cloud Composer(airflow)とBigQueryを使ったテーブルに対するテストを自動化してみる。

Last updated at Posted at 2020-05-24

1. 前提

  • 定常バッチなどで作成されたテーブルに対して、テストを行い演算結果に異常がある場合は検知したい。

  • Google Cloud Composer(airflow) + BigQuery (+slack) 環境を想定

    • (注意:テストを定常実行する場合、対象テーブルのスキャン量を気にしないと、BQ課金額が高額に!)
  • テーブルに対するテストの自動化について、様々な比較軸でアーキテクチャを検討したわけではなく、今回の構成は一例。

    • テストについては、pytestと組み合わせたテストの例が多いが、別途メリデリの比較検討は必要。

2. モチベーション

  • テーブルに対するテスト環境をお手軽に構築して、自動的にテストを行いたい。

    • そのために、Google Cloud Composer(airflow)のデフォルトパッケージだけで完結したい。
  • 複数のテーブルに対して同じテストを実行する仕組みが良いので、設定ファイルの形で管理できるようにする。

    • 例:XXXテーブルのAAAカラムと、YYYテーブルのBBBカラムに対して、NULL値?のチェックを行う。
      • 設定ファイルの記述(テーブルに対して何をテストするのか?を情報集約をする場合):
        • "XXX": { "test_check_null": {"columns": ["AAA"]}}, "YYY": { "test_check_null": {"columns": ["BBB"]}}
  • 我流のSQLに関わるテスト方法について書くことで、より良いテスト方法のツッコミを期待!

3. まずは、テストの観点を整理

テストしたいタイミングと箇所

  • 演算ロジック作成時
    • DWH上の中間テーブル
    • DataMart上の最終テーブル
  • 定常バッチ演算時
    • DataLake上のデータ取り込み箇所
    • データ演算終了後のDataMart上の最終テーブル**(← 今回はここを想定。)**

テストの種類

  • ブラックボックス的なテスト(主に、取込時点のテーブルや最終テーブルなどを対象)(← 今回はここを想定。)
    • 計算結果の件数
      • 例:0件ではないこと
    • カラムがNULL値
    • カラムの属性値が範囲外
      • 例:'Blue'/'Red'のみ(カテゴリ値)、0 <= Rate <= 1(連続値)
    • 最小粒度のチェック
      • 例:Nation/Prefecture/State毎の粒度になっているか?
    • etc
  • ホワイトボックス的なテスト(主に、中間テーブルや最終テーブルを対象)
    • 条件分岐
    • etc

4. そして、テーブルに対するテストのための構成例

4.1. 構成の概要

  • Google Cloud Composer(airflow)は以下を実行する。
    • テーブル作成のDAGファイル/生成用のSQLファイル
      • (スケジュールを実行して、SQLを叩いてBigQuery上にテーブルを生成するだけ。)
    • 上記で生成されたテーブルに対する、テスト用DAGファイル/テンプレートSQLファイル
      • (設定ファイルの情報を解釈して、適切にテーブルテスト用taskを生成して、テスト用のSQLをスケジュール実行し、結果をaiflow UI上に表示。必要に応じてslackに通知する。)

4.2. Google Cloud Composer(airflow)のフォルダ構成例

  • DAGファイルの直下に、dag用Pythonスクリプトと、スクリプトが呼び出すSQLテンプレート
  • test_sqlの直下に、テストのための設定ファイルを設置
-- airflow
   |-- dags
   |  |-- sql                       「バッチ用SQLフォルダ」
   |  |  |-- aaa1.sql               (DAG"aaa"で作成するテーブルaaa1に対応するSQL)
   |  |  |-- ・・・
   |  |-- test_sql                  「バッチ用SQLで作成されたテーブルに対するテストフォルダ」
   |     |-- template               「テスト用テンプレートSQLのフォルダ」
   |        |-- test_check_null.sql (テーブルのテストSQLをここに追加(参照4.3))
   |        |-- ・・・
   |     |-- test_aaa.json          (DAG"aaa"で作成したテーブルのテスト用設定ファイル(参照4.4))
   |-- aaa.py                       (DAG"aaa"のスクリプト)
   |-- test_aaa.py                  (DAG"aaa"で作成されたテーブルに対するテスト用のDAG(参照4.5))

 実行順番

  • ”test_aaa.py” <ーー(1.読み込み)ーー ”test_aaa.json”

  • ”test_aaa.py” <ーー(2.jsonを元に読み込み、テスト実行)ーー ”test_check_null.sqlなど”

4.3. テーブルに対するテストのためのテンプレートSQLについて

  • テストSQLは色々作れるので、ここでは一つのテストSQL例を取り上げる。
  • 自分は、テストSQLについては、以下の4つの情報を吐き出している。
      1. table_name: テスト対象テーブル
      1. check_content: 何をチェックしたのか?などのLog情報
      1. has_error: テストの成功有無
      1. error_cnt: エラーのレコード数
  • テンプレート内の、$xxxは置換対象
    • 置換方法は、DAGファイル内で実行。自分は、ファイルにSQLファイルを読み込み、replace関数で置換。
-- 例:NULL値のチェックをするSQL(test_check_null.sql)
-- $TARGET_TABLE:        テスト対象のテーブル名
-- $WHERE_PARTITION_KEY: テスト対象のテーブルのパーティションを指定
-- $TARGET_COLUMN:       テスト対象のテーブルのカラムを指定
WITH
base_table AS (
  SELECT
    *
  FROM
    `$TARGET_TABLE`
  $WHERE_PARTITION_KEY
),

-- 今回はNULL値をチェックするWith句だが、ここを変更をすれば属性値の逸脱チェックなども可能。
check_null_column AS (
  SELECT
    COUNT(1) AS error_cnt
  FROM
    base_table
  WHERE
    $TARGET_COLUMN IS NULL
),
log_format AS (
  SELECT
    '$TARGET_TABLE' AS table_name
    , 'check null about $TARGET_COLUMN' AS check_content
)

SELECT
  table_name
  , check_content
  , error_cnt <> 0 AS has_error
  , error_cnt
FROM
  log_format
CROSS JOIN
  check_null_column
;

4.4. テスト用設定ファイルについて

  • 自分の設定ファイルでは、
    • dataset名(例:xxx_data_mart) > table名(例:dm_xxx)の順番
    • table名の下に、テスト項目を並べています。(例:test_check_number_of_lines, test_check_nullなど)
      • テストを"test_check_null": {}のdict構造にしているのは、テストの種類によって、変数が増える事を考慮してます。
    • テストについては、table内の複数のcolumnに対して実行したいので、list構造で併記できるようにしました。

(※以下、参考画像)

4.5. Google Cloud Composer(airflow)のテスト用DAGファイルのポイント

  • 自分のDAGファイルでは、
    • 要望1:
      • with airflow.DAG()句内をすっきりさせたい。
    • 対応策1:
      • 設定ファイル(例:test_aaa.json)を読み込んで、for文でdagを作る。
    • 要望2:
      • BigQueryOperator.execute()を実行し、BigQueryの実行結果の情報を受け取り、テストのエラー時にはairflowのtaskがエラーなるようにして、テストの失敗をairflow UIで確認できるようにしたい。
    • 対応策2:
      • BigQueryOperator.execute()のために、context情報を渡す必要があるため、with airflow.DAG()句内PythonOperatorでprovide_context=Trueを行う。
    • 要望3:
      • airflow UIのtask数の増加を抑えたいため、同じテストを複数のカラムに行うのを1task内で完結させる。
    • 対応策3:
      • テーブルとタスク毎にPythonOperatorタスクを生成して、その中でfor文でカラム毎のテストクエリを生成する。
test_aaa.pyのwith句部分
    with airflow.DAG(
            ・・・
            default_args=default_dag_args) as dag:

        start_task = DummyOperator(task_id='start')
        finish_task = DummyOperator(task_id='finish')

        # Todo: test_json_pathは、テスト設定ファイルのjsonのパスを指定。
        with open(test_json_path, "r") as f:
            load_json = json.load(f)
        test_tables = load_json[dataset]

        # Todo: target_tablesは、テスト対象のテーブル名のlistです。
        for target_table in target_tables:
            obj_table_start_task = DummyOperator(task_id=f"start_{target_table}")
            start_task >> obj_table_start_task

            for test_name, test_info in test_tables[target_table]['test_list'].items():
                main_task_name = f"{test_name}_for_{target_table}"
                test_info['dataset'] = dataset
                test_info['table'] = target_table
                # PythonOperatorのため、Cloud Composerにデプロイした時点では、SQLは展開されていない。
                test_task = PythonOperator(
                    task_id=main_task_name,
                    python_callable=test_bigquery_table,
                    provide_context=True,
                    op_kwargs=test_info
                )
                obj_table_start_task >> test_task >> finish_task
test_aaa.pyのtest_bigquery_task句部分
def set_test_setting(test_name, table, column):
    test_setting = {
        'bql_template_base_path': '',
        'error_template': ''
    }
    if test_name == 'test_check_null':
        test_setting['bql_template_base_path'] = 'test_results/template/test_check_null.sql'
        test_setting['error_template'] = 'Found Null value'
    # Todo: 以下略

def make_bql(target):
    # Todo: 冗長なので直したい。
    dataset = target['dataset']
    table = target['table']
    partition = target['partition']
    bql_template_base_path = target['bql_template_base_path']
    target_column = target['target_column']
    #####
    target_table = ''
    if partition is True:
        target_table = f'{PARAMS["DESTINATION_PROJECT"]}.{dataset}.{table}${{{{ ds_nodash }}}}'
    else:
        target_table = f'{PARAMS["DESTINATION_PROJECT"]}.{dataset}.{table}'

    bql_template_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), bql_template_base_path)
    bql = open(bql_template_path).read()

    # Todo: テストテンプレートを追加した場合は、パラメータ変更ルールも追加
    params = {
        "$TARGET_TABLE": target_table,
        "$TARGET_COLUMN": target_column,
        "$WHERE_PARTITION_KEY": f'WHERE {target["partition_key"]} = "${{{{ ds_nodash }}}}"' if partition is True else ''
    }

    for key, value in params.items():
        bql = bql.replace(key, value)
    return bql

def test_bigquery_table(**kwargs):
    kwargs.update(set_table_info(kwargs['dataset'], kwargs['table']))
    for column in kwargs['columns']:
        # Todo: set_test_settingは、テスト用SQLのパスとテスト種類毎のエラーメッセージを呼び出す関数
        kwargs.update(set_test_setting(kwargs['test_name'], kwargs['table'], column))
        task_name = f"bq_test-{kwargs['table']}"
        kwargs['target_column'] = column
        # Todo: make_sqlは、テスト用SQLテンプレートから実際のSQLを生成する関数
        bql = make_bql(kwargs)
        bq_task = BigQueryOperator(
            task_id=task_name,
            sql=bql,
            use_legacy_sql=False,
        )

        # BigQueryにSQL発行し、実行結果を取得
        bq_task.execute(context=kwargs)
        bq_task.bq_cursor.job_id = bq_task.bq_cursor.running_job_id
        result = bq_task.bq_cursor.next()
        logging.info(result)
        # result[2]は、テストクエリの成功有無、Falseなら、例外処理でエラーを返して、taskを失敗させる。
        error_status_code = result[2]
        if error_status_code:
            raise ValueError(f"{kwargs['error_template']} in {column} of {kwargs['table']}")

参考:実行結果

test用DAGのairflow UI

  • テストが増えるとさすがに見辛いので、人にとってPull型の通知にした方が良い。

slackによるエラー通知

  • Errorをキャッチして、通知するようにしておくとわかりやすい。

以上です。

12
10
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?