やりたかったのはMySQL上にあるテーブルを1:nの関連テーブルとJOINしてグルーピングし、入れ子になったRECORD型を作ってBigQueryにアップロードすること。
例えば、以下の様なテーブルがあるとする。
users
| name | type | 
|---|---|
| id | INT | 
| nickname | VARCHAR | 
comments
| name | type | 
|---|---|
| id | INT | 
| body | VARCHAR | 
| user_id | INT | 
これをJOINして、最終的には以下の様なJSONを準備してBigQueryにアップロードしたい。
{"id": 1, "nickname": "joker1007", "comments": [{"id": 1, "body": "comment body"}, {"id": 2, "body": "next comment"}] 
これをembulkで実現するにはどうすればいいかを考えた。
まず、SQLで無理やりJSON文字列を作る。
SELECT
  CONCAT(
    '{',
    '"id":',users.id,
    ',"nickname":',IF(users.nickname, CONCAT('"', users.nickname, '"'), "null"),
    ',"comments":',CONCAT('[', GROUP_CONCAT((CONCAT('{"id":', comments.id), CONCAT(',"body":"',comments.body,'"}')), ']'),
    '}'
  ) AS payload
FROM users
INNER JOIN comments ON comments.user_id = users.id
GROUP BY
  users.id,
  users.nickname
異様に読み辛いので合ってるのか分からんけど、GROUP_CONCATと文字列加工を駆使して頑張って作る。
ちなみに上のクエリではエスケープとかは考慮してない。
そして、以下の様なコンフィグを書く
in:
  # 省略
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: "./google-key.json"
  project: project_id
  dataset: dataset_id
  auto_create_table: true
  file_ext: json.gz
  source_format: NEWLINE_DELIMITED_JSON
  formatter:
    type: csv
    delimiter: "\0"
    quote:
    quote_policy: NONE
    header_line: false
  encoders:
  - type: gzip
  table: users
  schema_file: "./users_schema.json"
  path_prefix: "./users_load"
csv formatterの設定を調節しテキストがそのままファイルに出力されるようにする。
embulk-output-bigqueryは基本的な処理はほとんどFileOutputプラグインと同じで、こうしておくと一時ファイルとして出力されるテキストはいい感じに1行毎のJSONファイルになる。
後はembulk-output-bigqueryが普通にファイルをアップロードしてLoad Jobを実行してくれる。
SQLでJSON文字列を正しく生成することさえ出来れば、後はembulkプラグインの設定の書き方で何とかなることが分かった。まあ、それが辛いんだけど……。
MySQL-5.7系とかPostgreSQLのJSON関数が使えれば、もうちょっと楽にデータ生成が出来る気がするけど、MySQL-5.6系では辛い。
2016/2/3 追記
拙作のembulk-filter-ruby_procにより、もうちょっと楽にできるようになった。
SELECT
  users.id,
  users.nickname
  CONCAT('[', GROUP_CONCAT((CONCAT('{"id":', comments.id), CONCAT(',"body":"',comments.body,'"}')), ']') AS comments,
  "" AS payload
FROM users
INNER JOIN comments ON comments.user_id = users.id
GROUP BY
  users.id,
  users.nickname
in:
  # 省略
filter:
  - type: ruby_proc
    columns:
      - name: payload
        proc: |
          ->(_payload, record) do
            comments = JSON.parse(record["comments"])
            record.reject { |k, v| k == "payload" }.tap { |r|
              r["comments"] = comments
            }
          end
        type: json
  - type: column
    columns:
      - {name: payload}
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: "./google-key.json"
  project: project_id
  dataset: dataset_id
  auto_create_table: true
  file_ext: json.gz
  source_format: NEWLINE_DELIMITED_JSON
  formatter:
    type: csv
    delimiter: "\0"
    quote:
    quote_policy: NONE
    header_line: false
  encoders:
  - type: gzip
  table: users
  schema_file: "./users_schema.json"
  path_prefix: "./users_load"
こんな感じで、SQLの出力からJSON型を直接作って後ろに回せるようになった。
後はfilter-columnでJSON型のカラムだけにして一時ファイルを出力する。
まあ、やりたい事次第では一部のデータについては頑張ってJSONをSQLで生成する必要はある。
CASE文と組み合わせてカラムを増やして調整できるなら、filter側で何とかする余地もある。
やっぱ、PostgreSQLなら、もうちょい楽になるかも……。
