Embulk(エンバルク)組み込みプラグインの設定覚え書き

Embulk組み込みプラグインの設定覚え書き

その他の情報はFluentdのバッチ版Embulkのまとめをご覧ください。

Embulkには組み込みでいくつかのプラグインが用意されています。この資料ではEmbukの基本的な概念を説明したのち、組み込みのプラグインについて解説をおこないます。

詳細は、オフィシャルのマニュアルを参照してください。Configuration (英語)

1, レコード、カラム、そして型

Embulkは、内部でデータをレコードとして取り扱いします。
レコードは、複数のカラムから構成されるデータ1件のことを表します。

例えば住所録では、人1名の情報がレコードです。1名の情報には、氏名や
年齢といったその人固有の情報があります。この固有の情報一つ一つが
カラムです。それぞれのカラムには型があります。姓や名であれば文字列、
年齢であれば数字(整数)、生年月日であれば日付(日時)、体重であれば小数点といったものが
型になります。

姓:文字列 名:文字列 年齢:整数 生年月日:日時 体重:浮動小数点
山田 太郎 20 2000/01/01 65.5
田中 花子 25 2003/12/12 45.3

1.1, レコード内で利用できる型

Embulkではカラム(スキーマ)では、次の型をサポートしています。対応する各言語の型は次のとおりです。(この情報プラグインを開発する際に必要です)

JSON型はembulk 0.8からサポート

Embulk 説明 Ruby Java
boolean 真偽値 Boolean Boolean
long 整数型 Integer Long
timestamp 時刻 Time Timestamp
double 浮動小数点 Float Double
string 文字列 String String
json JSON 確認中 確認中

2, 設定ファイル

Embulkの設定はYAML形式と呼ばれる構造化形式で記述を行います。

embulk.yaml
in:
  type: file
  path_prefix: /data/embulk/datas/access_log-
  decoders:
    - type: gzip 
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: ''
    null_string: 'NULL'
    header_line: true
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
# filters:
out:
  type: stdout
  • in: インプットプラグインに関する記述
  • out: アウトプットプラグインに関する記述
  • filters: フィルタプラグイン関する記述
  • exec: エクゼキュータに関する記述

3, 組み込みインプットプラグイン

Embukのインプットプラグインは、データベースやクラウドサービスなどに格納された
ファイルやデータベースのテーブルをレコードの元データとして取り込みします。

Embulk本体には、ローカルのファイルからデータを読み込みするためのファイルプラグインが
搭載されていいます。

inのエントリの中のtypeの所に、インプットプラグインで利用するプラグインの種類を記述します。

input.yaml
in:
  type: # ここに種類を書く

Emblulk組み込みで指定できるインプットプラグイン

type 入力元 備考
file ファイル

3.1, ファイルプラグインの詳細

fileプラグインで指定できる設定は次のとおりです。

設定項目 データ型 説明 初期値等
path_prefix 文字列 入力ファイル名のプレフィックスパス
decoders 配列 デコーダーに関する設定(後述)
parser ハッシュ パーサーに関する設定(後述)
last_path 文字列 最後に読み込みしたファイルを記述

記述する内容は多岐にわたりますが、Embulkにはguessという設定ファイルを自動生成してくれる便利な機能があります。通常はこちらを使い、必要な場合にのみ一部を修正するという手順になります。

ただしデータの場所を記述するpath_prefixは利用者自身が設定をおこないます。

path_prefixはEmbulkで読み込みを行いたいファイルの場所をディレクトリも含めて記述します。

例えば、ファイル/data/embulk/datasの中に、access_log-YYYYMMDD.gzというファイルがある場合次のように記述します。

  • path_prefix: /data/embulk/datas/access_log- 


また、last_pathはEmbulkが記述するもので、ユーザ自身が設定をすることは通常ありません。

※ path_prefixに指定したファイルはUnicordのソート順所によってOSに指定された文字列のソート順序によって読み込まれます。last_pathが指定されていると、指定されたファイルよりソート順があとのファイルのみデータが読み込まれます。

last_pathのソート順はOS非依存です。unicodeの順序に依存します。 (2015/03/02追記)

