Embulk(エンバルク)組み込みプラグインの設定覚え書き


Embulk組み込みプラグインの設定覚え書き

その他の情報はFluentdのバッチ版Embulkのまとめをご覧ください。

Embulkには組み込みでいくつかのプラグインが用意されています。この資料ではEmbukの基本的な概念を説明したのち、組み込みのプラグインについて解説をおこないます。

詳細は、オフィシャルのマニュアルを参照してください。Configuration (英語)


1, レコード、カラム、そして型

Embulkは、内部でデータをレコードとして取り扱いします。

レコードは、複数のカラムから構成されるデータ1件のことを表します。

例えば住所録では、人1名の情報がレコードです。1名の情報には、氏名や

年齢といったその人固有の情報があります。この固有の情報一つ一つが

カラムです。それぞれのカラムには型があります。姓や名であれば文字列、

年齢であれば数字(整数)、生年月日であれば日付(日時)、体重であれば小数点といったものが

型になります。

姓:文字列
名:文字列
年齢:整数
生年月日:日時
体重:浮動小数点

山田
太郎
20
2000/01/01
65.5

田中
花子
25
2003/12/12
45.3


1.1, レコード内で利用できる型

Embulkではカラム(スキーマ)では、次の型をサポートしています。対応する各言語の型は次のとおりです。(この情報プラグインを開発する際に必要です)

JSON型はembulk 0.8からサポート

Embulk
説明
Ruby
Java

boolean
真偽値
Boolean
Boolean

long
整数型
Integer
Long

timestamp
時刻
Time
Timestamp

double
浮動小数点
Float
Double

string
文字列
String
String

json
JSON
確認中
確認中


2, 設定ファイル

Embulkの設定はYAML形式と呼ばれる構造化形式で記述を行います。


embulk.yaml

in:

type: file
path_prefix: /data/embulk/datas/access_log-
decoders:
- type: gzip
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
null_string: 'NULL'
header_line: true
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
# filters:
out:
type: stdout


  • in: インプットプラグインに関する記述

  • out: アウトプットプラグインに関する記述

  • filters: フィルタプラグイン関する記述

  • exec: エクゼキュータに関する記述


3, 組み込みインプットプラグイン

Embukのインプットプラグインは、データベースやクラウドサービスなどに格納された

ファイルやデータベースのテーブルをレコードの元データとして取り込みします。

Embulk本体には、ローカルのファイルからデータを読み込みするためのファイルプラグインが

搭載されていいます。

inのエントリの中のtypeの所に、インプットプラグインで利用するプラグインの種類を記述します。


input.yaml

in:

type: # ここに種類を書く

Emblulk組み込みで指定できるインプットプラグイン

type
入力元
備考

file
ファイル


3.1, ファイルプラグインの詳細

fileプラグインで指定できる設定は次のとおりです。

設定項目
データ型
説明
初期値等

path_prefix
文字列
入力ファイル名のプレフィックスパス

decoders
配列
デコーダーに関する設定(後述)

parser
ハッシュ
パーサーに関する設定(後述)

last_path
文字列
最後に読み込みしたファイルを記述

記述する内容は多岐にわたりますが、Embulkにはguessという設定ファイルを自動生成してくれる便利な機能があります。通常はこちらを使い、必要な場合にのみ一部を修正するという手順になります。

ただしデータの場所を記述するpath_prefixは利用者自身が設定をおこないます。

path_prefixはEmbulkで読み込みを行いたいファイルの場所をディレクトリも含めて記述します。

例えば、ファイル/data/embulk/datasの中に、access_log-YYYYMMDD.gzというファイルがある場合次のように記述します。


  • path_prefix: /data/embulk/datas/access_log-

また、last_pathはEmbulkが記述するもので、ユーザ自身が設定をすることは通常ありません。

※ path_prefixに指定したファイルはUnicordのソート順所によってOSに指定された文字列のソート順序によって読み込まれます。last_pathが指定されていると、指定されたファイルよりソート順があとのファイルのみデータが読み込まれます。

last_pathのソート順はOS非依存です。unicodeの順序に依存します。 (2015/03/02追記)

随時データが作成される場合、ファイルが古いファイルよりも後ろに並ぶようにファイルを命名してください。


