概要
DWHなんかを担当していると、まあ、嫌になるほどデータの補正をやらされるかと思います。
ということで、毎回個別に処理を書くのもしんどいので、修正前データ/修正後データを用意しておけば、そのデータで補正を行ってくれるmacroを作成してみました。
例えば以下のようなテーブル(model)があるとします。
- test_model
key | date | value |
---|---|---|
1 | 1月1日 | テスト1 |
2 | 1月2日 | テスト2 |
3 | 1月3日 | テスト3 |
4 | 1月4日 | テスト4 |
この時、以下のように補正前/補正後のデータを保持するテーブル・データを用意しておくことで
- adjust__test_model__before (修正前データ)
key | date | value |
---|---|---|
3 | 1月3日 | テスト3 |
4 | 1月4日 | テスト4 |
- adjust__test_model__after (修正後データ)
key | date | value |
---|---|---|
4 | 1月4日 | テスト4 - 変更後 |
5 | 1月5日 | テスト5 - 追加 |
以下のように、データを補正してくれる感じです。
- 補正後データ
key | date | value |
---|---|---|
1 | 1月1日 | テスト1 |
2 | 1月2日 | テスト2 |
4 | 1月4日 | テスト4 - 変更後 |
5 | 1月5日 | テスト5 - 追加 |
要するに、動作としては以下です。
- 修正後前データに存在し、修正後データには存在しない ⇒ 削除される
- 修正後前データに存在し、修正後データも存在する ⇒ 上書きされる
- 修正後前データには存在せず、修正後データには存在する ⇒ 追加される
なお、前提として、対象となるテーブル(model)にはユニークキーが設定されている必要があります。
また、補正用のデータを保持するテーブルの名称は、固定で以下となります。
- 修正前データ: adjust__<補正対象テーブル名>__before
- 修正後データ: adjust__<補正対象テーブル名>__after
Macroの使い方・内容
具体的な使い方、内容は以下の通りです。
Macroの使い方
使い方としては、以下のようにmodelの処理の中で、当該macroを呼び出します。
get_adjusted_dataがmacroの名称で、補正を行う対象となるmodelの名称を引数にとります。
with base_data as (
{{ get_adjusted_data("test_model") }}
)
前提として、補正対象のテーブル(model)にはユニークキーが設定されている必要があります。
models:
- name: test_model
tests:
- unique:
column_name: key
Macroの内容
get_adjusted_dataの内容は以下です。
{% macro get_adjusted_data(model_name) %}
{%- set adjust_table_before = "adjust__" + model_name + "__before" -%}
{%- set adjust_table_after = "adjust__" + model_name + "__after" -%}
{%- set model = ref(model_name) -%}
{%- set model_config = get_config_from_model_name(model_name) -%}
{%- set unique_key = model_config.unique_key -%}
{%- if unique_key is not string -%} {# checking if unique_key is list or scalar #}
{%- set keys = unique_key -%}
{%- else %}
{%- set keys = [unique_key] -%}
{%- endif %}
select * from {{ ref(model_name) }} as t1
where
not exists (
select 1 from {{ ref(adjust_table_before) }} t2
where
{% for key in keys -%}
t2.{{ key }} = t1.{{ key }}{{ '' if loop.last else ' and ' }}
{%- endfor %}
)
union all
select * from {{ ref(adjust_table_after) }} t3
{% endmacro %}
当該macroの動作には、以下のmacroが必要です。
こちらもmacroフォルダに格納しておいてください。
{% macro get_config_from_model_name(model_name) %}
{% set model_ref = ref(model_name) %}
{% set model_schema = model_ref.schema %}
{% set ns = namespace(config=none) %}
{% if execute %}
{% for node in graph.nodes.values() %}
{%- if node.schema == model_schema and (node.name == model_name or node.alias == model_name) -%}
{% set ns.config = node.config %}
{%- endif -%}
{% endfor %}
{% endif %}
{{ return(ns.config) }}
{% endmacro %}
参考
以下の手順で開発環境を作成すれば、当該macroの動作が確認できます。
以下のモデルで当該の補正処理を使用しています。
- stg_qaweb__incident_comments
- stg_qaweb__incidents
その他
本当は、修正前データを実データを比較して、変更がないことをチェックする処理を入れるつもりだったのですが、ちょっとそこまで手が回りませんでした。
修正前データがキーだけでなく、データ全体を保持しているのはそのためです。
試していないですが、修正前データの方は、unique keyの列のみのテーブルでも動作はすると思います。