LoginSignup
13

More than 5 years have passed since last update.

Embulkプラグイン(+ Digdag) memo

Last updated at Posted at 2016-09-06

Embulkの各プラグインのconfig設定例をメモしていきます。
(MySQL、Bigquery、Redshit、Redis、S3など)

プラグインのインストールが必要
例:embulk gem install embulk-output-bigquery

embulkをたくさん順序よく叩いて、あいまにちょっとした処理も行いたいので、
digdagも試します。

Embulkプラグインのconfig(liquid前提)

Input

embulk-input-s3

in:
  type: s3
  path_prefix: yyy/xxxxxx
  file_ext: .zip
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  iam_user_name: yyy-xxxxxx-access
  bucket: {{ env.S3_YY_BUCKET }}
  endpoint: s3-ap-northeast-1.amazonaws.com
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    max_quoted_size_limit: 134217728
    columns:
    - {name: url, type: string}
    - {name: site, type: string}
    - {name: contents, type: string}

embulk csv

in:
  type: file
  path_prefix: "yyy/xxxxxx.csv"
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    default_timezone: "Asia/Tokyo"
    delimiter: ','
    null_string: 'NULL'
    columns:
    - {name: x, type: string}
    - {name: y, type: string}

embulk-input-gcs

in:
  type: gcs
  bucket: xx-yyy-zzz
  path_prefix: xxx/zzz_
  auth_method: private_key
  service_account_email: {{ env.SERVICE_ACCOUNT_EMAIL }} 
  p12_keyfile: ../bq/key/{{ env.P12_FILENAME }}
  application_name: zzzz
  tasks: 1
  parser:
    charset: UTF-8
    newline: LF
    header_line: true
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: site, type: string}
    - {name: category, type: string}

embulk-input-redis

in:
  type: redis 
  host: localhost
  port: 6379
  db: 1
  key_prefix: yyy-xxxxxx
  encode: hash
  columns:
  - {name: site, type: string}
  - {name: category, type: string}

embulk-input-mysql

in:
  type: mysql
  host: {{ env.XXXXX_DB_HOST }}
  user: {{ env.XXXXX_DB_USER }}
  password: {{ env.XXXXX_DB_PASSWORD }} 
  database: {{ env.XXXXX_DB_NAME }}
  table: xxxx
  select: "aaa, bbb, ccc"
  where: "status = 0 AND id = 12345"

embulk-input-redshift

  type: redshift
  host: {{ env.XXXXX_RS_DB_HOST }}
  user: {{ env.XXXXX_RS_DB_USER }}
  password: {{ env.XXXXX_DATABASE_PASSWORD }}
  database: {{ env.XXXXX_DATABASE_NAME }}
  table: xxx
  select: "name,url,title"
  fetch_rows: 1000

Output

embulk-output-bigquery

out:
  type: bigquery
  mode: append
  auth_method: private_key
  service_account_email: {{ env.SERVICE_ACCOUNT_EMAIL }}
  p12_keyfile: key/{{ env.P12_FILENAME }}
  path_prefix: tmp/
  file_ext: csv.gz
  source_format: CSV
  project: {{ env.XXXX_PROJECT }}
  dataset: xxx_yyyy_zzz
  auto_create_table: true
  table: xxxes
  schema_file: schema/yyyy_zzzz_schema.json
  prevent_duplicate_insert: true
  formatter:
    type: csv
    header_line: false
    timezone: Asia/Tokyo
  encoders:
  - {type: gzip}
  allow_quoted_newlines: 1

大量データを投入する場合にはtimeout_secとopen_timeout_secを設定

timeout_sec: 6000
open_timeout_sec: 6000

そうしないと

Caused by: org.jruby.exceptions.RaiseException: (TransmissionError) execution expired

embulk-output-td

out:
  type: td
  apikey: {{ env.TD_API_KEY }}
  endpoint: api.treasuredata.com
  database: xxx_yyyy_zzz
  table: xxxes
  default_timezone: Asia/Tokyo 

embulk-output-redis

out:
  type: redis
  host: localhost
  port: 6379
  db: 0
  key_prefix: "yyy-xxxxxx"
  encode: hash
  key: url
  columns:
  - {name: url, type: string}
  - {name: category, type: string}

