20
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Embulkの利用でミスマッチをしてしまった話

Last updated at Posted at 2017-11-29

これは、失敗談として書こうとしたが、この記事を書くために色々再現させながらやってみると十分な性能を出すことができた。記事タイトルは釣りとしてそのままにする。

実際、今回の様な用途で、embulkで十分な性能を出すには色々と苦労があった。その知見とembulkの今後の発展に少しでも役立てればいいなと思う。

なお、最初に挑戦したのはWindows環境だった。ここではMacの環境で試しているがほぼそのままの手順でWindowsでも通用すると思う。

タスク

たくさん(万単位)あるcsvファイルをある形式のcsvファイルに変換してデータ移行したい

Embulk選定の理由

  • なんかそういうETL専用っぽい
  • 流行ってそう
  • 簡単そう(これは今はそう思わない)
  • 速いらしい
  • 使い方に慣れれば、csv → DB や DB → csv などの用途にも使えそう。その他適用範囲が広そう。
  • 今回のタスクの変換処理がそれなりに複雑なので、スクリプト言語で変換処理を実装したい(javaとかでやるといかにも面倒そう)

最後の点が大きかったのだがその処理の詳細な内容についてはこの記事では触れない。rubyのようなテキストを扱うのが簡単な言語でパーサプラグインを作るとキマる予感がしたのだが…

ハマった問題と原因

問題: 速度が致命的に遅い
原因: 入力ファイルが大量にあるため

embulkは1ファイルあたり1回ずつEmbukの起動が必要になる(少し語弊があるかもしれない)が、javaとjrubyの相乗効果で起動が異常に遅い。
巨大で数ファイルのデータであればembulkの効果が発揮されそうだが、今回のタスクにはミスマッチだった。

解決までの試行錯誤

以降、以下の順で説明する。ちょっとしたチュートリアルの代わりになることを狙っている。

  • embulkのインストール
  • サンプル実行
  • パーサーの開発、実行
  • EmbulkEmbed による実行速度改善

embulkのバージョン

0.8.38

インストール

