これは、失敗談として書こうとしたが、この記事を書くために色々再現させながらやってみると十分な性能を出すことができた。記事タイトルは釣りとしてそのままにする。
実際、今回の様な用途で、embulkで十分な性能を出すには色々と苦労があった。その知見とembulkの今後の発展に少しでも役立てればいいなと思う。
なお、最初に挑戦したのはWindows環境だった。ここではMacの環境で試しているがほぼそのままの手順でWindowsでも通用すると思う。
タスク
たくさん(万単位)あるcsvファイルをある形式のcsvファイルに変換してデータ移行したい
Embulk選定の理由
- なんかそういうETL専用っぽい
- 流行ってそう
- 簡単そう(これは今はそう思わない)
- 速いらしい
- 使い方に慣れれば、csv → DB や DB → csv などの用途にも使えそう。その他適用範囲が広そう。
- 今回のタスクの変換処理がそれなりに複雑なので、スクリプト言語で変換処理を実装したい(javaとかでやるといかにも面倒そう)
最後の点が大きかったのだがその処理の詳細な内容についてはこの記事では触れない。rubyのようなテキストを扱うのが簡単な言語でパーサプラグインを作るとキマる予感がしたのだが…
ハマった問題と原因
問題: 速度が致命的に遅い
原因: 入力ファイルが大量にあるため
embulkは1ファイルあたり1回ずつEmbukの起動が必要になる(少し語弊があるかもしれない)が、javaとjrubyの相乗効果で起動が異常に遅い。
巨大で数ファイルのデータであればembulkの効果が発揮されそうだが、今回のタスクにはミスマッチだった。
解決までの試行錯誤
以降、以下の順で説明する。ちょっとしたチュートリアルの代わりになることを狙っている。
- embulkのインストール
- サンプル実行
- パーサーの開発、実行
- EmbulkEmbed による実行速度改善
embulkのバージョン
0.8.38
インストール
まずは公式(https://github.com/embulk/embulk#linux--mac--bsd) に倣ってembulk本体のインストール
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
$ embulk --version
embulk 0.8.38
サンプル実行
最初に embulk を単純に実行してみる。
テストデータをembulkのコマンドで用意する。以降の説明もこのデータを利用する。
$ embulk example ./work
2017-11-29 07:52:36.631 +0900: Embulk v0.8.38
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
Creating ./work directory...
Creating ./work/
Creating ./work/csv/
Creating ./work/csv/sample_01.csv.gz
Creating ./work/seed.yml
Run following subcommands to try embulk:
1. embulk guess ./work/seed.yml -o config.yml
2. embulk preview config.yml
3. embulk run config.yml
以下のファイルが準備された。
$ tree work
work
|-- csv
| `-- sample_01.csv.gz
`-- seed.yml
1 directory, 2 files
$ gzip -dc work/csv/sample_01.csv.gz
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36,20150129,NULL
seed.yml を元に config.yml を生成する。これは入力データから config.yml の雛形をある程度自動で準備してくれるコマンド。
$ embulk guess work/seed.yml -o config.yml
ただしここでの説明を簡単にするために以下のように簡略化した。以降 seed.ymlは不要なので忘れて良い。
in:
type: file
path_prefix: ./work/csv/sample_
decoders:
- {type: gzip}
parser:
charset: UTF-8
type: csv
skip_header_lines: 1
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out:
type: file
path_prefix: ./work/sample_output_
file_ext: csv
formatter:
type: csv
そして preview (出力ファイルを生成せずに実行結果の概要を画面表示)
$ embulk preview config.yml
2017-11-29 07:59:43.981 +0900: Embulk v0.8.38
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
2017-11-29 07:59:56.665 +0900 [INFO] (0001:preview): Listing local files at directory '/Users/koji/embulk-workspace/embulk-parser-multiline/work/csv' filtering filename by prefix 'sample_'
2017-11-29 07:59:56.666 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 07:59:56.669 +0900 [INFO] (0001:preview): Loading files [/Users/koji/embulk-workspace/embulk-parser-multiline/work/csv/sample_01.csv.gz]
2017-11-29 07:59:56.677 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long | time:timestamp | purchase:timestamp | comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
| 1 | 32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC | embulk |
| 2 | 14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC | embulk jruby |
| 3 | 27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
| 4 | 11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC | |
+---------+--------------+-------------------------+-------------------------+----------------------------+
そして run(work/sample_output_*.csvが生成される。)
$ embulk run config.yml
〜画面表示は省略〜
$ head -999 work/sample_output_*.csv
==> work/sample_output_000.00.csv <==
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,NULL
==> work/sample_output_001.00.csv <==
id,account,time,purchase,comment
==> work/sample_output_002.00.csv <==
id,account,time,purchase,comment
==> work/sample_output_003.00.csv <==
id,account,time,purchase,comment
既に起動の遅さが気になるが、うまく動いている。
あれ?ファイルが4つ作成された。これは、試行錯誤した結果config.ymlにmin_output_taks: 1
を追加することで1つにすることができた。(あまりこういったことを言及している記述は見受けられないが)
組み込みプラグインの config.yml のパラメータについてはEmbulk 0.8 documentationを参照すると良い。
$ diff -u config.yml{.v1,}
--- config.yml.v1 2017-11-29 08:28:27.000000000 +0900
+++ config.yml 2017-11-29 08:27:13.000000000 +0900
@@ -19,3 +19,6 @@
file_ext: csv
formatter:
type: csv
+
+exec:
+ min_output_tasks: 1
もう一度実行。OK.
$ rm work/*.csv
$ embulk run config.yml
$ head -999 work/sample_output_*.csv
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,NULL
入力ファイル名が前方一致で曖昧だったり、出力ファイル名に自動で連番がつくなど制御が細かくできない気がするが自分の今回の利用では問題ないので無視する。
あと、ちょっと気になるのがサンプルの入力ファイルは最後に空行があるのだが、これが消えている。どのフェーズで消えているのかわからないがcsvを正規化しているのだと思えば、まあ悪いことではない。
$ gzip -dc work/csv/sample_01.csv.gz | wc -l
6
$ cat work/sample_output_000.00.csv | wc -l
5
パーサープラグインの開発
以下、パーサープラグインの開発の手順を示す。この記事で示すパーサーの例は実際にやりたかったものとは違うが、以下の記事で似かよった問題に対応しており、非常に参考になった。そこでこれを土台に説明したい。
複数行からなるログを解析するために、EmbulkのparserプラグインをRubyで開発する話(準備編)
まずはパーサーの雛形である。ここでは multiline という名前になっているが1行入力(csvでもない)し、1行出力するだけのパーサーとなっている。
module Embulk
module Parser
class MultiLine < ParserPlugin
Plugin.register_parser("multiline", self)
def self.transaction(config, &control)
parser_task = config.load_config(Java::LineDecoder::DecoderTask)
task = {
"decoder_task" => DataSource.from_java(parser_task.dump)
# "property1" => config.param("property1", :string),
# "property2" => config.param("property2", :integer, default: 0),
}
columns = [
Column.new(0, "line", :string),
]
yield(task, columns)
end
def init
# initialization code:
# @property1 = task["property1"]
# @property2 = task["property2"]
@decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask)
end
def run(file_input)
decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)
while decoder.nextFile
while line = decoder.poll
page_builder.add([line])
end
end
page_builder.finish
end
end
end
end
いや、ちょっと複雑じゃないか?ETL処理なんて業務の脇役なので多分このコードを次に見るときは内容を忘れてる。
このプラグインのポイントは以下
- embulkのパーサーは行指向ではない
性能のためか?もしくはバイナリファイルを扱うこともあるから?なのか、行単位で処理するスクリプトを書く場合は上記のJava::LineDecoderというクラスを使う必要があるらしい。これを使えばdecoder.pollで次の行を取ってきてくれるらしい。ちょっと複雑なのはそういう処理も入っているから。
まあ良しとしよう。順番が前後するがこのプラグインは以下のステップにより作成する。
最初にプラグインの雛形を生成。(ここではrubyのparserプラグインの雛形をmultilineという名前で生成するコマンドを実行している)
$ embulk new ruby-parser multiline
2017-11-29 07:06:14.108 +0900: Embulk v0.8.38
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
Creating embulk-parser-multiline/
Plugin template is successfully generated.
Next steps:
$ cd embulk-parser-multiline
$ bundle install # install one using rbenv & rbenv-build
$ bundle exec rake # build gem to be released
$ bundle exec embulk run config.yml # you can run plugin using this command
丁寧に今後のステップの説明が出ているが、開発するmultilineはgem化はしないのでこのステップは踏まない。まずは生成されたファイル群を見てみる。
$ tree embulk-parser-multiline
embulk-parser-multiline
|-- Gemfile
|-- LICENSE.txt
|-- README.md
|-- Rakefile
|-- embulk-parser-multiline.gemspec
`-- lib
`-- embulk
|-- guess
| `-- multiline.rb
`-- parser
`-- multiline.rb <-- いじるのはこのファイル
4 directories, 7 files
gem 化は考えないので今回触るのは lib/embulk/parser/multiline.rb だけである。他のファイルは一切触らない。
消してしまってもいい。
このファイルを先に記した内容に置き換えてまずは実行してみる。
$ vi embulk-parser-multiline/lib/embulk/parser/multiline.rb
〜内容を差し替える〜
multiline を利用するようにパーサーを変更する。
$ diff -u config.yml{.v2,}
--- config.yml.v2 2017-11-29 09:10:36.000000000 +0900
+++ config.yml 2017-11-29 09:10:45.000000000 +0900
@@ -5,7 +5,7 @@
- {type: gzip}
parser:
charset: UTF-8
- type: csv
+ type: multiline
skip_header_lines: 1
columns:
- {name: id, type: long}
そして、実行するのだが、実行の際には、 -I
オプションでプラグインの場所を指定する必要がある。これがないと、
Error: ParserPlugin 'multiline' is not found.
Unknown parser plugin 'multiline'. embulk/parser/multiline.rb is not installed. Run 'embulk gem search -rd embulk-parser' command to find plugins.
というエラーが発生するので注意。この部分は色々あって結構ハマった。
$ embulk preview -I embulk-parser-multiline/lib config.yml
2017-11-29 09:17:24.234 +0900: Embulk v0.8.38
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
2017-11-29 09:17:36.936 +0900 [INFO] (0001:preview): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 09:17:36.938 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 09:17:36.940 +0900 [INFO] (0001:preview): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 09:17:36.948 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
2017-11-29 09:17:37.003 +0900 [INFO] (0001:preview): Loaded plugin embulk/parser/multiline from a load path
+---------------------------------------------------------------------+
| line:string |
+---------------------------------------------------------------------+
| id,account,time,purchase,comment |
| 1,32864,2015-01-27 19:23:49,20150127,embulk |
| 2,14824,2015-01-27 19:01:23,20150127,embulk jruby |
| 3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin" |
| 4,11270,2015-01-29 11:54:36,20150129,NULL |
| |
+---------------------------------------------------------------------+
csv のプラグインを使ってないので列が1つになっている。(末尾の空行も残っているようだ)
何もしないプラグインが正常に動作することが確認できた。
embulk のデバッグ
ここで、ちょっとデバッグのことを考える。embulk を使っていてうまく動作しないときに以下のようなテクニックが重要になってくると思った。
irb
irb を使うとembulkの状態を確認することができる。例えば
$ embulk irb -I embulk-parser-multiline/lib config.yml
2017-11-29 09:20:11.768 +0900: Embulk v0.8.38
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
irb(main):001:0> puts $LOAD_PATH
uri:classloader:/META-INF/jruby.home/lib/ruby/2.3/site_ruby
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib
=> nil
irb(main):002:0>
などとして、ruby のグローバル変数 $LOAD_PATH
がどうなっているか?などの確認が取れる。
ただ、自分の経験で、この irb が役に立ったことはない。今後、使える場面もあるかも知れないので参考に記載した。
メインのスクリプトの書き換え
embulkの実行ファイル(これは実際にはjarファイル)を展開するとjavaのクラスファイルと一緒にrubyスクリプトが含まれていることがわかる
$ unzip -d embulk-X.X.X ~/.embulk/bin/embulk
$ ls embulk-X.X.X
LICENSE bundler/ edu/ javax/ license.html me/ org/
META-INF/ bundler.rb embulk/ jni/ licenses/ msgpack/ tables/
NOTICE ch/ embulk.rb jnr/ liquid/ msgpack.rb
about.html com/ io/ jruby/ liquid.rb net/
例えば、以下のスクリプトに修正を加えれば embulk がプラグインを見つけられない時に原因を調べることができる。
(embulkのエラーメッセージから該当のスクリプトファイルを突き止めた)
$ diff -u embulk-X.X.X/embulk/plugin_registry.rb{.org,}
--- embulk-X.X.X/embulk/plugin_registry.rb.org 2017-11-29 09:50:31.000000000 +0900
+++ embulk-X.X.X/embulk/plugin_registry.rb 2017-11-29 09:52:31.000000000 +0900
@@ -35,6 +35,7 @@
def search(type)
name = "#{@search_prefix}#{type}"
+STDERR.puts "name ==> #{name.inspect}"
begin
require_and_show name
return true
@@ -46,6 +47,7 @@
# search from $LOAD_PATH
load_path_files = $LOAD_PATH.map do |lp|
+STDERR.puts "lp ==> #{lp.inspect}"
lpath = File.expand_path(File.join(lp, "#{name}.rb"))
File.exist?(lpath) ? lpath : nil
end
以下の実行例で最初に出る Embulk version unavailable due to failure to access the jar file.
は無視すること。メッセージからjarファイルよりversion情報を取得する処理があるらしい。
$ java -classpath embulk-X.X.X org.embulk.cli.Main preview config.yml
Embulk version unavailable due to failure to access the jar file.
java.io.FileNotFoundException: /Users/koji/embulk-workspace/embulk-X.X.X (Is a directory)
at java.util.zip.ZipFile.open(Native Method)
at java.util.zip.ZipFile.<init>(ZipFile.java:219)
at java.util.zip.ZipFile.<init>(ZipFile.java:149)
at java.util.jar.JarFile.<init>(JarFile.java:166)
at java.util.jar.JarFile.<init>(JarFile.java:103)
at org.embulk.EmbulkVersion.getSelfJarManifest(EmbulkVersion.java:61)
at org.embulk.EmbulkVersion.<clinit>(EmbulkVersion.java:19)
at org.embulk.cli.Main.main(Main.java:27)
2017-11-29 10:01:08.800 +0900: Embulk v[embulk-version-unavailable]
********************************** INFORMATION **********************************
Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
compatibility-breaking changes and key feature updates.
https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************
[WARN] Embulk looks running out of the Embulk jar file. It is unsupported.
2017-11-29 10:01:18.484 +0900 [INFO] (0001:preview): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 10:01:18.487 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 10:01:18.490 +0900 [INFO] (0001:preview): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 10:01:18.500 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
name ==> "embulk/parser/multiline"
lp ==> "uri:classloader:/META-INF/jruby.home/lib/ruby/2.3/site_ruby"
lp ==> "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib"
lp ==> "/Users/koji/embulk-workspace/embulk-X.X.X"
〜スタックトレースは省略〜
Error: ParserPlugin 'multiline' is not found.
Unknown parser plugin 'multiline'. embulk/parser/multiline.rb is not installed. Run 'embulk gem search -rd embulk-parser' command to find plugins.
どうやら、-I
オプションは irb では認識されていないが、preview の時は$LOAD_PATH
が変わっているらしい。
パーサープラグインの開発(続き)
単純な何もしないパーサーがうまくいったが、もう少し改良を加える。
自分が実際に作りたかったのはcsvパーサーの変形であったので、組み込みのcsvパーサを改良するものとしたい。
(この命題ではcsvパーサーの出力をさらにフィルタ処理するfilterプラグインでも良かったかもしれない)
どうやって、突き止めたかは忘れてしまったが、以下のようにするとcsvパーサーの実装を流用できる
- java_import で、javaのクラスを使えるようにする(これはjrubyの機能)
- CsvParserPlugin クラスで csv パーサーと同じconfig項目を読み込む
- CsvTokenizer でカラム毎の処理を行う
config.ymlのskip_header_linesを参照して、ヘッダを読み飛ばす処理も実装した
require 'pp'
module Embulk
module Java
java_import 'org.embulk.standards.CsvParserPlugin'
java_import 'org.embulk.standards.CsvTokenizer'
end
module Parser
class Multiline < ParserPlugin
Plugin.register_parser("multiline", self)
def self.transaction(config, &control)
plugin_task = config.load_config(Java::CsvParserPlugin::PluginTask)
task = {
"plugin_task" => DataSource.from_java(plugin_task.dump),
# "property1" => config.param("property1", :string),
# "property2" => config.param("property2", :integer, default: 0),
}
columns = [
Column.new(0, "id", :string),
Column.new(1, "account", :string),
Column.new(2, "time", :string),
Column.new(3, "purchase", :string),
Column.new(4, "comment", :string),
]
yield(task, columns)
end
def init
# initialization code:
# @property1 = task["property1"]
# @property2 = task["property2"]
@plugin_task = task.param("plugin_task", :hash)
.load_task(Java::CsvParserPlugin::PluginTask)
end
def run(file_input)
java_file_input = file_input.instance_eval { @java_file_input }
decoder = Java::LineDecoder.new(java_file_input, @plugin_task)
tokenizer = Java::CsvTokenizer.new(decoder, @plugin_task)
while tokenizer.nextFile
skip_header_lines = @plugin_task.skip_header_lines
while skip_header_lines > 0
skip_header_lines -= 1
break if !tokenizer.skipHeaderLine
end
while tokenizer.nextRecord
line = []
while tokenizer.hasNextColumn
line.push tokenizer.nextColumn
end
page_builder.add(line)
end
end
page_builder.finish
end
end
end
end
$ embulk preview -I embulk-parser-multiline/lib config.yml
〜途中省略〜
+-----------+----------------+---------------------+-----------------+----------------------------+
| id:string | account:string | time:string | purchase:string | comment:string |
+-----------+----------------+---------------------+-----------------+----------------------------+
| 1 | 32864 | 2015-01-27 19:23:49 | 20150127 | embulk |
| 2 | 14824 | 2015-01-27 19:01:23 | 20150127 | embulk jruby |
| 3 | 27559 | 2015-01-28 02:20:02 | 20150128 | Embulk "csv" parser plugin |
| 4 | 11270 | 2015-01-29 11:54:36 | 20150129 | NULL |
+-----------+----------------+---------------------+-----------------+----------------------------+
複数ファイルの処理
パーサー開発の雛形ができた。ここから実際に作りたいものを実装する例を示しても良いが今回の記事の本質ではないので、そこは省略する。
パーサは完成したものとして、実質何もしないパーサーで10ファイル程度の処理を行ってみる。
embulk では、config.yml の内容を動的に変更するのに liquid というテンプレートエンジンを使うことがスタンダードらしい。
そこで、以下の config.yml.liquid を用意する
in:
type: file
path_prefix: {{env.input_path}}/{{env.input_file_prefix}}
decoders:
- {type: gzip}
parser:
charset: UTF-8
type: multiline
skip_header_lines: 1
columns:
- {name: id, type: long}
- {name: account, type: long}
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
out:
type: file
path_prefix: {{env.output_path}}/{{env.output_file_prefix}}
file_ext: csv
formatter:
type: csv
exec:
min_output_tasks: 1
そして、以下のように実行してみる。(長いので折り返している)
つまり、liquid を利用することで環境変数を通して、パラメータを動的に変更可能にしている。
$ input_path=work/csv \
input_file_prefix=sample_ \
output_path=work \
output_file_prefix=sample_output_ \
embulk run -I embulk-parser-multiline/lib config.yml.liquid
うまくいったら複数ファイルの出力を試す
$ time for i in $(seq 1 10); do input_path=work/csv input_file_prefix=sample_ output_path=work output_file_prefix=sample_output_${i}_ embulk run -I embulk-parser-multiline/lib config.yml.liquid; done
〜途中省略〜
real 2m47.777s
user 3m15.435s
sys 0m5.956s
1ファイル処理するのにだいたい15秒くらいかかっていたわけだが、実際複数ファイル試した時には
10ファイル処理するのに実時間で3分弱。じゃあ、1万ファイルだと…40時間!これは使い物にならない!
しかし、ここまでのサンクコストが高すぎるので後戻りできない。このまま突き進むことにしてしまった。
EmbulkEmbed の利用
起動処理の時間の問題ということで、embulkを埋め込む対処を試してみた。
参考: ひしだま's 技術メモページ - Javaから実行する方法
import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;
import java.io.File;
import java.io.IOException;
public class EmbulkRunner {
public static void main(String[] args) throws IOException {
Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
EmbulkEmbed embulk = bootstrap.initializeCloseable();
try {
ConfigLoader loader = embulk.newConfigLoader();
ConfigSource config = loader.fromYamlFile(new File("./config.yml"));
ConfigSource in = config.getNested("in");
in.set("path_prefix", "./work/csv/sample_");
embulk.run(config);
} finally {
embulk.destroy();
}
}
}
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java
※ 以下、WindowsとUnix系OSではclasspathのパスの区切り文字が違うので注意(よくこれにハマる)
$ java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
$ java -classpath ~/.embulk/bin/embulk;src EmbulkRunner
Exception in thread "main" org.embulk.exec.PartialExecutionException:
org.embulk.config.ConfigException: ParserPlugin 'multiline' is not found.
当然、プラグインの場所を示していないので失敗する。これはembulkのソースを確認した結果、以下のようにsystemConfig.ymlというファイルを用意し、それを読み込ませれば良いことがわかった。
jruby_load_path:
- embulk-parser-multiline/lib
$ diff -u src/EmbulkRunner.java{.org,}
--- src/EmbulkRunner.java.org 2017-11-29 13:30:27.000000000 +0900
+++ src/EmbulkRunner.java 2017-11-29 13:27:51.000000000 +0900
@@ -9,6 +9,8 @@
public class EmbulkRunner {
public static void main(String[] args) throws IOException {
Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
+ ConfigSource systemConfig = bootstrap.getSystemConfigLoader().fromYamlFile(new File("./systemConfig.yml"));
+ bootstrap.setSystemConfig(systemConfig);
EmbulkEmbed embulk = bootstrap.initializeCloseable();
try {
# コンパイル
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java
# 実行
$ java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
2017-11-29 13:34:16.943 +0900 [INFO] (0001:transaction): Loaded plugin embulk/parser/multiline from a load path
2017-11-29 13:34:16.960 +0900 [INFO] (0001:transaction): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 13:34:16.962 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 13:34:16.967 +0900 [INFO] (0001:transaction): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 13:34:17.068 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=1
2017-11-29 13:34:17.115 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2017-11-29 13:34:17.165 +0900 [INFO] (0013:task-0000): Writing local file './work/sample_output_000.00.csv'
2017-11-29 13:34:17.248 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
うまくいった。内部で10回ループするように修正してみる。
EmbulkEmbedの場合、liquidを使わなくてもconfigを動的に変えられるのが良い。
$ diff -u src/EmbulkRunner.java{.v1,}
--- src/EmbulkRunner.java.v1 2017-11-29 13:44:39.000000000 +0900
+++ src/EmbulkRunner.java 2017-11-29 13:47:55.000000000 +0900
@@ -16,10 +16,14 @@
try {
ConfigLoader loader = embulk.newConfigLoader();
ConfigSource config = loader.fromYamlFile(new File("./config.yml"));
-
ConfigSource in = config.getNested("in");
- in.set("path_prefix", "./work/csv/sample_");
- embulk.run(config);
+ ConfigSource out = config.getNested("out");
+
+ for (int i = 0; i < 10; i++) {
+ in.set("path_prefix", "./work/csv/sample_");
+ out.set("path_prefix", "./work/sample_output_" + i + "_");
+ embulk.run(config);
+ }
} finally {
embulk.destroy();
}
# コンパイル
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java
# 実行
$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
real 0m16.338s
user 0m16.653s
sys 0m0.673s
10回で16秒、100回で19秒だった
real 0m19.207s
user 0m23.418s
sys 0m1.091s
最初の起動に時間がかかっているだけで十分速いように思える。
外部のプラグインの利用
EmbulkEmbed を利用しつつ外部プラグインも利用してみる。ここでは、embulk-filter-columnを試す。
最初は素のembulkで動くことを確認する。
プラグインをインストールする。
$ embulk gem install embulk-filter-column
2017-11-29 14:04:10.398 +0900: Embulk v0.8.38
Gem plugin path is: /Users/koji/.embulk/jruby/2.3.0
Fetching: embulk-filter-column-0.7.1.gem (100%)
Successfully installed embulk-filter-column-0.7.1
1 gem installed
~/.embulk/jruby 配下にインストールされた
$ tree ~/.embulk
/Users/koji/.embulk
|-- bin
| `-- embulk
`-- jruby
`-- 2.3.0
|-- build_info
|-- cache
| `-- embulk-filter-column-0.7.1.gem
|-- doc
|-- extensions
|-- gems
| `-- embulk-filter-column-0.7.1
〜省略〜
| |-- classpath
| | |-- JsonPathCompiler-0.0.12.jar
| | |-- accessors-smart-1.1.jar
| | |-- asm-5.0.3.jar
| | |-- embulk-filter-column-0.7.1.jar
| | |-- json-smart-2.2.1.jar
| | `-- slf4j-api-1.7.21.jar
〜省略〜
| |-- lib
| | `-- embulk
| | `-- filter
| | `-- column.rb
〜省略〜
`-- specifications
`-- embulk-filter-column-0.7.1.gemspec
32 directories, 41 files
$
config.yml を修正して column プラグインを使う設定に変える(最後の列にdという時刻の列を追加している)。
$ diff -u config.yml{.v3,}
--- config.yml.v3 2017-11-29 14:00:14.000000000 +0900
+++ config.yml 2017-11-29 14:02:07.000000000 +0900
@@ -13,6 +13,10 @@
- {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
- {name: purchase, type: timestamp, format: '%Y%m%d'}
- {name: comment, type: string}
+filters:
+ - type: column
+ add_columns:
+ - {name: d, type: timestamp, default: "2015-07-13", format: "%Y-%m-%d"}
out:
type: file
path_prefix: ./work/sample_output_
実行してみる
$ embulk run -I embulk-parser-multiline/lib config.yml
2017-11-29 14:09:59.569 +0900 [INFO] (0001:transaction): Loaded plugin embulk-filter-column (0.7.1)
2017-11-29 14:09:59.662 +0900 [INFO] (0001:transaction): Loaded plugin embulk/parser/multiline from a load path
2017-11-29 14:09:59.682 +0900 [INFO] (0001:transaction): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 14:09:59.684 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 14:09:59.689 +0900 [INFO] (0001:transaction): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 14:09:59.819 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=1
2017-11-29 14:09:59.877 +0900 [INFO] (0001:transaction): {done: 0 / 1, running: 0}
2017-11-29 14:09:59.949 +0900 [INFO] (0015:task-0000): Writing local file './work/sample_output_000.00.csv'
2017-11-29 14:10:00.053 +0900 [INFO] (0001:transaction): {done: 1 / 1, running: 0}
2017-11-29 14:10:00.059 +0900 [INFO] (main): Committed.
2017-11-29 14:10:00.059 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"work/csv/sample_01.csv.gz"},"out":{}}
最初に、Loaded plugin embulk-filter-column (0.7.1)
という表示があり、うまく動いている。
出力結果も(時刻の書式が気にくわないが)列が追加されている。
$ cat work/sample_output_000.00.csv
id,account,time,purchase,comment,d
1,32864,2015-01-27 19:23:49,20150127,embulk,2015-07-13 00:00:00.000000 +0000
2,14824,2015-01-27 19:01:23,20150127,embulk jruby,2015-07-13 00:00:00.000000 +0000
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin",2015-07-13 00:00:00.000000 +0000
4,11270,2015-01-29 11:54:36,20150129,NULL,2015-07-13 00:00:00.000000 +0000
なお、出力時の時刻の書式は以下で変えられる。CSV formatter plugin - Example(個々の列毎にしか指定できないのだろうか?)
$ diff -u config.yml{.v4,}
--- config.yml.v4 2017-11-29 14:18:26.000000000 +0900
+++ config.yml 2017-11-29 14:18:39.000000000 +0900
@@ -23,6 +23,8 @@
file_ext: csv
formatter:
type: csv
+ column_options:
+ d: {format: '%Y-%m-%d %H:%M:%S'}
exec:
min_output_tasks: 1
EmbulkEmbed も試してみる
plugin のインストールとconfig.ymlの書き換えのみなので再コンパイルせずに実行する。
$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
Exception in thread "main" org.embulk.config.ConfigException: FilterPlugin 'column' is not found.
Unknown filter plugin 'column'. embulk/filter/column.rb is not installed. Run 'embulk gem search -rd embulk-filter' command to find plugins.
失敗した gem でインストールしたプラグインが見つかってない。
なお、前に紹介したデバッグ手法(メインのスクリプトを差し替えて実行する)をEmbulkEmbedで行いたい場合、以下のように実行すれば良い。
$ time java -classpath embulk-X.X.X:src EmbulkRunner
この問題の対処方法の結論はsystemConfig.ymlを以下のように修正すれば良い。
$ diff -u systemConfig.yml{.v1,}
--- systemConfig.yml.v1 2017-11-29 14:24:58.000000000 +0900
+++ systemConfig.yml 2017-11-29 14:25:09.000000000 +0900
@@ -1,2 +1,3 @@
jruby_load_path:
- embulk-parser-multiline/lib
+jruby_use_default_embulk_gem_home: true
あるいは gem を使わずこの環境に閉じ込めたい場合は以下のようにする
プラグインの置き場所を embulk-parser-multiline/lib および gem の場所から plugin に変更する。
$ mkdir -p plugin/lib/embulk/{filter,parser}
$ cp embulk-parser-multiline/lib/embulk/parser/multiline.rb
$ cp ~/.embulk/jruby/2.3.0/gems/embulk-filter-column-0.7.1/lib/embulk/filter/column.rb plugin/lib/embulk/filter/
外部プラグインが必要とする jar ファイルも plugin/classpath にコピーする。
(このパスは embulk/filter/column.rb をみると分かるがファイルの場所がプラグインのスクリプトからの相対的な位置で解決されているため、この場所でなければならない)
$ mkdir plugin/classpath
$ cp ~/.embulk/jruby/2.3.0/gems/embulk-filter-column-0.7.1/classpath/*.jar plugin/classpath/
systemConfig.yml を書き換える (jruby_load_path を変更。jruby_use_default_embulk_gem_home は再度不要になる)
jruby_load_path:
- plugin/lib
実行。
$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
〜途中省略〜
real 0m19.743s
user 0m24.227s
sys 0m1.125s
うまくいった性能も問題ない。
まとめ
最終的に、以下の構成のファイルを用意することで高速に処理を動かすことができた。
作ったファイルの最終版はこれ
.
|-- config.yml config.yml
|-- embulk-X.X.X デバッグ用(embulk.jarを展開したもの。なくても良い)
|-- lib embulk 本体
| `-- embulk-0.8.38.jar
|-- plugin 外部、自作のプラグイン置き場
| |-- classpath
| | |-- JsonPathCompiler-0.0.12.jar
| | |-- accessors-smart-1.1.jar
| | |-- asm-5.0.3.jar
| | |-- embulk-filter-column-0.7.1.jar
| | |-- json-smart-2.2.1.jar
| | `-- slf4j-api-1.7.21.jar
| `-- lib
| `-- embulk
| |-- filter
| | `-- column.rb
| `-- parser
| `-- multiline.rb
|-- src EmbulkEmbedを利用したJavaソースとそのコンパイル結果
| |-- EmbulkRunner.class
| `-- EmbulkRunner.java
|-- systemConfig.yml EmbulkEmbed用のsystemConfig.yml
`-- work サンプルデータ
`-- csv
`-- sample_01.csv.gz
実行例
# 素の実行方法(単体動作確認、問題切り分け用)
$ java -jar lib/embulk-0.8.38.jar run -I plugin/lib config.yml
# EmbulkEmbed
$ javac -classpath lib/embulk-0.8.38.jar src/EmbulkRunner.java
$ time java -classpath lib/embulk-0.8.38.jar:src EmbulkRunner
課題
パーサ開発がrubyで実行できてもドライバ部分をjavaで書くようだと魅力が半減するように思う。
本当の本当にやりたかったのは対象ファイルの一覧がDBのテーブルに格納されているのでそれを読みつつ変換結果のパスで差し替えたいというものだった。これをjavaで書くのはjdbcでゴリゴリ書くか、javaのORマッパ準備してやるとか色々面倒。
今後、EmbulkEmbed をjrubyで動かせないか?とか試してみたい。
→ やってみた
おまけ
この記事で利用している systemConfig.ymlには以下のような設定もある
設定項目つ | 値 |
---|---|
log_path | ログファイル名、出力を標準出力ではなくログファイルに書き出す |
log_level | 出力するログレベル "info" など |
例:
jruby_load_path:
- plugin/lib
jruby_use_default_embulk_gem_home: true
log_path: ./embulk.log
log_level; info