BigQueryは高速で安価なデータ分析サービスとして有用であり、BigQueryにデータをロードするツールとして、Embulkはとても相性がよい。
データをロードする際、取り込んではいけないデータや不必要なデータは落としてからロードする必要があるケースがある。
例えば以下のjsonでuserIdを取り除いてデータロードするケースについて記載する。
{
"a":{
"b":{
userId:"xxx"
}
}
"c":"yyy"
}
■インストール
・Javaのインストールと環境変数設定:省略
・Embulkのインストール
curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
・S3 inputプラグインのインストール
https://github.com/embulk/embulk-input-s3
$ embulk gem install embulk-input-s3
・filter-json_keyのインストール
https://github.com/civitaspo/embulk-filter-json_key
$embulk gem install embulk-filter-json_key
・BigQuery outputプラグインのインストール
https://github.com/embulk/embulk-output-bigquery
$ embulk gem install embulk-output-bigquery
■設定ファイルの作成
config.yaml
in:
type: s3
bucket:★S3のバケット名
path_prefix: ★S3に配置したファイルへのパス
endpoint:★エンドポイント。s3-us-west-1.amazonaws.comみたいなの
auth_method: basic
access_key_id:★AWSのアクセスキーID
secret_access_key:★AWSのシークレットアクセスキー
parser:
type: csv
delimiter: "\t"
charset: UTF-8
newline: CRLF
null_string: 'NULL'
skip_header_lines: 0
comment_line_marker: '#'
allow_extra_columns: true
columns:
- {name: json_payload, type: string}
filters:
- type: json_key
column: json_payload
nested_key_delimiter: "."
drop_keys:
- {key: "a.b.userId"}
out:
type: bigquery
mode: append
auth_method: json_key
json_keyfile:★サービスアカウントやユーザのシークレットキー(json)
project: ★GCPのプロジェクト
dataset: ★BigQueryのデータセット
table: ★BigQueryのテーブル
auto_create_dataset: true
auto_create_table: true
schema_file: insert_schema.json ★BQのテーブルのスキーマ
column_options:
- {name: json_payload, type: string }
timeout_sec: 300
open_timeout_sec: 300
retries: 3
path_prefix: temp_data
file_ext: .gz
delete_from_local_when_job_end: true
source_format: CSV
max_bad_records: 0
formatter:
type: jsonl
insert_schema.json
[
{ "name":"payload", "type":"string", "mode":"nullable", "description":"1column1json" }
]
■実行
$ embulk run config.yaml