6
4

More than 1 year has passed since last update.

【Google Cloud】BigqueryのDataformを試してみた

Posted at

はじめに

こんにちは。
前回 【Google Cloud】Bigqueryに組み込まれたDataformとはどのようなサービスなのか? という記事を書かせて頂きましたが実際にDataformを軽く試してみました。

Dataformを試してみる

パターン①

パターン①としては下記内容を試してみました。

  1. test_sales : GCSのファイルをテーブルへロードする
  2. test_id_mst : マスタテーブルを登録する
  3. test_dm : 項番1、2を結合してデータマート(テーブル)を更新する。更新はCTAS文を実行する。
  4. test_dm_snapshot : 項番3のテーブルを利用してスナップショットマートを更新する。更新はDelete&Insert文を実行する。

最終的には以下のDAGが作成されます。
drawing0.png

1. test_sales : GCSのファイルをテーブルへロードする

GCSのファイルをload data文で取り込む処理を実装します。
基本のテーブル作成処理ではないのでtype: 'operations'を利用します。
self関数は自分自身のテーブル名を参照できる関数です。下記の例ではconfigで明示的にテーブル名を指定していないのでファイル名のtest_salesがテーブル名となります。プロジェクト名、データセット名もconfigで明示的に指定していない為、デフォルトで設定したものが利用されます。明示的に記載したい場合下記となります。
※ configに設定しているhasOutputの動作は正確に理解はできていませんが、type: 'operations'self()関数を利用する際には必須となるようです。
※ GCSへアクセスするのでDataform ServiceAccountには適切なGCSの権限設定が必要です。

config {
    database: 'your-project-name',
    schema: 'your-dataset-name',
    name: 'your-table-name',
}

configにtagを記載することでワークフローをグループ化できます。ワークフロー実行の際にtag単位で実行できるので管理が楽になります。
この例ではtags: ['create_dm']を記載しています。

test_sales.sqlx
config {
    type: 'operations',
    hasOutput: true,
    tags: ['create_dm'],
}

create or replace table ${self()}
(
    id int64
    ,sales_amount int64
)
;

load data into ${self()}
from files(
    format = 'CSV',
    field_delimiter = '\t',
    skip_leading_rows = 1,
    uris = ['gs://your-bucket/*.tsv']
)
;

利用したデータファイルです。

test_sales.tsv
id	sales_amount
1	100
2	200
3	300
4	400
5	500
1	1000
3	1500
4	1200

テーブルをクエリした結果です。
image.png

2. test_id_mst : マスタテーブルを登録する

項番3で実行するSQLのソーステーブルを登録します。
Dataformのワークフロー処理で作成•更新されるテーブルではないテーブルを登録することが可能です。主にテーブル更新の際にソースとして利用されるテーブルになると思います。ソーステーブルを登録しておくことで、DAGに依存関係が表示される様になりテーブルの関係性が分かりやすくなるメリットがあります。

テーブルを作成するSQLです。Bigqueryで直接テーブルを作成しておきます。

create or replace table temp.test_id_mst
as
with mst as (
  select 1 as id, '壱' as name
  union all
  select 2 as id, '弐' as name
  union all
  select 3 as id, '参' as name
  union all
  select 4 as id, '肆' as name
  union all
  select 5 as id, '伍' as name
)

select * from st
;

type: 'declaration'を利用してDataformへテーブルを登録します。
明示的にテーブル名を指定していないのでファイル名がテーブル名として登録されます。

test_id_mst.sqlx
config {
    type: 'declaration',
    schema: 'temp'
}

テーブルをクエリした結果です。
image.png

3. test_dm : 項番1、2を結合してデータマート(テーブル)を更新する。更新はCTAS文を実行する

項番1、2で作成したテーブルをソースとしてテーブルを更新します。
基本的なテーブル作成になる為type: 'table'を記載します。type: 'table'は実際にはCTASが実行されます。Dataformのメイン処理の1つです。
登録されたソーステーブルやワークフローで作成されたテーブルはref関数で参照できます。
ref関数はSQLXファイル(≒テーブル)を参照する関数です。ref関数でテーブルを参照することによりテーブル間の依存関係が自動的に登録されます。以下の例ですと、test_dmのソースとしてtest_salestest_id_mstref関数で参照されているので下記のグラフが作成されます。

ref関数を利用せずにテーブル名をハードコードすることで同等の処理を実行することも可能ですが、基本的にはref関数あってのDataformの価値だと思います。
こちらも明示的にテーブル名を指定していないのでファイル名がテーブル名となります。
検証では利用していませんが、下記の記載も可能です。

  • テーブル•カラムのコメント
  • パーティショニング、クラスタリング
  • ポリシータグ
  • メインSQL実行の前後にセット実行できるSQLの記載。例えばメインSQLで作成したテーブルにGrantを実行する。
test_dm.sqlx
config {
    type: 'table',
    tags: ['create_dm'],
}

select
    fact.id
    ,mst.name
    ,sum(fact.sales_amount) as total_amount
