This is a running memo of config examples for the various Embulk plugins
(MySQL, BigQuery, Redshift, Redis, S3, and so on).
Each plugin has to be installed first, e.g.:
embulk gem install embulk-output-bigquery
I want to run embulk many times in a fixed order, with small bits of processing in between,
so I am also trying digdag.
Embulk plugin configs (Liquid templating assumed)
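All of the configs below rely on Embulk's Liquid templating: name the file *.yml.liquid and every {{ env.NAME }} is expanded from an environment variable when the file is loaded. A minimal sketch (INPUT_CSV_DIR is a hypothetical variable, not one used in the examples below):

in:
  type: file
  path_prefix: "{{ env.INPUT_CSV_DIR }}/data"   # expanded from $INPUT_CSV_DIR at load time
  parser:
    type: csv
    columns:
    - {name: x, type: string}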
Input
embulk-input-s3
in:
  type: s3
  path_prefix: yyy/xxxxxx
  file_ext: .zip
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  iam_user_name: yyy-xxxxxx-access
  bucket: {{ env.S3_YY_BUCKET }}
  endpoint: s3-ap-northeast-1.amazonaws.com
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    max_quoted_size_limit: 134217728
    columns:
    - {name: url, type: string}
    - {name: site, type: string}
    - {name: contents, type: string}
embulk csv
in:
  type: file
  path_prefix: "yyy/xxxxxx.csv"
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    default_timezone: "Asia/Tokyo"
    delimiter: ','
    null_string: 'NULL'
    columns:
    - {name: x, type: string}
    - {name: y, type: string}
embulk-input-gcs
in:
  type: gcs
  bucket: xx-yyy-zzz
  path_prefix: xxx/zzz_
  auth_method: private_key
  service_account_email: {{ env.SERVICE_ACCOUNT_EMAIL }}
  p12_keyfile: ../bq/key/{{ env.P12_FILENAME }}
  application_name: zzzz
  tasks: 1
  parser:
    charset: UTF-8
    newline: LF
    header_line: true
    type: csv
    delimiter: ','
    quote: '"'
    columns:
    - {name: site, type: string}
    - {name: category, type: string}
embulk-input-redis
in:
  type: redis
  host: localhost
  port: 6379
  db: 1
  key_prefix: yyy-xxxxxx
  encode: hash
  columns:
  - {name: site, type: string}
  - {name: category, type: string}
embulk-input-mysql
in:
  type: mysql
  host: {{ env.XXXXX_DB_HOST }}
  user: {{ env.XXXXX_DB_USER }}
  password: {{ env.XXXXX_DB_PASSWORD }}
  database: {{ env.XXXXX_DB_NAME }}
  table: xxxx
  select: "aaa, bbb, ccc"
  where: "status = 0 AND id = 12345"
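In place of table/select/where, embulk-input-mysql also accepts a single raw query (the two styles cannot be combined); a sketch using the same hypothetical connection variables:

in:
  type: mysql
  host: {{ env.XXXXX_DB_HOST }}
  user: {{ env.XXXXX_DB_USER }}
  password: {{ env.XXXXX_DB_PASSWORD }}
  database: {{ env.XXXXX_DB_NAME }}
  query: "SELECT aaa, bbb, ccc FROM xxxx WHERE status = 0 AND id = 12345"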
embulk-input-redshift
in:
  type: redshift
  host: {{ env.XXXXX_RS_DB_HOST }}
  user: {{ env.XXXXX_RS_DB_USER }}
  password: {{ env.XXXXX_DATABASE_PASSWORD }}
  database: {{ env.XXXXX_DATABASE_NAME }}
  table: xxx
  select: "name,url,title"
  fetch_rows: 1000
Output
embulk-output-bigquery
out:
  type: bigquery
  mode: append
  auth_method: private_key
  service_account_email: {{ env.SERVICE_ACCOUNT_EMAIL }}
  p12_keyfile: key/{{ env.P12_FILENAME }}
  path_prefix: tmp/
  file_ext: csv.gz
  source_format: CSV
  project: {{ env.XXXX_PROJECT }}
  dataset: xxx_yyyy_zzz
  auto_create_table: true
  table: xxxes
  schema_file: schema/yyyy_zzzz_schema.json
  prevent_duplicate_insert: true
  formatter:
    type: csv
    header_line: false
    timezone: Asia/Tokyo
  encoders:
  - {type: gzip}
  allow_quoted_newlines: 1
When loading a large amount of data, also set timeout_sec and open_timeout_sec:

  timeout_sec: 6000
  open_timeout_sec: 6000

Otherwise the load fails with:

Caused by: org.jruby.exceptions.RaiseException: (TransmissionError) execution expired
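The schema_file above points at plain BigQuery schema JSON; a minimal sketch matching the url/site/contents columns from the S3 input example (the real file's contents are not shown in this memo):

[
  {"name": "url",      "type": "STRING"},
  {"name": "site",     "type": "STRING"},
  {"name": "contents", "type": "STRING"}
]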
embulk-output-td
out:
  type: td
  apikey: {{ env.TD_API_KEY }}
  endpoint: api.treasuredata.com
  database: xxx_yyyy_zzz
  table: xxxes
  default_timezone: Asia/Tokyo
embulk-output-redis
out:
  type: redis
  host: localhost
  port: 6379
  db: 0
  key_prefix: "yyy-xxxxxx"
  encode: hash
  key: url
  columns:
  - {name: url, type: string}
  - {name: category, type: string}
embulk-output-dynamodb
out:
  type: dynamodb
  mode: upsert
  region: ap-northeast-1
  auth_method: basic
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  auto_create_table: false
  table: xxxes
  primary_key: id
  primary_key_type: Number
  # capacity is bumped to the `raise` value while loading, then restored to `normal`
  write_capacity_units:
    normal: 5
    raise: 20
  read_capacity_units:
    normal: 6
    raise: 30
embulk-output-mysql
out:
  type: mysql
  host: {{ env.XXX_DB_HOST }}
  user: {{ env.XXX_DB_USER }}
  password: {{ env.XXX_DB_PASSWORD }}
  database: {{ env.XXX_DB_NAME }}
  table: yyyy
  mode: truncate_insert
  default_timezone: "Japan"
embulk csv
out:
  type: file
  path_prefix: tmp/yyy
  file_ext: txt
  formatter:
    type: csv
    charset: UTF-8
    delimiter: '\'
    header_line: false
    newline: LF
    quote: ''
    quote_policy: NONE
    escape: '\'
embulk-output-redshift
out:
  type: redshift
  host: {{ env.XXXXX_RS_DB_HOST }}
  user: {{ env.XXXXX_RS_DB_USER }}
  password: {{ env.XXXXX_RS_DB_PASSWORD }}
  database: {{ env.XXXXX_RS_DB_NAME }}
  table: xxxes_tmp
  access_key_id: {{ env.AWS_ACCESS_KEY }}
  secret_access_key: {{ env.AWS_SECRET_KEY }}
  iam_user_name: yyy-xxxxxx-access
  s3_bucket: {{ env.S3_BUCKET }}
  s3_key_prefix: redshift
  mode: truncate_insert
  default_timezone: "Japan"
Filter
embulk-filter-ruby_proc
filters:
  - type: ruby_proc
    requires:
      - cgi
    variables:
      multiply: 1
    before:
      - proc: |
          -> do
            puts "before proc"
            @started_at = Time.now
          end
    after:
      - proc: |
          -> do
            puts "after proc"
            p Time.now - @started_at
          end
    columns:
      - name: words
        proc: |
          ->(words) do
            words # write your Ruby processing here
          end
        type: string
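Since cgi is listed under requires above, one concrete use is URL-decoding a column; a minimal sketch, assuming words holds %-encoded text:

columns:
  - name: words
    proc: |
      ->(words) do
        CGI.unescape(words)   # decode %-encoded text via the required cgi library
      end
    type: string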
Parser
embulk-parser-none
Sometimes you don't want anything parsed at all.
in:
  type: file
  path_prefix: example/example.jsonl
  parser:
    type: none
    column_name: payload
out:
  type: bigquery
  payload_column_index: 0 # or, payload_column: payload
Etc
max_quoted_size_limit
The error
The size of the quoted value exceeds the limit size (131072)
went away once this was set to a larger value such as 134217728 (see http://www.embulk.org/docs/built-in.html).
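For reference, the option goes under the csv parser, as in the embulk-input-s3 example at the top:

parser:
  type: csv
  max_quoted_size_limit: 134217728   # default is 131072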
Digdag
Install
sudo curl -o /usr/local/bin/digdag --create-dirs -L "https://dl.digdag.io/digdag-latest"
sudo chmod +x /usr/local/bin/digdag
Java 8 or later is required:
sudo yum -y remove java
sudo yum -y install java-1.8.0-openjdk-devel
java -version
javac -version
Initializing and creating a workflow
digdag init workflow
2016-07-06 15:14:26 +0900: Digdag v0.8.3
Creating workflow/.gitignore
Creating workflow/tasks/shell_sample.sh
Creating workflow/tasks/repeat_hello.sh
Creating workflow/tasks/__init__.py
Creating workflow/workflow.dig
Done. Type `cd workflow` and then `digdag run workflow.dig` to run the workflow. Enjoy!
workflow.dig (the generated file, edited to run embulk and then notify):

timezone: Asia/Tokyo

+step1:
  sh>: embulk run s3/yyy-xxxxxx.yml.liquid
+last:
  py>: tasks.XXXYYYWorkflow.last_notification
tasks/__init__.py (the method name has to match the py>: reference above; slackweb comes from pip install slackweb):

# coding: utf-8
import os
import slackweb
import digdag

class XXXYYYWorkflow(object):
    def last_notification(self):
        slack = slackweb.Slack(url=os.environ["SLACK_INCOMING_URL"])
        slack.notify(text="This is a *digdag test*...",
                     channel="#system-log", username="dig_dagger",
                     icon_emoji=":coffee:", mrkdwn=True)
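For the "many embulk runs in order, small steps in between" pattern from the intro, tasks listed in a .dig file simply run sequentially; a sketch with hypothetical config and task names:

timezone: Asia/Tokyo

+load_a:
  sh>: embulk run s3/aaa.yml.liquid
+in_between:
  py>: tasks.XXXYYYWorkflow.cleanup   # hypothetical small task between loads
+load_b:
  sh>: embulk run s3/bbb.yml.liquid
+last:
  py>: tasks.XXXYYYWorkflow.last_notification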
Run
Running the workflow executes embulk and finally sends the Slack notification.
> digdag run workflow
Success. Task state is saved at .digdag/status/20160706T000000+0900 directory.
Rerun
Each run accumulates its logs and task state under .digdag/status.
Deleting the folders and files underneath lets you run again, but there is also a rerun option:
> digdag run --rerun workflow
Architecture
(architecture diagram)
Conclusion
- Embulk is wonderful!
- For digdag, I'm looking forward to what comes next