概要
TreasureDataにある大量データ(100GBレベルのtableデータを複数)をSnowflakeへ取り込む処理を実行しました。
しかし、一筋縄ではいかず、試行錯誤経て比較的シュッとできる方法に辿りつきました。役立ったのはSnowflakeの半構造化データ機能でした。
ここでは、同じような問題に直面された方のお役に立てればと結論と伴に試行錯誤の過程を記します
※1年近く運用してわかった改良版も投稿済みです、ご参照ください
結論
TDからSnowflakeへの大量データの移行は「TD→Snowflake」の直接移行は不可能でしたが、
**「TD→クラウドストレージ(s3)→Snowflake」**とクラウドストレージ経由にすることで、ほぼ手動調整無しに実現可能になりました。
手順は以下になります。
1. Snowflakeに適切なtable定義を作る。
TDのSnowflakeExportIntegrationを使ってSnowflakeへ一行だけデータを書き出す。
2. TDからクラウドストレージへデータをexportする。
TDのS3ExportIntegrationV2(AWSの場合)を使うワークフローを作成する。
- Snowflakeバルクロード時の最適データ(圧縮済み100~250MB/ファイル)に合うデータを出力するクエリを回すワークフローを実行。(TD_TIME_RANGE関数でtime範囲を絞るクエリをfor_range等で回す)
###●S3ExportIntegrationV2の設定
- 出力形式を「jsonl」に指定する。(Snowflake側で半構造化機能を使うため)
- gz圧縮を指定する。
- Hive2020.1を使う。(「engine hive」「engine_version: stable」を指定)
3. クラウドストレージ(s3)からSnowflakeへデータをロードする
Snowflakeにて、1で作られたtable定義に対してデータをロードする。
- 「1.」で定義を作ったテーブルからクリーンなテーブルを作成。
create table [NEW_TABLE] like [1.の定義済みtable];
cf. バリアント構文 - Snowflake Documentation
-
並列ロードさせたいファイル数に応じてウェアハウスサイズを調整する。
8:XS, 16:S, 32:M, 64:M, 128:XL, ....etc
-
半構造化データのバルクロードを実行する。
COPYコマンドに「MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE」オプションをつけて実行。
COPY INTO [NEW_TABLE]
FROM @[外部ステージ]
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
FILE_FORMAT = (FORMAT_NAME=[type=json compression=gzipを定義したfileformat]);
cf. CREATE FILE FORMAT - Snowflake Documentation
以下、試行錯誤過程。
大量データではSnowflakeExportIntegration(TD)が使えない...
TDデータをSnowflakeへ移行する場合。順当に考えるとSnowflakeExportIntegrationを使う方法ですが、結論「大量データ」のexportには使えませんでした。これは、不具合ではなく、大量データexportを想定してないという感じです。現に100万行程度のtableデータの移行は成功しました。
Snowflake Export Integration - TreasureData Documentation
なぜSnowflakeExportIntegrationは大量データの書き出しに使えないのか?
実行時の履歴を確認すると。大量のinser処理が走る形になってます...成功した移行処理もWarehouseが長時間稼働を経て完了となっていました...
実装まで追えてはないですが、TDのSnowflakeExportIntegrationは大量データの移行に適した方法ではないと思えます。(バルクロードのCOPYコマンドを使った他tableの移行は1/10以下のコストでした)
TD→クラウドストレージ→Snowflakeへ切り替え。
大量データ移行ではTD→Snowflakeの直接移行に挫折しました。諦めるわけにはいかないので、
次はクラウドストレージを経由した移行法を試しました。
前半:TD→クラウドストレージ(s3)
ここは順当にS3ExportIntegrationV2が使えます。
Amazon S3 Export Integration v2 - TreasureData Documentation
ロードを考慮したexport指定。
大量データ出力の場合hive一択と思いますが、Snowflake側でのロードを考慮した出力ファイルを作る必要があります。
- Snowflakeバルクロード時の最適データにあうデータサイズ(圧縮済み100~250MB/ファイル)で出力するワークフローを作る。(TD_TIME_RANGE関数で1クエリの出力データ量を調節する)
- 半構造化機能を使うため出力形式をjsonlに指定する。
- gz圧縮を指定
- Hive2020.1を使う。(「engine_version: stable」を指定)
ワークフロー例
_export:
td:
database: [db]
table_list:
- [tables...]
end_date: 'yyyy-mm-dd'
+s3v2_export_task:
for_each>:
target_table: ${table_list}
_do:
for_range>:
from: 0
to: 720
step: 1
_parallel: true
_do:
td>:
query: select * from ${target_table} where TD_TIME_RANGE(time, TD_TIME_ADD(TD_TIME_PARSE("${end_date}", 'JST'), "-${range.to}d", 'JST'), TD_TIME_ADD(TD_TIME_PARSE("${end_date}", 'JST'), "-${range.from}d", 'JST'))
database: ${td.database}
engine: hive
engine_version: stable
result_connection: [s3_v2]
result_settings:
bucket: [s3_bucket]
path: [file_path]/data_${range.to}.jsonl.gz
sse_type: sse-s3
format: jsonl
compression: gz
header: false
delimiter: default
null_value: empty
newline: LF
quote_policy: MINIMAL
escape: null
quote: null
part_size: 10
後半:クラウドストレージ(s3)→Snowflake
この段になってデータ連携あるあるな2つの地道ポイントに突き当たりました。
地道ポイント1
Snowflakeへ取り込むためには、tableをあらかじめ作る必要がある。
つまりTDのtable定義を元に、適切にcreate table文を作らないといけない...
(辛いところはカラム数が100程度のtableが複数あるところ...手動で対応するとtypoやらカラム順違いやら地道な確認作業時間がどれだけかかるか...)
cf. データ型の概要 - Snowflake Documentation
地道ポイント2
Snowflakeのtable定義を作り終わったところで、
いざ、s3データのロードを開始すると...早速のエラー。地道に解決するしかないが、途方にくれる…
ハイブリッド方式
最終的に辿りついたのは、
TDの「SnowflakeExportIntegration」「S3ExportIntegrationV2」とSnowflakeの半構造化機能を使ったハイブリッド方式でした。
地道ポイント1の解決
TDのtable定義がSnowflakeのtable定義へ適切に移行されればよい。
⇒「SnowflakeExportIntegration」を使って一行だけSnowflakeへデータを書き出す。これにより自動でSnowflakeへtableが作成される。
地道ポイント2の解決
ポイント1で解決したtable定義が正しければ確実にロードできる方法で実行する。
Snowflakeのロードコマンド(COPY)には半構造化データに対応した「MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE」が用意されている。
これを使うと、ロードデータのオブジェクト(json)とカラム名が合致した場所にデータが入るため、table定義さえ正しければ、ほぼ確実にロードが成功する。(失敗したとしてもエラーが明確で微修正程度)
コピーオプション - Snowflake Documentation
さいごに
「外部のデータを1ストップでロードできない」という一見地味な問題に対して、「運用でカバー」(=手動調整)という危うくデータ連携あるあるな泥沼にハマりそうになりましたが、Snowflakeの柔軟なロード機能により救われた気がします。
Snowflakeが標榜する特性の一つに「メンテナンスを極限まで削減 as a Service」というものがありますが、外部データの取り込み機能にその可能性の一端を見ることができました。