随時データが作成される場合、ファイルが古いファイルよりも後ろに並ぶようにファイルを命名してください。

4, デコーダ

読み込みするデータは、gzipなどのプログラムのよって圧縮されている場合があります。ファイルを読み込みする際に、伸張しながらデータを読み込みするのがデコーダです。また標準のプラグインには用意されていませんが、暗号化されたデータを複合化しながら読み込みするのもデコーダの役割です。

type 説明 備考
gzip gzip形式のデータを伸張します
bzip2 bzip2形式のデータを伸張します v0.8.4~

gzip,bzip2デコーダには現在オプションがありません。

5, パーサー

インプットプラグインで読み込んだデータを整形して、レコードの形式にするのがパーサです。
組み込みプラグインではCSVとJSONを読み込みするためのプラグインが用意されています。

type 説明
csv CSV形式のデータを読み込む
json JSON形式のデータを読み込む

5.1, CSVパーサープラグイン

設定キー 説明
delimiter 区切り文字を指定します。',', タブの場合は"\t"
quote 文字列として扱う場合に利用されている記号
escape 特別な文字をエスケープするための文字
skip_header_lines スキップヘッダの行数(0.5.2以降)
null_string NULL値として扱う文字列
trim_if_not_quoted クォートされていないと文字列前後の空白を削除、初期値false
comment_line_maker 行頭が指定した文字列で始まっっていたらデータとして取り扱わずスキップする
allow_optional_columns true時、カラムが足りない行をスキップする代わりにnullをセット(v0.5.4~)
allow_extra_columns trueだった場合、おおすぎるカラムを無視する、falseの場合該当行をスキップする
max_quoted_size_limit クォートの閉じ忘れなどがあった場合にスキップ
stop_on_invalid_record パースエラーが発生した場合にトランザクションを停止するかどうか(デフォルトfalse)
default_timezone Asia/Tokyo など初期値UTC
default_date 時刻のみのフィールドにつける日付(default: 1970-1-1) 0.8.9~
newline 改行の形式(CRLF, LF, CR)
charset 文字エンコーディング(UTF-8), Shift_JIS等
columns カラムに関する情報、日(後述)
header_line(旧オプション) ヘッダ行があるかどうか

trim_if_not_quotedについて

下記のようなデータで、trim_if_not_quotedがtrueの場合

,
"",""
  ,

出力結果は次のようになる。

,
,
,

skip_header_linesについて

次のようなデータがあり、skip_header_linesが4となっている場合、5行目からをデータとして利用します。skip_header_linesの導入でheader_lineは古い設定となりました。今後はskip_header_linesを使いましょう。

created date,2015-03-05
created by,me
device,xyz
time,key1,key2,key3,key4
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d
2015-03-05 20:40:14 -0800,a,b,c,d

カラムに関する指定は次の通りです。

設定キー 説明
name カラムの名前を指定します。
type カラムの型を指定します。前述の型
format カラムの型がtimestampの場合、日付の形式を指定します。
date 時刻のみのフィールドにつける日付

formatで指定する形式は、man strftime(3)または、strftimeなどを参考にしてください。

※ 日時のフォーマットが全て動作するかは未確認です。不具合があればお知らせください。

null_stringについて

設定 引用なし空文字列 引用あり空文字列 \N 引用なし "\N" 引用あり
null_string: null(default) null 空文字列 \N \N
null_string: "" null null \N \N
null_string: \N 空文字列 空文字列 null null

5.2, JSONパーサープラグイン

次のようなデータをすべてカラム名recordでjson型としてデータを読み込みします。

{"time":1455829282,"ip":"93.184.216.34","name":frsyuki}
{"time":1455829282,"ip":"172.36.8.109":sadayuki}
{"time":1455829284,"ip":"example.com","name":Treasure Data}
{"time":1455829282,"ip":"10.98.43.1","name":MessagePack}
設定キー 説明
stop_on_invalid_record 読み込みできないデータがあった場合に停止します。(デフォルトfalse)

6, 組み込みアウトプットプラグイン

typeに出力したいアウトプットを種類を指定します。現在次のアウトプットが指定できます。