4, デコーダ

読み込みするデータは、gzipなどのプログラムのよって圧縮されている場合があります。ファイルを読み込みする際に、伸張しながらデータを読み込みするのがデコーダです。また標準のプラグインには用意されていませんが、暗号化されたデータを複合化しながら読み込みするのもデコーダの役割です。

type
説明
備考

gzip
gzip形式のデータを伸張します

bzip2
bzip2形式のデータを伸張します
v0.8.4~

gzip,bzip2デコーダには現在オプションがありません。


5, パーサー

インプットプラグインで読み込んだデータを整形して、レコードの形式にするのがパーサです。

組み込みプラグインではCSVとJSONを読み込みするためのプラグインが用意されています。

type
説明

csv
CSV形式のデータを読み込む

json
JSON形式のデータを読み込む


5.1, CSVパーサープラグイン

設定キー
説明

delimiter
区切り文字を指定します。',', タブの場合は"\t"

quote
文字列として扱う場合に利用されている記号

escape
特別な文字をエスケープするための文字

skip_header_lines
スキップヘッダの行数(0.5.2以降)

null_string
NULL値として扱う文字列

trim_if_not_quoted
クォートされていないと文字列前後の空白を削除、初期値false

comment_line_maker
行頭が指定した文字列で始まっっていたらデータとして取り扱わずスキップする

allow_optional_columns
true時、カラムが足りない行をスキップする代わりにnullをセット(v0.5.4~)

allow_extra_columns
trueだった場合、おおすぎるカラムを無視する、falseの場合該当行をスキップする

max_quoted_size_limit
クォートの閉じ忘れなどがあった場合にスキップ

stop_on_invalid_record
パースエラーが発生した場合にトランザクションを停止するかどうか(デフォルトfalse)

default_timezone
Asia/Tokyo など初期値UTC

default_date
時刻のみのフィールドにつける日付(default: 1970-1-1) 0.8.9~

newline
改行の形式(CRLF, LF, CR)

charset
文字エンコーディング(UTF-8), Shift_JIS等

columns
カラムに関する情報、日(後述)

header_line(旧オプション)
ヘッダ行があるかどうか

trim_if_not_quotedについて

下記のようなデータで、trim_if_not_quotedがtrueの場合

,

"",""
,

出力結果は次のようになる。



,

,

,

skip_header_linesについて

次のようなデータがあり、skip_header_linesが4となっている場合、5行目からをデータとして利用します。skip_header_linesの導入でheader_lineは古い設定となりました。今後はskip_header_linesを使いましょう。

created date,2015-03-05

created by,me
device,xyz
time,key1,key2,key3,key4
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d

カラムに関する指定は次の通りです。

設定キー
説明

name
カラムの名前を指定します。

type
カラムの型を指定します。前述の型

format
カラムの型がtimestampの場合、日付の形式を指定します。

date
時刻のみのフィールドにつける日付

formatで指定する形式は、man strftime(3)または、strftimeなどを参考にしてください。

※ 日時のフォーマットが全て動作するかは未確認です。不具合があればお知らせください。

null_stringについて

設定
引用なし空文字列
引用あり空文字列
\N 引用なし
"\N" 引用あり

null_string: null(default)
null
空文字列
\N
\N

null_string: ""
null
null
\N
\N

null_string: \N
空文字列
空文字列
null
null


5.2, JSONパーサープラグイン

次のようなデータをすべてカラム名recordでjson型としてデータを読み込みします。

{"time":1455829282,"ip":"93.184.216.34","name":frsyuki}

{"time":1455829282,"ip":"172.36.8.109":sadayuki}
{"time":1455829284,"ip":"example.com","name":Treasure Data}
{"time":1455829282,"ip":"10.98.43.1","name":MessagePack}

設定キー
説明

stop_on_invalid_record
読み込みできないデータがあった場合に停止します。(デフォルトfalse)

Embulk 0.9.15で実験的に新機能が使えるようになっています。


6, 組み込みアウトプットプラグイン

typeに出力したいアウトプットを種類を指定します。現在次のアウトプットが指定できます。

type
出力先
備考

stdout
標準出力

file
ファイル