まずは公式(https://github.com/embulk/embulk#linux--mac--bsd) に倣ってembulk本体のインストール

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
$ embulk --version
embulk 0.8.38

サンプル実行

最初に embulk を単純に実行してみる。
テストデータをembulkのコマンドで用意する。以降の説明もこのデータを利用する。

$ embulk example ./work
2017-11-29 07:52:36.631 +0900: Embulk v0.8.38

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

Creating ./work directory...
  Creating ./work/
  Creating ./work/csv/
  Creating ./work/csv/sample_01.csv.gz
  Creating ./work/seed.yml

Run following subcommands to try embulk:

   1. embulk guess ./work/seed.yml -o config.yml
   2. embulk preview config.yml
   3. embulk run config.yml

以下のファイルが準備された。

$ tree work
work
|-- csv
|   `-- sample_01.csv.gz
`-- seed.yml

1 directory, 2 files
$ gzip -dc work/csv/sample_01.csv.gz
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49,20150127,embulk
2,14824,2015-01-27 19:01:23,20150127,embulk jruby
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36,20150129,NULL

seed.yml を元に config.yml を生成する。これは入力データから config.yml の雛形をある程度自動で準備してくれるコマンド。

$ embulk guess work/seed.yml -o config.yml

ただしここでの説明を簡単にするために以下のように簡略化した。以降 seed.ymlは不要なので忘れて良い。

config.yml
in:
  type: file
  path_prefix: ./work/csv/sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    type: csv
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out:
  type: file
  path_prefix: ./work/sample_output_
  file_ext: csv
  formatter:
    type: csv

そして preview (出力ファイルを生成せずに実行結果の概要を画面表示)

$ embulk preview config.yml
2017-11-29 07:59:43.981 +0900: Embulk v0.8.38

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

2017-11-29 07:59:56.665 +0900 [INFO] (0001:preview): Listing local files at directory '/Users/koji/embulk-workspace/embulk-parser-multiline/work/csv' filtering filename by prefix 'sample_'
2017-11-29 07:59:56.666 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 07:59:56.669 +0900 [INFO] (0001:preview): Loading files [/Users/koji/embulk-workspace/embulk-parser-multiline/work/csv/sample_01.csv.gz]
2017-11-29 07:59:56.677 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
+---------+--------------+-------------------------+-------------------------+----------------------------+
| id:long | account:long |          time:timestamp |      purchase:timestamp |             comment:string |
+---------+--------------+-------------------------+-------------------------+----------------------------+
|       1 |       32,864 | 2015-01-27 19:23:49 UTC | 2015-01-27 00:00:00 UTC |                     embulk |
|       2 |       14,824 | 2015-01-27 19:01:23 UTC | 2015-01-27 00:00:00 UTC |               embulk jruby |
|       3 |       27,559 | 2015-01-28 02:20:02 UTC | 2015-01-28 00:00:00 UTC | Embulk "csv" parser plugin |
|       4 |       11,270 | 2015-01-29 11:54:36 UTC | 2015-01-29 00:00:00 UTC |                            |
+---------+--------------+-------------------------+-------------------------+----------------------------+

そして run(work/sample_output_*.csvが生成される。)

$ embulk run config.yml
〜画面表示は省略〜
$ head -999 work/sample_output_*.csv
==> work/sample_output_000.00.csv <==
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,NULL

==> work/sample_output_001.00.csv <==
id,account,time,purchase,comment

==> work/sample_output_002.00.csv <==
id,account,time,purchase,comment

==> work/sample_output_003.00.csv <==
id,account,time,purchase,comment

既に起動の遅さが気になるが、うまく動いている。

あれ?ファイルが4つ作成された。これは、試行錯誤した結果config.ymlにmin_output_taks: 1を追加することで1つにすることができた。(あまりこういったことを言及している記述は見受けられないが)

組み込みプラグインの config.yml のパラメータについてはEmbulk 0.8 documentationを参照すると良い。

$ diff -u config.yml{.v1,}
--- config.yml.v1	2017-11-29 08:28:27.000000000 +0900
+++ config.yml	2017-11-29 08:27:13.000000000 +0900
@@ -19,3 +19,6 @@
   file_ext: csv
   formatter:
     type: csv
+
+exec:
+  min_output_tasks: 1

もう一度実行。OK.

$ rm work/*.csv
$ embulk run config.yml
$ head -999 work/sample_output_*.csv
id,account,time,purchase,comment
1,32864,2015-01-27 19:23:49.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk
2,14824,2015-01-27 19:01:23.000000 +0000,2015-01-27 00:00:00.000000 +0000,embulk jruby
3,27559,2015-01-28 02:20:02.000000 +0000,2015-01-28 00:00:00.000000 +0000,"Embulk ""csv"" parser plugin"
4,11270,2015-01-29 11:54:36.000000 +0000,2015-01-29 00:00:00.000000 +0000,NULL

入力ファイル名が前方一致で曖昧だったり、出力ファイル名に自動で連番がつくなど制御が細かくできない気がするが自分の今回の利用では問題ないので無視する。

あと、ちょっと気になるのがサンプルの入力ファイルは最後に空行があるのだが、これが消えている。どのフェーズで消えているのかわからないがcsvを正規化しているのだと思えば、まあ悪いことではない。

$ gzip -dc work/csv/sample_01.csv.gz | wc -l
       6
$ cat work/sample_output_000.00.csv | wc -l
       5

パーサープラグインの開発

以下、パーサープラグインの開発の手順を示す。この記事で示すパーサーの例は実際にやりたかったものとは違うが、以下の記事で似かよった問題に対応しており、非常に参考になった。そこでこれを土台に説明したい。

複数行からなるログを解析するために、EmbulkのparserプラグインをRubyで開発する話(準備編)

まずはパーサーの雛形である。ここでは multiline という名前になっているが1行入力(csvでもない)し、1行出力するだけのパーサーとなっている。

multiline.rb
module Embulk
  module Parser
    class MultiLine < ParserPlugin
      Plugin.register_parser("multiline", self)

      def self.transaction(config, &control)
        parser_task = config.load_config(Java::LineDecoder::DecoderTask)

        task = {
          "decoder_task" => DataSource.from_java(parser_task.dump)
          # "property1" => config.param("property1", :string),
          # "property2" => config.param("property2", :integer, default: 0),
        }

        columns = [
          Column.new(0, "line", :string),
        ]

        yield(task, columns)
      end

      def init
        # initialization code:
        # @property1 = task["property1"]
        # @property2 = task["property2"]

        @decoder_task = task.param("decoder_task", :hash).load_task(Java::LineDecoder::DecoderTask)
      end

      def run(file_input)
        decoder = Java::LineDecoder.new(file_input.instance_eval { @java_file_input }, @decoder_task)

        while decoder.nextFile
          while line = decoder.poll
            page_builder.add([line])
          end
        end

        page_builder.finish
      end
    end

  end
end

いや、ちょっと複雑じゃないか?ETL処理なんて業務の脇役なので多分このコードを次に見るときは内容を忘れてる。

このプラグインのポイントは以下

  • embulkのパーサーは行指向ではない

性能のためか?もしくはバイナリファイルを扱うこともあるから?なのか、行単位で処理するスクリプトを書く場合は上記のJava::LineDecoderというクラスを使う必要があるらしい。これを使えばdecoder.pollで次の行を取ってきてくれるらしい。ちょっと複雑なのはそういう処理も入っているから。

まあ良しとしよう。順番が前後するがこのプラグインは以下のステップにより作成する。

最初にプラグインの雛形を生成。(ここではrubyのparserプラグインの雛形をmultilineという名前で生成するコマンドを実行している)

$ embulk new ruby-parser multiline
2017-11-29 07:06:14.108 +0900: Embulk v0.8.38

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

Creating embulk-parser-multiline/

Plugin template is successfully generated.
Next steps:

  $ cd embulk-parser-multiline
  $ bundle install                      # install one using rbenv & rbenv-build
  $ bundle exec rake                    # build gem to be released
  $ bundle exec embulk run config.yml   # you can run plugin using this command

丁寧に今後のステップの説明が出ているが、開発するmultilineはgem化はしないのでこのステップは踏まない。まずは生成されたファイル群を見てみる。

$ tree embulk-parser-multiline
embulk-parser-multiline
|-- Gemfile
|-- LICENSE.txt
|-- README.md
|-- Rakefile
|-- embulk-parser-multiline.gemspec
`-- lib
    `-- embulk
        |-- guess
        |   `-- multiline.rb
        `-- parser
            `-- multiline.rb              <-- いじるのはこのファイル

4 directories, 7 files

gem 化は考えないので今回触るのは lib/embulk/parser/multiline.rb だけである。他のファイルは一切触らない。
消してしまってもいい。

このファイルを先に記した内容に置き換えてまずは実行してみる。

$ vi embulk-parser-multiline/lib/embulk/parser/multiline.rb
〜内容を差し替える〜

multiline を利用するようにパーサーを変更する。

$ diff -u config.yml{.v2,}
--- config.yml.v2	2017-11-29 09:10:36.000000000 +0900
+++ config.yml	2017-11-29 09:10:45.000000000 +0900
@@ -5,7 +5,7 @@
   - {type: gzip}
   parser:
     charset: UTF-8
-    type: csv
+    type: multiline
     skip_header_lines: 1
     columns:
     - {name: id, type: long}

そして、実行するのだが、実行の際には、 -I オプションでプラグインの場所を指定する必要がある。これがないと、

Error: ParserPlugin 'multiline' is not found.
Unknown parser plugin 'multiline'. embulk/parser/multiline.rb is not installed. Run 'embulk gem search -rd embulk-parser' command to find plugins.

というエラーが発生するので注意。この部分は色々あって結構ハマった。

$ embulk preview -I embulk-parser-multiline/lib config.yml
2017-11-29 09:17:24.234 +0900: Embulk v0.8.38

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

2017-11-29 09:17:36.936 +0900 [INFO] (0001:preview): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 09:17:36.938 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 09:17:36.940 +0900 [INFO] (0001:preview): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 09:17:36.948 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
2017-11-29 09:17:37.003 +0900 [INFO] (0001:preview): Loaded plugin embulk/parser/multiline from a load path
+---------------------------------------------------------------------+
|                                                         line:string |
+---------------------------------------------------------------------+
|                                    id,account,time,purchase,comment |
|                         1,32864,2015-01-27 19:23:49,20150127,embulk |
|                   2,14824,2015-01-27 19:01:23,20150127,embulk jruby |
| 3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin" |
|                           4,11270,2015-01-29 11:54:36,20150129,NULL |
|                                                                     |
+---------------------------------------------------------------------+

csv のプラグインを使ってないので列が1つになっている。(末尾の空行も残っているようだ)
何もしないプラグインが正常に動作することが確認できた。

embulk のデバッグ

ここで、ちょっとデバッグのことを考える。embulk を使っていてうまく動作しないときに以下のようなテクニックが重要になってくると思った。

irb

irb を使うとembulkの状態を確認することができる。例えば

$ embulk irb -I embulk-parser-multiline/lib config.yml
2017-11-29 09:20:11.768 +0900: Embulk v0.8.38

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

irb(main):001:0> puts $LOAD_PATH
uri:classloader:/META-INF/jruby.home/lib/ruby/2.3/site_ruby
uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib
=> nil
irb(main):002:0> 

などとして、ruby のグローバル変数 $LOAD_PATH がどうなっているか?などの確認が取れる。
ただ、自分の経験で、この irb が役に立ったことはない。今後、使える場面もあるかも知れないので参考に記載した。

メインのスクリプトの書き換え

embulkの実行ファイル(これは実際にはjarファイル)を展開するとjavaのクラスファイルと一緒にrubyスクリプトが含まれていることがわかる

$ unzip -d embulk-X.X.X ~/.embulk/bin/embulk 
$ ls embulk-X.X.X
LICENSE       bundler/      edu/          javax/        license.html  me/           org/
META-INF/     bundler.rb    embulk/       jni/          licenses/     msgpack/      tables/
NOTICE        ch/           embulk.rb     jnr/          liquid/       msgpack.rb
about.html    com/          io/           jruby/        liquid.rb     net/

例えば、以下のスクリプトに修正を加えれば embulk がプラグインを見つけられない時に原因を調べることができる。
(embulkのエラーメッセージから該当のスクリプトファイルを突き止めた)

$ diff -u embulk-X.X.X/embulk/plugin_registry.rb{.org,}
--- embulk-X.X.X/embulk/plugin_registry.rb.org	2017-11-29 09:50:31.000000000 +0900
+++ embulk-X.X.X/embulk/plugin_registry.rb	2017-11-29 09:52:31.000000000 +0900
@@ -35,6 +35,7 @@
 
     def search(type)
       name = "#{@search_prefix}#{type}"
+STDERR.puts "name ==> #{name.inspect}"
       begin
         require_and_show name
         return true
@@ -46,6 +47,7 @@
 
       # search from $LOAD_PATH
       load_path_files = $LOAD_PATH.map do |lp|
+STDERR.puts "lp ==> #{lp.inspect}"
         lpath = File.expand_path(File.join(lp, "#{name}.rb"))
         File.exist?(lpath) ? lpath : nil
       end

以下の実行例で最初に出る Embulk version unavailable due to failure to access the jar file. は無視すること。メッセージからjarファイルよりversion情報を取得する処理があるらしい。

$ java -classpath embulk-X.X.X org.embulk.cli.Main preview config.yml
Embulk version unavailable due to failure to access the jar file.
java.io.FileNotFoundException: /Users/koji/embulk-workspace/embulk-X.X.X (Is a directory)
	at java.util.zip.ZipFile.open(Native Method)
	at java.util.zip.ZipFile.<init>(ZipFile.java:219)
	at java.util.zip.ZipFile.<init>(ZipFile.java:149)
	at java.util.jar.JarFile.<init>(JarFile.java:166)
	at java.util.jar.JarFile.<init>(JarFile.java:103)
	at org.embulk.EmbulkVersion.getSelfJarManifest(EmbulkVersion.java:61)
	at org.embulk.EmbulkVersion.<clinit>(EmbulkVersion.java:19)
	at org.embulk.cli.Main.main(Main.java:27)
2017-11-29 10:01:08.800 +0900: Embulk v[embulk-version-unavailable]

********************************** INFORMATION **********************************
  Join us! Embulk-announce mailing list is up for IMPORTANT annoucement such as
    compatibility-breaking changes and key feature updates.
  https://groups.google.com/forum/#!forum/embulk-announce
*********************************************************************************

[WARN] Embulk looks running out of the Embulk jar file. It is unsupported.
2017-11-29 10:01:18.484 +0900 [INFO] (0001:preview): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 10:01:18.487 +0900 [INFO] (0001:preview): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 10:01:18.490 +0900 [INFO] (0001:preview): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 10:01:18.500 +0900 [INFO] (0001:preview): Try to read 32,768 bytes from input source
name ==> "embulk/parser/multiline"
lp ==> "uri:classloader:/META-INF/jruby.home/lib/ruby/2.3/site_ruby"
lp ==> "uri:classloader:/META-INF/jruby.home/lib/ruby/stdlib"
lp ==> "/Users/koji/embulk-workspace/embulk-X.X.X"

〜スタックトレースは省略〜

Error: ParserPlugin 'multiline' is not found.
Unknown parser plugin 'multiline'. embulk/parser/multiline.rb is not installed. Run 'embulk gem search -rd embulk-parser' command to find plugins.

どうやら、-I オプションは irb では認識されていないが、preview の時は$LOAD_PATHが変わっているらしい。

パーサープラグインの開発(続き)

単純な何もしないパーサーがうまくいったが、もう少し改良を加える。

自分が実際に作りたかったのはcsvパーサーの変形であったので、組み込みのcsvパーサを改良するものとしたい。
(この命題ではcsvパーサーの出力をさらにフィルタ処理するfilterプラグインでも良かったかもしれない)

どうやって、突き止めたかは忘れてしまったが、以下のようにするとcsvパーサーの実装を流用できる

  • java_import で、javaのクラスを使えるようにする(これはjrubyの機能)
  • CsvParserPlugin クラスで csv パーサーと同じconfig項目を読み込む
  • CsvTokenizer でカラム毎の処理を行う

config.ymlのskip_header_linesを参照して、ヘッダを読み飛ばす処理も実装した

multiline.rb
require 'pp'

module Embulk
  module Java
    java_import 'org.embulk.standards.CsvParserPlugin'
    java_import 'org.embulk.standards.CsvTokenizer'
  end
  module Parser

    class Multiline < ParserPlugin
      Plugin.register_parser("multiline", self)

      def self.transaction(config, &control)
        plugin_task = config.load_config(Java::CsvParserPlugin::PluginTask)

        task = {
          "plugin_task" => DataSource.from_java(plugin_task.dump),
          # "property1" => config.param("property1", :string),
          # "property2" => config.param("property2", :integer, default: 0),
        }

        columns = [
          Column.new(0, "id",       :string),
          Column.new(1, "account",  :string),
          Column.new(2, "time",     :string),
          Column.new(3, "purchase", :string),
          Column.new(4, "comment",  :string),
        ]

        yield(task, columns)
      end

      def init
        # initialization code:
        # @property1 = task["property1"]
        # @property2 = task["property2"]

        @plugin_task = task.param("plugin_task", :hash)
                           .load_task(Java::CsvParserPlugin::PluginTask)
      end

      def run(file_input)
        java_file_input = file_input.instance_eval { @java_file_input }
        decoder = Java::LineDecoder.new(java_file_input, @plugin_task)
        tokenizer = Java::CsvTokenizer.new(decoder, @plugin_task)

        while tokenizer.nextFile
          skip_header_lines = @plugin_task.skip_header_lines
          while skip_header_lines > 0
            skip_header_lines -= 1
            break if !tokenizer.skipHeaderLine
          end

          while tokenizer.nextRecord
            line = []
            while tokenizer.hasNextColumn
              line.push tokenizer.nextColumn
            end
            page_builder.add(line)
          end
        end

        page_builder.finish
      end
    end

  end
end
$ embulk preview -I embulk-parser-multiline/lib config.yml
〜途中省略〜
+-----------+----------------+---------------------+-----------------+----------------------------+
| id:string | account:string |         time:string | purchase:string |             comment:string |
+-----------+----------------+---------------------+-----------------+----------------------------+
|         1 |          32864 | 2015-01-27 19:23:49 |        20150127 |                     embulk |
|         2 |          14824 | 2015-01-27 19:01:23 |        20150127 |               embulk jruby |
|         3 |          27559 | 2015-01-28 02:20:02 |        20150128 | Embulk "csv" parser plugin |
|         4 |          11270 | 2015-01-29 11:54:36 |        20150129 |                       NULL |
+-----------+----------------+---------------------+-----------------+----------------------------+

複数ファイルの処理

パーサー開発の雛形ができた。ここから実際に作りたいものを実装する例を示しても良いが今回の記事の本質ではないので、そこは省略する。
パーサは完成したものとして、実質何もしないパーサーで10ファイル程度の処理を行ってみる。

embulk では、config.yml の内容を動的に変更するのに liquid というテンプレートエンジンを使うことがスタンダードらしい。
そこで、以下の config.yml.liquid を用意する

config.yml.liquid
in:
  type: file
  path_prefix: {{env.input_path}}/{{env.input_file_prefix}}
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    type: multiline
    skip_header_lines: 1
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out:
  type: file
  path_prefix: {{env.output_path}}/{{env.output_file_prefix}}
  file_ext: csv
  formatter:
    type: csv

exec:
  min_output_tasks: 1

そして、以下のように実行してみる。(長いので折り返している)
つまり、liquid を利用することで環境変数を通して、パラメータを動的に変更可能にしている。

$ input_path=work/csv \
  input_file_prefix=sample_ \
  output_path=work \
  output_file_prefix=sample_output_ \
  embulk run -I embulk-parser-multiline/lib config.yml.liquid

うまくいったら複数ファイルの出力を試す

$ time for i in $(seq 1 10); do input_path=work/csv input_file_prefix=sample_ output_path=work output_file_prefix=sample_output_${i}_ embulk run -I embulk-parser-multiline/lib config.yml.liquid; done
〜途中省略〜
real        2m47.777s
user        3m15.435s
sys          0m5.956s

1ファイル処理するのにだいたい15秒くらいかかっていたわけだが、実際複数ファイル試した時には
10ファイル処理するのに実時間で3分弱。じゃあ、1万ファイルだと…40時間!これは使い物にならない!

しかし、ここまでのサンクコストが高すぎるので後戻りできない。このまま突き進むことにしてしまった。

EmbulkEmbed の利用

起動処理の時間の問題ということで、embulkを埋め込む対処を試してみた。

参考: ひしだま's 技術メモページ - Javaから実行する方法

src/EmbulkRunner.java
import org.embulk.EmbulkEmbed;
import org.embulk.EmbulkEmbed.Bootstrap;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;

import java.io.File;
import java.io.IOException;

public class EmbulkRunner {
    public static void main(String[] args) throws IOException {
        Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();

        EmbulkEmbed embulk = bootstrap.initializeCloseable();
        try {
            ConfigLoader loader = embulk.newConfigLoader();
            ConfigSource config = loader.fromYamlFile(new File("./config.yml"));

            ConfigSource in = config.getNested("in");
            in.set("path_prefix", "./work/csv/sample_");
            embulk.run(config);
        } finally {
            embulk.destroy();
        }
    }
}
コンパイル
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java

※ 以下、WindowsとUnix系OSではclasspathのパスの区切り文字が違うので注意(よくこれにハマる)

実行(Mac、Linuxの場合)
$ java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
実行(Windowsの場合)
$ java -classpath ~/.embulk/bin/embulk;src EmbulkRunner
実行結果
Exception in thread "main" org.embulk.exec.PartialExecutionException: 
  org.embulk.config.ConfigException: ParserPlugin 'multiline' is not found.

当然、プラグインの場所を示していないので失敗する。これはembulkのソースを確認した結果、以下のようにsystemConfig.ymlというファイルを用意し、それを読み込ませれば良いことがわかった。

systemConfig.yml
jruby_load_path:
  - embulk-parser-multiline/lib
$ diff -u src/EmbulkRunner.java{.org,}
--- src/EmbulkRunner.java.org	2017-11-29 13:30:27.000000000 +0900
+++ src/EmbulkRunner.java	2017-11-29 13:27:51.000000000 +0900
@@ -9,6 +9,8 @@
 public class EmbulkRunner {
     public static void main(String[] args) throws IOException {
         Bootstrap bootstrap = new EmbulkEmbed.Bootstrap();
+        ConfigSource systemConfig = bootstrap.getSystemConfigLoader().fromYamlFile(new File("./systemConfig.yml"));
+        bootstrap.setSystemConfig(systemConfig);
 
         EmbulkEmbed embulk = bootstrap.initializeCloseable();
         try {
# コンパイル
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java

# 実行
$ java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
2017-11-29 13:34:16.943 +0900 [INFO] (0001:transaction): Loaded plugin embulk/parser/multiline from a load path
2017-11-29 13:34:16.960 +0900 [INFO] (0001:transaction): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 13:34:16.962 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 13:34:16.967 +0900 [INFO] (0001:transaction): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 13:34:17.068 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=1
2017-11-29 13:34:17.115 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-11-29 13:34:17.165 +0900 [INFO] (0013:task-0000): Writing local file './work/sample_output_000.00.csv'
2017-11-29 13:34:17.248 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}

うまくいった。内部で10回ループするように修正してみる。
EmbulkEmbedの場合、liquidを使わなくてもconfigを動的に変えられるのが良い。

$ diff -u src/EmbulkRunner.java{.v1,}
--- src/EmbulkRunner.java.v1	2017-11-29 13:44:39.000000000 +0900
+++ src/EmbulkRunner.java	2017-11-29 13:47:55.000000000 +0900
@@ -16,10 +16,14 @@
         try {
             ConfigLoader loader = embulk.newConfigLoader();
             ConfigSource config = loader.fromYamlFile(new File("./config.yml"));
-
             ConfigSource in = config.getNested("in");
-            in.set("path_prefix", "./work/csv/sample_");
-            embulk.run(config);
+            ConfigSource out = config.getNested("out");
+
+            for (int i = 0; i < 10; i++) {
+                in.set("path_prefix", "./work/csv/sample_");
+                out.set("path_prefix", "./work/sample_output_" + i + "_");
+                embulk.run(config);
+            }
         } finally {
             embulk.destroy();
         }
# コンパイル
$ javac -classpath ~/.embulk/bin/embulk src/EmbulkRunner.java

# 実行
$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
real    0m16.338s
user    0m16.653s
sys     0m0.673s

10回で16秒、100回で19秒だった

100回の結果
real    0m19.207s
user    0m23.418s
sys     0m1.091s

最初の起動に時間がかかっているだけで十分速いように思える。

外部のプラグインの利用

EmbulkEmbed を利用しつつ外部プラグインも利用してみる。ここでは、embulk-filter-columnを試す。

最初は素のembulkで動くことを確認する。

プラグインをインストールする。

$ embulk gem install embulk-filter-column
2017-11-29 14:04:10.398 +0900: Embulk v0.8.38

Gem plugin path is: /Users/koji/.embulk/jruby/2.3.0

Fetching: embulk-filter-column-0.7.1.gem (100%)
Successfully installed embulk-filter-column-0.7.1
1 gem installed

~/.embulk/jruby 配下にインストールされた

$ tree ~/.embulk
/Users/koji/.embulk
|-- bin
|   `-- embulk
`-- jruby
    `-- 2.3.0
        |-- build_info
        |-- cache
        |   `-- embulk-filter-column-0.7.1.gem
        |-- doc
        |-- extensions
        |-- gems
        |   `-- embulk-filter-column-0.7.1
〜省略〜
        |       |-- classpath
        |       |   |-- JsonPathCompiler-0.0.12.jar
        |       |   |-- accessors-smart-1.1.jar
        |       |   |-- asm-5.0.3.jar
        |       |   |-- embulk-filter-column-0.7.1.jar
        |       |   |-- json-smart-2.2.1.jar
        |       |   `-- slf4j-api-1.7.21.jar
〜省略〜
        |       |-- lib
        |       |   `-- embulk
        |       |       `-- filter
        |       |           `-- column.rb
〜省略〜
        `-- specifications
            `-- embulk-filter-column-0.7.1.gemspec

32 directories, 41 files
$ 

config.yml を修正して column プラグインを使う設定に変える(最後の列にdという時刻の列を追加している)。

$ diff -u config.yml{.v3,}
--- config.yml.v3	2017-11-29 14:00:14.000000000 +0900
+++ config.yml	2017-11-29 14:02:07.000000000 +0900
@@ -13,6 +13,10 @@
     - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
     - {name: purchase, type: timestamp, format: '%Y%m%d'}
     - {name: comment, type: string}
+filters:
+  - type: column
+    add_columns:
+      - {name: d, type: timestamp, default: "2015-07-13", format: "%Y-%m-%d"}
 out:
   type: file
   path_prefix: ./work/sample_output_

実行してみる

$ embulk run -I embulk-parser-multiline/lib config.yml
2017-11-29 14:09:59.569 +0900 [INFO] (0001:transaction): Loaded plugin embulk-filter-column (0.7.1)
2017-11-29 14:09:59.662 +0900 [INFO] (0001:transaction): Loaded plugin embulk/parser/multiline from a load path
2017-11-29 14:09:59.682 +0900 [INFO] (0001:transaction): Listing local files at directory 'work/csv' filtering filename by prefix 'sample_'
2017-11-29 14:09:59.684 +0900 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2017-11-29 14:09:59.689 +0900 [INFO] (0001:transaction): Loading files [work/csv/sample_01.csv.gz]
2017-11-29 14:09:59.819 +0900 [INFO] (0001:transaction): Using local thread executor with max_threads=8 / tasks=1
2017-11-29 14:09:59.877 +0900 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2017-11-29 14:09:59.949 +0900 [INFO] (0015:task-0000): Writing local file './work/sample_output_000.00.csv'
2017-11-29 14:10:00.053 +0900 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2017-11-29 14:10:00.059 +0900 [INFO] (main): Committed.
2017-11-29 14:10:00.059 +0900 [INFO] (main): Next config diff: {"in":{"last_path":"work/csv/sample_01.csv.gz"},"out":{}}

最初に、Loaded plugin embulk-filter-column (0.7.1) という表示があり、うまく動いている。
出力結果も(時刻の書式が気にくわないが)列が追加されている。

$ cat work/sample_output_000.00.csv 
id,account,time,purchase,comment,d
1,32864,2015-01-27 19:23:49,20150127,embulk,2015-07-13 00:00:00.000000 +0000
2,14824,2015-01-27 19:01:23,20150127,embulk jruby,2015-07-13 00:00:00.000000 +0000
3,27559,2015-01-28 02:20:02,20150128,"Embulk ""csv"" parser plugin",2015-07-13 00:00:00.000000 +0000
4,11270,2015-01-29 11:54:36,20150129,NULL,2015-07-13 00:00:00.000000 +0000

なお、出力時の時刻の書式は以下で変えられる。CSV formatter plugin - Example(個々の列毎にしか指定できないのだろうか?)

$ diff -u config.yml{.v4,}
--- config.yml.v4	2017-11-29 14:18:26.000000000 +0900
+++ config.yml	2017-11-29 14:18:39.000000000 +0900
@@ -23,6 +23,8 @@
   file_ext: csv
   formatter:
     type: csv
+    column_options:
+      d: {format: '%Y-%m-%d %H:%M:%S'}
 
 exec:
   min_output_tasks: 1

EmbulkEmbed も試してみる

plugin のインストールとconfig.ymlの書き換えのみなので再コンパイルせずに実行する。

$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
Exception in thread "main" org.embulk.config.ConfigException: FilterPlugin 'column' is not found.
Unknown filter plugin 'column'. embulk/filter/column.rb is not installed. Run 'embulk gem search -rd embulk-filter' command to find plugins.

失敗した gem でインストールしたプラグインが見つかってない。

なお、前に紹介したデバッグ手法(メインのスクリプトを差し替えて実行する)をEmbulkEmbedで行いたい場合、以下のように実行すれば良い。

$ time java -classpath embulk-X.X.X:src EmbulkRunner

この問題の対処方法の結論はsystemConfig.ymlを以下のように修正すれば良い。

$ diff -u systemConfig.yml{.v1,}
--- systemConfig.yml.v1	2017-11-29 14:24:58.000000000 +0900
+++ systemConfig.yml	2017-11-29 14:25:09.000000000 +0900
@@ -1,2 +1,3 @@
 jruby_load_path:
   - embulk-parser-multiline/lib
+jruby_use_default_embulk_gem_home: true

あるいは gem を使わずこの環境に閉じ込めたい場合は以下のようにする

プラグインの置き場所を embulk-parser-multiline/lib および gem の場所から plugin に変更する。

$ mkdir -p plugin/lib/embulk/{filter,parser}
$ cp embulk-parser-multiline/lib/embulk/parser/multiline.rb

$ cp ~/.embulk/jruby/2.3.0/gems/embulk-filter-column-0.7.1/lib/embulk/filter/column.rb plugin/lib/embulk/filter/

外部プラグインが必要とする jar ファイルも plugin/classpath にコピーする。
(このパスは embulk/filter/column.rb をみると分かるがファイルの場所がプラグインのスクリプトからの相対的な位置で解決されているため、この場所でなければならない)

$ mkdir plugin/classpath
$ cp ~/.embulk/jruby/2.3.0/gems/embulk-filter-column-0.7.1/classpath/*.jar plugin/classpath/

systemConfig.yml を書き換える (jruby_load_path を変更。jruby_use_default_embulk_gem_home は再度不要になる)

systemConfig.yml
jruby_load_path:
  - plugin/lib

実行。

$ time java -classpath ~/.embulk/bin/embulk:src EmbulkRunner
〜途中省略〜
real	0m19.743s
user	0m24.227s
sys	0m1.125s

うまくいった性能も問題ない。

まとめ

最終的に、以下の構成のファイルを用意することで高速に処理を動かすことができた。
作ったファイルの最終版はこれ

.
|-- config.yml                               config.yml
|-- embulk-X.X.X                             デバッグ用(embulk.jarを展開したもの。なくても良い)
|-- lib                                      embulk 本体
|   `-- embulk-0.8.38.jar
|-- plugin                                   外部、自作のプラグイン置き場
|   |-- classpath
|   |   |-- JsonPathCompiler-0.0.12.jar
|   |   |-- accessors-smart-1.1.jar
|   |   |-- asm-5.0.3.jar
|   |   |-- embulk-filter-column-0.7.1.jar
|   |   |-- json-smart-2.2.1.jar
|   |   `-- slf4j-api-1.7.21.jar
|   `-- lib
|       `-- embulk
|           |-- filter
|           |   `-- column.rb
|           `-- parser
|               `-- multiline.rb
|-- src                                      EmbulkEmbedを利用したJavaソースとそのコンパイル結果
|   |-- EmbulkRunner.class
|   `-- EmbulkRunner.java
|-- systemConfig.yml                         EmbulkEmbed用のsystemConfig.yml
`-- work                                     サンプルデータ
    `-- csv
        `-- sample_01.csv.gz

実行例

# 素の実行方法(単体動作確認、問題切り分け用)
$ java -jar lib/embulk-0.8.38.jar run -I plugin/lib config.yml

# EmbulkEmbed
$ javac -classpath lib/embulk-0.8.38.jar src/EmbulkRunner.java
$ time java -classpath lib/embulk-0.8.38.jar:src EmbulkRunner

課題

パーサ開発がrubyで実行できてもドライバ部分をjavaで書くようだと魅力が半減するように思う。

本当の本当にやりたかったのは対象ファイルの一覧がDBのテーブルに格納されているのでそれを読みつつ変換結果のパスで差し替えたいというものだった。これをjavaで書くのはjdbcでゴリゴリ書くか、javaのORマッパ準備してやるとか色々面倒。

今後、EmbulkEmbed をjrubyで動かせないか?とか試してみたい。
やってみた

おまけ

この記事で利用している systemConfig.ymlには以下のような設定もある

設定項目つ
log_path ログファイル名、出力を標準出力ではなくログファイルに書き出す
log_level 出力するログレベル "info" など

例:

systemConfig.yml
jruby_load_path:
  - plugin/lib
jruby_use_default_embulk_gem_home: true
log_path: ./embulk.log
log_level; info
20
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?