embulk-output-dynamodb

out:
  type: dynamodb
  mode: upsert
  region: ap-northeast-1
  auth_method: basic
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  auto_create_table: false
  table: xxxes
  primary_key: id
  primary_key_type: Number
  write_capacity_units:
    normal: 5
    raise: 20
  read_capacity_units:
    normal: 6
    raise: 30

embulk-output-mysql

out:
  type: mysql
  host: {{ env.XXX_DB_HOST }}
  user: {{ env.XXX_DB_USER }}
  password: {{ env.XXX_DB_PASSWORD }}
  database: {{ env.XXX_DB_NAME }}
  table: yyyy
  mode: truncate_insert
  default_timezone: "Japan"

embulk csv

out:
  type: file
  path_prefix: tmp/yyy
  file_ext: txt
  formatter:
    type: csv
    charset: UTF-8
    delimiter: '\'
    header_line: false
    newline: LF
    quote: ''
    quote_policy: NONE
    escape: '\'

embulk-output-redshift

out:
  type: redshift
  host: {{ env.XXXXX_RS_DB_HOST }}
  user: {{ env.XXXXX_RS_DB_USER }}
  password: {{ env.XXXXX_RS_DB_PASSWORD }}
  database: {{ env.XXXXX_RS_DB_NAME }}
  table: xxxes_tmp
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  iam_user_name: yyy-xxxxxx-access
  s3_bucket: {{ env.S3_BUCKET }}
  s3_key_prefix: redshift
  mode: truncate_insert
  default_timezone: "Japan"

Filter

embulk-filter-ruby_proc

filters:
  - type: ruby_proc
    requires:
      - cgi
    variables:
      multiply: 1
    before:
      - proc: |
          -> do
            puts "before proc"
            @started_at = Time.now
          end
    after:
      - proc: |
          -> do
            puts "after proc"
            p Time.now - @started_at
          end
    columns:
      - name: words
        proc: |
          ->(words) do
            words # ここにrubyで処理を書く
          end
        type: string

parser

embulk-parser-none

parseさせたくないときもあります。

in:
  type: file
  path_prefix: example/example.jsonl
  parser:
    type: none
    column_name: payload
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload

etc

max_quoted_size_limit

The size of the quoted value exceeds the limit size (131072)
http://www.embulk.org/docs/built-in.html
134217728などの大きめの値にしたら、エラーが出なくなった

Digdag

install

sudo curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
sudo chmod +x /usr/local/bin/digdag

javaのバージョンがJava8以上でないとだめ

sudo yum -y remove java
sudo yum -y install java-1.8.0-openjdk-devel
java -version
javac -version

ワークフローの初期化・作成

digdag init workflow
2016-07-06 15:14:26 +0900: Digdag v0.8.3
  Creating workflow/.gitignore
  Creating workflow/tasks/shell_sample.sh
  Creating workflow/tasks/repeat_hello.sh
  Creating workflow/tasks/__init__.py
  Creating workflow/workflow.dig
Done. Type `cd workflow` and then `digdag run workflow.dig` to run the workflow. Enjoy!
workflow.dig
timezone: Asia/Tokyo

+step1:
  sh>: embulk run s3/yyy-xxxxxx.yml.liquid

+last:
  py>: tasks.XXXYYYWorkflow.last_notification
tasks/__init__.py
#coding:utf-8

import os
import slackweb
import digdag

class XXXYYYWorkflow(object):
    def xxx_notification(self):
        slack = slackweb.Slack(url=os.environ["SLACK_INCOMING_URL"])
        slack.notify(text="This is a *digdag test*...", \
        channel="#system-log", username="dig_dagger", icon_emoji=":coffee", mrkdwn=True)

実行

embulkを実行し、最後にslackへ通知した
> digdag run workflow

Success. Task state is saved at .digdag/status/20160706T000000+0900 directory.

再実行

一度実行すると.digdag/status下に実行ログ(&ステータス)がたまる。
その下のフォルダやファイルを削除しても再実行できるが、rerunというオプションもある
> digdag run --rerun workflow

architecture

結論

  • embulkはすばらしい!
  • digdagは今後に期待

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13