from
    ${ref('test_sales')} as fact
left outer join
    ${ref('test_id_mst')} as mst
on
    fact.id = mst.id
group by
    id
    ,name

4. test_dm_snapshot : 項番3のテーブルを利用してスナップショットマートを更新する。更新はDelete&Insert文を実行する

項番3で作成したテーブルをソースとしてテーブルを更新します。
基本のテーブル作成処理ではないのでtype: 'operations'を利用します。
項番3と同じようにref関数でテーブルを参照しているので下記のグラフが作成されます。

明示的にテーブル名を指定していないのでファイル名がテーブル名となります。

test_dm_snapshot
   type: 'operations',
    hasOutput: true,
    tags: ['create_dm'],
}

create table if not exists ${self()}
(
    extract_date date
    ,id int64
    ,name string
    ,total_amount int64
)
;

delete from ${self()} where extract_date = current_date('Asia/Tokyo')
;

insert into ${self()}
select
    current_date('Asia/Tokyo') as extract_date
    ,id
    ,name
    ,total_amount
from
    ${ref('test_dm')}
;

ワークフローを実行する

最終的なグラフです。

実行内容のグラフです。

出来上がったワークフローを実行するとtest_dmtest_dm_snapshotともに正しい結果で実行できました。
image.png
image.png

パターン②

パターン②としては下記内容を試してみました。

  1. GCSファイルをソースとして外部テーブルを作成する
  2. 項番1で作成したテーブルを利用してテーブルを更新する。更新はMerge文を実行する。

最終的には以下のDAGが作成されます。
drawing1.png

1. GCSファイルをソースとして外部テーブルを作成する

GCSのファイルをソースにcreate external table文で外部テーブルを作成します。
基本のテーブル作成処理ではないのでtype: 'operations'を利用します。
tagは'merge_table'を記載しています。
明示的にテーブル名を指定していないのでファイル名がテーブル名となります。
※ パターン1でも記載しましたが、GCSへアクセスするのでDataform ServiceAccountには適切なGCSの権限設定が必要です。

test_sales_external.sqlx
config {
    type: 'operations',
    hasOutput: true,
    tags: ['merge_table'],
}

create or replace external table ${self()}
(
    id int64
    ,sales_amount int64
)
options (
    format = 'CSV',
    field_delimiter = '\t',
    skip_leading_rows = 1,
    uris = ['gs://your-bucket/*.tsv']
)
;

利用したデータファイルです。

test_sales_external_1.tsv
id	sales_amount
1	100
2	200
3	300
4	400
5	500

テーブルをクエリした結果です。
image.png

2. 項番1で作成したテーブルを利用してテーブルを更新する。更新はMerge文を実行する

Mergeでのデータ更新になる為type: 'incremental'を記載します。type: 'incremental'は実際にはInsertかMergeが実行されます。configにuniqueKeyを指定するとMerge、指定しないとInsert処理となります。Dataformのメイン処理の1つです。uniqueKeyはMergeを実行する際のテーブル結合キーです。
初回実行時(正確にはテーブルが存在しない場合だと思われる)にはCTASが実行されテーブルが作成されます。2回目以降(正確にはテーブルが存在する場合と思われる)は、MergeまたはInsertが実行されます。Merge、Insert処理を成功させるにはターゲットテーブル(ここでの例だとtest_sales_merge)とソーステーブル(ここでの例だとtest_sales_external)が同じカラム構成を保っている必要があります。カラム変更が発生する場合はCTASでターゲットテーブルを再作成することも可能です。
この例でもref関数を利用しているので以下のグラフが作成されます。

明示的にテーブル名を指定していないのでファイル名がテーブル名となります。

test_sales_merge.sqlx
config {
    type: 'incremental',
    uniqueKey: ['id'],
    tags: ['merge_table'],
}

select 
    id
    ,sales_amount
from
    ${ref('test_sales_external')}

ワークフローを実行する

最終的なグラフです。

実行内容のグラフです。

出来上がったワークフローを実行するとtest_sales_mergeが正しい結果で実行できました(テーブルも作成されています)。
image.png

2回目以降、Merge処理が正しく実行されているか確認してみます。
初回実行で利用したtest_sales_external_1.tsvをGCSから削除の上データファイルを置き換えます。
id 5sales_amountがUpdateされid 6がInsertされていれば成功です。

test_sales_external_2.tsv
id	sales_amount
6	6000
5	5000

正しい結果で実行できました。
id 5のsales_amountが500から5000にupdateされid 6がInsertされています。
image.png

終わりに

簡単な検証ではありますが、実装もめちゃくちゃ簡単でした。SQLさえ書ければすぐに実装できるので抵抗なく利用できそうです。
後はBigquery上からのスケジュール実行や、エラーが発生した際にリトライ•通知(理想はSlack)が出来るようになると更に使いやすくなると思います。
ありがとうございました。

参照

Google CloudのDataform公式ドキュメント

Dataform自体のドキュメント(旧?)

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4