type 出力先 備考
stdout 標準出力
file ファイル
"null" 何も出力しない
#out: {type: stdout}
#out: {type: "null" } # クォートすること
out:
  type: file
  path_prefix: ./sampleo
  file_ext: .csv.gz
  formatter:
    type: csv
    header_line: true
    charset: UTF-8
    newline: CRLF
  encoders:
  - {type: gzip, level: 6 }

6.1, null

何も出力しないアウトプットプラグインです。typeを指定するときはクォートして"null"と記述してください。

6.2, stdout

オプションはありません。

6.3, file

fileアウトプットで指定できる設定は次のとおりです。

設定項目 データ型 説明 初期値等
path_prefix 文字列 出力ファイル名のプレフィックスパス
sequence_format 文字列 出力するファイル名につけるシーケンス .%03d.%02d
file_ext 文字列 出力先ファイルの末尾の文字列
encoders 配列 エンコーダーに関する設定
formatter ハッシュ フォーマッターに関する設定

6.3.1, ファイル出力例

  • path_prefix: /path/to/embulk/sample
  • file_ext: csv
  • sequence_format: ".%03d.%02d"
  • 出力されるファイルの形式: /path/to/embulk/sample.000.00.csv

6.3.2, シーケンスフォーマット(sequence_format)とは

タスクインデックスはインプットプラグインのタスクの数で、インプットプラグインがFileやS3の場合、入力ファイル数がタスクインデックスの数になります。入力がデータベースなどタスク数が1の場合、タスクインデックス数は0となります。

ファイルインデックスは、一つのタスク内複数ファイルを取り扱う場合に増える番号ですが、現状は複数タスクをまとめるがないため常に0になります。

7, 組み込みフォーマッター

typeを使って、利用するフォーマッタを指定します。
利用できるフォーマッタは次のとおりです。

type フォーマッタ説明
csv CSV形式の出力をするフォーマッタ

7.1, CSVフォーマッタ

設定キー データ型 説明
delimiter 文字列 区切り文字を指定します。未指定時",", タブの場合は"\t"
quote 文字列 クォートに使う文字
quote_policy 列挙型 クォートの方針(ALL, MINIMAL, NONE)
escape 文字列 特別な文字をエスケープする文字
header_line 真偽値 ヘッダを出力する(true)/出力しない(false)
null_string 文字列 NULLとして取り扱う文字
newline 列挙型 改行の形式(CRLF, LF, CR)
newline_in_field 列挙型 フィールド内の改行文字(CRLF, LF, CR)
charset 文字列 文字エンコーディング(UTF-8), Shift_JIS等
default_timezone 文字列 時刻出力時のタイムゾーン(初期値UTC)

quote_policy

設定キー 説明
NONE クォートをしない
MINIMAL 必要な場合のみクォートフィールド内にデリミタ等がある場合のみ
ALL 全てのフィールドをクォート

quote_policy: NONEはEmbulk 0.7.5まで正常に動作しません。Embulk 0.7.6以降を使ってください。

column_options

設定キー 説明
timezone 時刻出力時のタイムゾーン(初期値UTC)
format 時刻のフォーマット(未指定時: %Y-%m-%d %H:%M:%S.%6N %z)

8, 組み込みエンコーダープラグイン

出力したファイルを暗号化したり圧縮するためのプラグインです。

8.1, gzip

gzip形式で圧縮をします。

設定キー データ型 説明
level 数値 圧縮率(最大9), 最小0(非圧縮)、未指定時6

8.2, bzip2

bzip2形式で圧縮をします。(0.8.4~)

設定キー データ型 説明
level 数値 圧縮率(最大9), 最小0(非圧縮)、未指定時9

9, 組み込みフィルタプラグイン

9.1, rename

カラムの名前を変更するプラグインです。columnsに変更したい元の名前、変更したい名前のペアをセットします。

下記の例では次のようにカラム名を変更します。

  • my_existing_column1をnew_column1に
  • my_existing_column2をnew_column2に
filters:
  ...
  - type: rename
    columns:
      my_existing_column1: new_column1
      my_existing_column2: new_column2

その他最新のrenameプラグインには様々な機能が追加されています。

9.2, remove_columns

カラムを削るプラグインです。詳細はオフィシャルドキュメントを参照してください。