Embulk組み込みプラグインの設定覚え書き
その他の情報はFluentdのバッチ版Embulkのまとめをご覧ください。
Embulkには組み込みでいくつかのプラグインが用意されています。この資料ではEmbukの基本的な概念を説明したのち、組み込みのプラグインについて解説をおこないます。
詳細は、オフィシャルのマニュアルを参照してください。Configuration (英語)
1, レコード、カラム、そして型
Embulkは、内部でデータをレコードとして取り扱いします。
レコードは、複数のカラムから構成されるデータ1件のことを表します。
例えば住所録では、人1名の情報がレコードです。1名の情報には、氏名や
年齢といったその人固有の情報があります。この固有の情報一つ一つが
カラムです。それぞれのカラムには型があります。姓や名であれば文字列、
年齢であれば数字(整数)、生年月日であれば日付(日時)、体重であれば小数点といったものが
型になります。
姓:文字列 | 名:文字列 | 年齢:整数 | 生年月日:日時 | 体重:浮動小数点 |
---|---|---|---|---|
山田 | 太郎 | 20 | 2000/01/01 | 65.5 |
田中 | 花子 | 25 | 2003/12/12 | 45.3 |
1.1, レコード内で利用できる型
Embulkではカラム(スキーマ)では、次の型をサポートしています。対応する各言語の型は次のとおりです。(この情報プラグインを開発する際に必要です)
JSON型はembulk 0.8からサポート
Embulk | 説明 | Ruby | Java |
---|---|---|---|
boolean | 真偽値 | Boolean | Boolean |
long | 整数型 | Integer | Long |
timestamp | 時刻 | Time | Timestamp |
double | 浮動小数点 | Float | Double |
string | 文字列 | String | String |
json | JSON | 確認中 | 確認中 |
2, 設定ファイル
Embulkの設定はYAML形式と呼ばれる構造化形式で記述を行います。
in:
type: file
path_prefix: /data/embulk/datas/access_log-
decoders:
- type: gzip
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: '"'
escape: ''
null_string: 'NULL'
header_line: true
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
# filters:
out:
type: stdout
- in: インプットプラグインに関する記述
- out: アウトプットプラグインに関する記述
- filters: フィルタプラグイン関する記述
- exec: エクゼキュータに関する記述
3, 組み込みインプットプラグイン
Embukのインプットプラグインは、データベースやクラウドサービスなどに格納された
ファイルやデータベースのテーブルをレコードの元データとして取り込みします。
Embulk本体には、ローカルのファイルからデータを読み込みするためのファイルプラグインが
搭載されていいます。
inのエントリの中のtypeの所に、インプットプラグインで利用するプラグインの種類を記述します。
in:
type: # ここに種類を書く
Emblulk組み込みで指定できるインプットプラグイン
type | 入力元 | 備考 |
---|---|---|
file | ファイル |
3.1, ファイルプラグインの詳細
fileプラグインで指定できる設定は次のとおりです。
設定項目 | データ型 | 説明 | 初期値等 |
---|---|---|---|
path_prefix | 文字列 | 入力ファイル名のプレフィックスパス | |
decoders | 配列 | デコーダーに関する設定(後述) | |
parser | ハッシュ | パーサーに関する設定(後述) | |
last_path | 文字列 | 最後に読み込みしたファイルを記述 |
記述する内容は多岐にわたりますが、Embulkにはguessという設定ファイルを自動生成してくれる便利な機能があります。通常はこちらを使い、必要な場合にのみ一部を修正するという手順になります。
ただしデータの場所を記述するpath_prefixは利用者自身が設定をおこないます。
path_prefixはEmbulkで読み込みを行いたいファイルの場所をディレクトリも含めて記述します。
例えば、ファイル/data/embulk/datasの中に、access_log-YYYYMMDD.gzというファイルがある場合次のように記述します。
- path_prefix: /data/embulk/datas/access_log-
また、last_pathはEmbulkが記述するもので、ユーザ自身が設定をすることは通常ありません。
※ path_prefixに指定したファイルはUnicordのソート順所によってOSに指定された文字列のソート順序によって読み込まれます。last_pathが指定されていると、指定されたファイルよりソート順があとのファイルのみデータが読み込まれます。
※ last_pathのソート順はOS非依存です。unicodeの順序に依存します。 (2015/03/02追記)
随時データが作成される場合、ファイルが古いファイルよりも後ろに並ぶようにファイルを命名してください。
4, デコーダ
読み込みするデータは、gzipなどのプログラムのよって圧縮されている場合があります。ファイルを読み込みする際に、伸張しながらデータを読み込みするのがデコーダです。また標準のプラグインには用意されていませんが、暗号化されたデータを複合化しながら読み込みするのもデコーダの役割です。
type | 説明 | 備考 |
---|---|---|
gzip | gzip形式のデータを伸張します | |
bzip2 | bzip2形式のデータを伸張します | v0.8.4~ |
gzip,bzip2デコーダには現在オプションがありません。
5, パーサー
インプットプラグインで読み込んだデータを整形して、レコードの形式にするのがパーサです。
組み込みプラグインではCSVとJSONを読み込みするためのプラグインが用意されています。
type | 説明 |
---|---|
csv | CSV形式のデータを読み込む |
json | JSON形式のデータを読み込む |
5.1, CSVパーサープラグイン
設定キー | 説明 |
---|---|
delimiter | 区切り文字を指定します。',', タブの場合は"\t" |
quote | 文字列として扱う場合に利用されている記号 |
escape | 特別な文字をエスケープするための文字 |
skip_header_lines | スキップヘッダの行数(0.5.2以降) |
null_string | NULL値として扱う文字列 |
trim_if_not_quoted | クォートされていないと文字列前後の空白を削除、初期値false |
comment_line_maker | 行頭が指定した文字列で始まっっていたらデータとして取り扱わずスキップする |
allow_optional_columns | true時、カラムが足りない行をスキップする代わりにnullをセット(v0.5.4~) |
allow_extra_columns | trueだった場合、おおすぎるカラムを無視する、falseの場合該当行をスキップする |
max_quoted_size_limit | クォートの閉じ忘れなどがあった場合にスキップ |
stop_on_invalid_record | パースエラーが発生した場合にトランザクションを停止するかどうか(デフォルトfalse) |
default_timezone | Asia/Tokyo など初期値UTC |
default_date | 時刻のみのフィールドにつける日付(default: 1970-1-1) 0.8.9~ |
newline | 改行の形式(CRLF, LF, CR) |
charset | 文字エンコーディング(UTF-8), Shift_JIS等 |
columns | カラムに関する情報、日(後述) |
header_line(旧オプション) | ヘッダ行があるかどうか |
- max_quoted_size_limit: 初期値 131072 (128KB)
- default_timezoneの説明
trim_if_not_quotedについて
下記のようなデータで、trim_if_not_quotedがtrueの場合
,
"",""
,
出力結果は次のようになる。
,
,
,
skip_header_linesについて
次のようなデータがあり、skip_header_linesが4となっている場合、5行目からをデータとして利用します。skip_header_linesの導入でheader_lineは古い設定となりました。今後はskip_header_linesを使いましょう。
created date,2015-03-05
created by,me
device,xyz
time,key1,key2,key3,key4
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
カラムに関する指定は次の通りです。
設定キー | 説明 |
---|---|
name | カラムの名前を指定します。 |
type | カラムの型を指定します。前述の型 |
format | カラムの型がtimestampの場合、日付の形式を指定します。 |
date | 時刻のみのフィールドにつける日付 |
formatで指定する形式は、man strftime(3)または、strftimeなどを参考にしてください。
※ 日時のフォーマットが全て動作するかは未確認です。不具合があればお知らせください。
null_stringについて
設定 | 引用なし空文字列 | 引用あり空文字列 | \N 引用なし | "\N" 引用あり |
---|---|---|---|---|
null_string: null (default) |
null |
空文字列 | \N | \N |
null_string: "" |
null |
null |
\N | \N |
null_string: \N |
空文字列 | 空文字列 | null |
null |
5.2, JSONパーサープラグイン
次のようなデータをすべてカラム名record
でjson型としてデータを読み込みします。
{"time":1455829282,"ip":"93.184.216.34","name":frsyuki}
{"time":1455829282,"ip":"172.36.8.109":sadayuki}
{"time":1455829284,"ip":"example.com","name":Treasure Data}
{"time":1455829282,"ip":"10.98.43.1","name":MessagePack}
設定キー | 説明 |
---|---|
stop_on_invalid_record | 読み込みできないデータがあった場合に停止します。(デフォルトfalse) |
Embulk 0.9.15で実験的に新機能が使えるようになっています。
6, 組み込みアウトプットプラグイン
typeに出力したいアウトプットを種類を指定します。現在次のアウトプットが指定できます。
type | 出力先 | 備考 |
---|---|---|
stdout | 標準出力 | |
file | ファイル | |
"null" | 何も出力しない |
#out: {type: stdout}
#out: {type: "null" } # クォートすること
out:
type: file
path_prefix: ./sampleo
file_ext: .csv.gz
formatter:
type: csv
header_line: true
charset: UTF-8
newline: CRLF
encoders:
- {type: gzip, level: 6 }
6.1, null
何も出力しないアウトプットプラグインです。typeを指定するときはクォートして"null"と記述してください。
6.2, stdout
オプションはありません。
6.3, file
fileアウトプットで指定できる設定は次のとおりです。
設定項目 | データ型 | 説明 | 初期値等 |
---|---|---|---|
path_prefix | 文字列 | 出力ファイル名のプレフィックスパス | |
sequence_format | 文字列 | 出力するファイル名につけるシーケンス | .%03d.%02d |
file_ext | 文字列 | 出力先ファイルの末尾の文字列 | |
encoders | 配列 | エンコーダーに関する設定 | |
formatter | ハッシュ | フォーマッターに関する設定 |
6.3.1, ファイル出力例
- path_prefix: /path/to/embulk/sample
- file_ext: csv
- sequence_format: ".%03d.%02d"
- 出力されるファイルの形式: /path/to/embulk/sample.000.00.csv
6.3.2, シーケンスフォーマット(sequence_format)とは
タスクインデックスはインプットプラグインのタスクの数で、インプットプラグインがFileやS3の場合、入力ファイル数がタスクインデックスの数になります。入力がデータベースなどタスク数が1の場合、タスクインデックス数は0となります。
ファイルインデックスは、一つのタスク内複数ファイルを取り扱う場合に増える番号ですが、現状は複数タスクをまとめるがないため常に0になります。
7, 組み込みフォーマッター
typeを使って、利用するフォーマッタを指定します。
利用できるフォーマッタは次のとおりです。
type | フォーマッタ説明 |
---|---|
csv | CSV形式の出力をするフォーマッタ |
7.1, CSVフォーマッタ
設定キー | データ型 | 説明 |
---|---|---|
delimiter | 文字列 | 区切り文字を指定します。未指定時",", タブの場合は"\t" |
quote | 文字列 | クォートに使う文字 |
quote_policy | 列挙型 | クォートの方針(ALL, MINIMAL, NONE) |
escape | 文字列 | 特別な文字をエスケープする文字 |
header_line | 真偽値 | ヘッダを出力する(true)/出力しない(false) |
null_string | 文字列 | NULLとして取り扱う文字 |
newline | 列挙型 | 改行の形式(CRLF, LF, CR) |
newline_in_field | 列挙型 | フィールド内の改行文字(CRLF, LF, CR) |
charset | 文字列 | 文字エンコーディング(UTF-8), Shift_JIS等 |
default_timezone | 文字列 | 時刻出力時のタイムゾーン(初期値UTC) |
quote_policy
設定キー | 説明 |
---|---|
NONE | クォートをしない |
MINIMAL | 必要な場合のみクォートフィールド内にデリミタ等がある場合のみ |
ALL | 全てのフィールドをクォート |
※ quote_policy: NONEはEmbulk 0.7.5まで正常に動作しません。Embulk 0.7.6以降を使ってください。
column_options
設定キー | 説明 |
---|---|
timezone | 時刻出力時のタイムゾーン(初期値UTC) |
format | 時刻のフォーマット(未指定時: %Y-%m-%d %H:%M:%S.%6N %z) |
8, 組み込みエンコーダープラグイン
出力したファイルを暗号化したり圧縮するためのプラグインです。
8.1, gzip
gzip形式で圧縮をします。
設定キー | データ型 | 説明 |
---|---|---|
level | 数値 | 圧縮率(最大9), 最小0(非圧縮)、未指定時6 |
8.2, bzip2
bzip2形式で圧縮をします。(0.8.4~)
設定キー | データ型 | 説明 |
---|---|---|
level | 数値 | 圧縮率(最大9), 最小0(非圧縮)、未指定時9 |
9, 組み込みフィルタプラグイン
9.1, rename
カラムの名前を変更するプラグインです。columns
に変更したい元の名前、変更したい名前のペアをセットします。
下記の例では次のようにカラム名を変更します。
- my_existing_column1をnew_column1に
- my_existing_column2をnew_column2に
filters:
...
- type: rename
columns:
my_existing_column1: new_column1
my_existing_column2: new_column2
その他最新のrenameプラグインには様々な機能が追加されています。
9.2, remove_columns
カラムを削るプラグインです。詳細はオフィシャルドキュメントを参照してください。