"null"
何も出力しない

#out: {type: stdout}

#out: {type: "null" } # クォートすること
out:
type: file
path_prefix: ./sampleo
file_ext: .csv.gz
formatter:
type: csv
header_line: true
charset: UTF-8
newline: CRLF
encoders:
- {type: gzip, level: 6 }


6.1, null

何も出力しないアウトプットプラグインです。typeを指定するときはクォートして"null"と記述してください。


6.2, stdout

オプションはありません。


6.3, file

fileアウトプットで指定できる設定は次のとおりです。

設定項目
データ型
説明
初期値等

path_prefix
文字列
出力ファイル名のプレフィックスパス

sequence_format
文字列
出力するファイル名につけるシーケンス
.%03d.%02d

file_ext
文字列
出力先ファイルの末尾の文字列

encoders
配列
エンコーダーに関する設定

formatter
ハッシュ
フォーマッターに関する設定


6.3.1, ファイル出力例


  • path_prefix: /path/to/embulk/sample

  • file_ext: csv

  • sequence_format: ".%03d.%02d"

  • 出力されるファイルの形式: /path/to/embulk/sample.000.00.csv


6.3.2, シーケンスフォーマット(sequence_format)とは

タスクインデックスはインプットプラグインのタスクの数で、インプットプラグインがFileやS3の場合、入力ファイル数がタスクインデックスの数になります。入力がデータベースなどタスク数が1の場合、タスクインデックス数は0となります。

ファイルインデックスは、一つのタスク内複数ファイルを取り扱う場合に増える番号ですが、現状は複数タスクをまとめるがないため常に0になります。


7, 組み込みフォーマッター

typeを使って、利用するフォーマッタを指定します。

利用できるフォーマッタは次のとおりです。

type
フォーマッタ説明

csv
CSV形式の出力をするフォーマッタ


7.1, CSVフォーマッタ

設定キー
データ型
説明

delimiter
文字列
区切り文字を指定します。未指定時",", タブの場合は"\t"

quote
文字列
クォートに使う文字

quote_policy
列挙型
クォートの方針(ALL, MINIMAL, NONE)

escape
文字列
特別な文字をエスケープする文字

header_line
真偽値
ヘッダを出力する(true)/出力しない(false)

null_string
文字列
NULLとして取り扱う文字

newline
列挙型
改行の形式(CRLF, LF, CR)

newline_in_field
列挙型
フィールド内の改行文字(CRLF, LF, CR)

charset
文字列
文字エンコーディング(UTF-8), Shift_JIS等

default_timezone
文字列
時刻出力時のタイムゾーン(初期値UTC)

quote_policy

設定キー
説明

NONE
クォートをしない

MINIMAL
必要な場合のみクォートフィールド内にデリミタ等がある場合のみ

ALL
全てのフィールドをクォート

quote_policy: NONEはEmbulk 0.7.5まで正常に動作しません。Embulk 0.7.6以降を使ってください。

column_options

設定キー
説明

timezone
時刻出力時のタイムゾーン(初期値UTC)

format
時刻のフォーマット(未指定時: %Y-%m-%d %H:%M:%S.%6N %z)


8, 組み込みエンコーダープラグイン

出力したファイルを暗号化したり圧縮するためのプラグインです。


8.1, gzip

gzip形式で圧縮をします。

設定キー
データ型
説明

level
数値
圧縮率(最大9), 最小0(非圧縮)、未指定時6


8.2, bzip2

bzip2形式で圧縮をします。(0.8.4~)

設定キー
データ型
説明

level
数値
圧縮率(最大9), 最小0(非圧縮)、未指定時9


9, 組み込みフィルタプラグイン


9.1, rename

カラムの名前を変更するプラグインです。columnsに変更したい元の名前、変更したい名前のペアをセットします。

下記の例では次のようにカラム名を変更します。


  • my_existing_column1をnew_column1に

  • my_existing_column2をnew_column2に

filters:

...
- type: rename
columns:
my_existing_column1: new_column1
my_existing_column2: new_column2

その他最新のrenameプラグインには様々な機能が追加されています。


9.2, remove_columns

カラムを削るプラグインです。詳細はオフィシャルドキュメントを参照してください。