やりたかったのはMySQL上にあるテーブルを1:nの関連テーブルとJOINしてグルーピングし、入れ子になったRECORD型を作ってBigQueryにアップロードすること。
例えば、以下の様なテーブルがあるとする。
users
name | type |
---|---|
id | INT |
nickname | VARCHAR |
comments
name | type |
---|---|
id | INT |
body | VARCHAR |
user_id | INT |
これをJOINして、最終的には以下の様なJSONを準備してBigQueryにアップロードしたい。
{"id": 1, "nickname": "joker1007", "comments": [{"id": 1, "body": "comment body"}, {"id": 2, "body": "next comment"}]
これをembulkで実現するにはどうすればいいかを考えた。
まず、SQLで無理やりJSON文字列を作る。
SELECT
CONCAT(
'{',
'"id":',users.id,
',"nickname":',IF(users.nickname, CONCAT('"', users.nickname, '"'), "null"),
',"comments":',CONCAT('[', GROUP_CONCAT((CONCAT('{"id":', comments.id), CONCAT(',"body":"',comments.body,'"}')), ']'),
'}'
) AS payload
FROM users
INNER JOIN comments ON comments.user_id = users.id
GROUP BY
users.id,
users.nickname
異様に読み辛いので合ってるのか分からんけど、GROUP_CONCATと文字列加工を駆使して頑張って作る。
ちなみに上のクエリではエスケープとかは考慮してない。
そして、以下の様なコンフィグを書く
in:
# 省略
out:
type: bigquery
auth_method: json_key
json_keyfile: "./google-key.json"
project: project_id
dataset: dataset_id
auto_create_table: true
file_ext: json.gz
source_format: NEWLINE_DELIMITED_JSON
formatter:
type: csv
delimiter: "\0"
quote:
quote_policy: NONE
header_line: false
encoders:
- type: gzip
table: users
schema_file: "./users_schema.json"
path_prefix: "./users_load"
csv formatterの設定を調節しテキストがそのままファイルに出力されるようにする。
embulk-output-bigqueryは基本的な処理はほとんどFileOutputプラグインと同じで、こうしておくと一時ファイルとして出力されるテキストはいい感じに1行毎のJSONファイルになる。
後はembulk-output-bigqueryが普通にファイルをアップロードしてLoad Jobを実行してくれる。
SQLでJSON文字列を正しく生成することさえ出来れば、後はembulkプラグインの設定の書き方で何とかなることが分かった。まあ、それが辛いんだけど……。
MySQL-5.7系とかPostgreSQLのJSON関数が使えれば、もうちょっと楽にデータ生成が出来る気がするけど、MySQL-5.6系では辛い。
2016/2/3 追記
拙作のembulk-filter-ruby_procにより、もうちょっと楽にできるようになった。
SELECT
users.id,
users.nickname
CONCAT('[', GROUP_CONCAT((CONCAT('{"id":', comments.id), CONCAT(',"body":"',comments.body,'"}')), ']') AS comments,
"" AS payload
FROM users
INNER JOIN comments ON comments.user_id = users.id
GROUP BY
users.id,
users.nickname
in:
# 省略
filter:
- type: ruby_proc
columns:
- name: payload
proc: |
->(_payload, record) do
comments = JSON.parse(record["comments"])
record.reject { |k, v| k == "payload" }.tap { |r|
r["comments"] = comments
}
end
type: json
- type: column
columns:
- {name: payload}
out:
type: bigquery
auth_method: json_key
json_keyfile: "./google-key.json"
project: project_id
dataset: dataset_id
auto_create_table: true
file_ext: json.gz
source_format: NEWLINE_DELIMITED_JSON
formatter:
type: csv
delimiter: "\0"
quote:
quote_policy: NONE
header_line: false
encoders:
- type: gzip
table: users
schema_file: "./users_schema.json"
path_prefix: "./users_load"
こんな感じで、SQLの出力からJSON型を直接作って後ろに回せるようになった。
後はfilter-columnでJSON型のカラムだけにして一時ファイルを出力する。
まあ、やりたい事次第では一部のデータについては頑張ってJSONをSQLで生成する必要はある。
CASE文と組み合わせてカラムを増やして調整できるなら、filter側で何とかする余地もある。
やっぱ、PostgreSQLなら、もうちょい楽になるかも……。