10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulkを使ってS3からBigQueryへデータをフィルタリングしてロードする

Posted at

BigQueryは高速で安価なデータ分析サービスとして有用であり、BigQueryにデータをロードするツールとして、Embulkはとても相性がよい。
データをロードする際、取り込んではいけないデータや不必要なデータは落としてからロードする必要があるケースがある。
例えば以下のjsonでuserIdを取り除いてデータロードするケースについて記載する。

{
"a":{
   "b":{
     userId:"xxx"
    }
  }
"c":"yyy"
}

■インストール
・Javaのインストールと環境変数設定:省略
・Embulkのインストール

curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

・S3 inputプラグインのインストール
https://github.com/embulk/embulk-input-s3

$ embulk gem install embulk-input-s3

・filter-json_keyのインストール
https://github.com/civitaspo/embulk-filter-json_key

$embulk gem install embulk-filter-json_key

・BigQuery outputプラグインのインストール
https://github.com/embulk/embulk-output-bigquery

$ embulk gem install embulk-output-bigquery

■設定ファイルの作成

config.yaml
in:
  type: s3
  bucket:★S3のバケット名
  path_prefix: ★S3に配置したファイルへのパス
  endpoint:★エンドポイント。s3-us-west-1.amazonaws.comみたいなの
  auth_method: basic
  access_key_id:★AWSのアクセスキーID
  secret_access_key:★AWSのシークレットアクセスキー
  parser:
    type: csv
    delimiter: "\t"
    charset: UTF-8
    newline: CRLF
    null_string: 'NULL'
    skip_header_lines: 0
    comment_line_marker: '#'
    allow_extra_columns: true
    columns:
      - {name: json_payload, type: string}
filters:
  - type: json_key
    column: json_payload
    nested_key_delimiter: "."
    drop_keys:
    - {key: "a.b.userId"}
out:
  type: bigquery
  mode: append
  auth_method: json_key
  json_keyfile:★サービスアカウントやユーザのシークレットキー(json)
  project: ★GCPのプロジェクト
  dataset: ★BigQueryのデータセット
  table: ★BigQueryのテーブル
  auto_create_dataset: true
  auto_create_table: true
  schema_file: insert_schema.json ★BQのテーブルのスキーマ
  column_options:
    - {name: json_payload, type: string }
  timeout_sec: 300
  open_timeout_sec: 300
  retries: 3
  path_prefix: temp_data
  file_ext: .gz
  delete_from_local_when_job_end: true
  source_format: CSV
  max_bad_records: 0
  formatter:
    type: jsonl
insert_schema.json
[
 { "name":"payload", "type":"string", "mode":"nullable", "description":"1column1json" }
]

■実行

$ embulk run config.yaml
10
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
10
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?