はじめに
ここでは Embulk の plugin の 1 つである MapReduceExecutor を利用して、Embulk を Hadoop 2.6 上で動かす方法を説明します。分散処理フレームワークである Hadoop 上で Embulk を動かせると、全体として性能を向上させるためのリソース管理が容易になったたり、一時的なエラーが発生した際のリトライなどを自動で行ってくれるなど、様々な恩恵を受けられることが期待できます。
Embulk は plugin 機構をもっており、データの読み込み先と書き出し先を(plugin があれば)自由に選択できることはよく知られておりますが、実は処理を実行する部分も plugin として実装できます。MapReduceExecutor はその 1 実装です。Embulk のデフォルトの実行 plugin は LocalExecutorPlugin です。
Hadoop 2.6 の設定
Apache Hadoop の公式サイトからダウンロードして起動します。動作環境は自分の手元の MacBook Pro です。今回ダウンロードしたのは 2.6.0 の binary 版です。これを展開し、適切に設定ファイルを編集します。例えば:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///tmp/hadoopdata/hdfs/namenode</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///tmp/hadoopdata/hdfs/datanode</value>
</property>
</configuration>
その後各種デーモンを起動させ、問題なくデーモンが動いているようであれば、手元で Hadoop jobs を実行できる状態になっているはずです。Namenode の起動が初めてであれば、HDFS のフォーマットを事前にしておく必要があります。
# デーモンの起動
$ sbin/start-dfs.sh; sbin/start-yarn.sh
# デーモンの停止
$ sbin/stop-dfs.sh; sbin/stop-yarn.sh
Embulk MapReduceExecutor の設定
Embulk の subcommand の bundle
を利用すれば、独自の plugin 環境を作成できますので、今回はこちらを利用し、MapReduceExecutor をインストールします。まずは bundle ディレクトリ mapred_executor_bundle
を作ります。
bundle
subcommand の詳細: Using plugin bundle
$ bin/embulk bundle mapred_executor_bundle
2015-06-19 16:51:39.568 -0700: Embulk v0.6.11
Initializing mapred_executor_bundle...
Creating mapred_executor_bundle/.bundle/config
Creating mapred_executor_bundle/embulk/input/example.rb
Creating mapred_executor_bundle/embulk/output/example.rb
Creating mapred_executor_bundle/embulk/filter/example.rb
Creating mapred_executor_bundle/Gemfile
Fetching: bundler-1.10.4.gem (100%)
Successfully installed bundler-1.10.4
1 gem installed
The Gemfile specifies no dependencies
Resolving dependencies...
Bundle complete! 0 Gemfile dependencies, 1 gem now installed.
Bundled gems are installed into ..
そのディレクトリの下に Gemfile
ができていますので、こちらに MapReduceExecutor の plugin 名である embulk-executor-mapreduce
を宣言します。
source 'https://rubygems.org/'
gem 'embulk-executor-mapreduce', '0.1.1' # 追加
この状態で、先ほどの bundle
subcommand を再度実行すると、MapReduceExecutor がインストールされます。
$ bin/embulk bundle mapred_executor_bundle
2015-06-22 15:47:26.285 -0700: Embulk v0.6.11
Fetching gem metadata from https://rubygems.org/..
Fetching version metadata from https://rubygems.org/.
Resolving dependencies...
Installing embulk-executor-mapreduce 0.1.1
Using bundler 1.10.4
Bundle complete! 1 Gemfile dependency, 2 gems now installed.
Bundled gems are installed into ..
あとは Embulk の設定ファイルの exec
部分に type: mapreduce
と追加することで MapReduceExecutor がロードされ、使われるようになります。以下が config.yml
の exec
部分になります。
exec:
type: mapreduce
config_files:
- path/to/core-site.xml
- path/to/mapred-site.xml
- path/to/hdfs-site.xml
とりあえずは、先ほど編集した Hadoop の設定ファイルの内容を MapReduceExecutor が読み出せるように config_files
へファイル名を配列として列挙し指定すれば動くようになります。ちなみに type: mapreduce
の部分をコメントアウトすると、通常の Executor (LocalExecutorPlugin) で処理されます。
実際に動かしてみる
せっかくなので、S3 上の CSV ファイルを Embulk を使い、TreasureData へアップロードしてみましょう。先ほどの bundle ディレクトリにある Gemfile
に embulk-input-s3
と embulk-output-td
を追加宣言しましょう。(embulk-output-td
を使うには事前に TreasureData のアカウントを作成し、その API key を知っておく必要があります。)
source 'https://rubygems.org/'
gem 'embulk-executor-mapreduce', '0.1.1'
gem 'embulk-input-s3', '0.1.7' # 追加
gem 'embulk-output-td', '0.1.0' # 追加
再度、bundle
subcommand を実行すると plugins がインストールされます。インストールが成功したことを確認した後、Embulk を実行させる設定ファイル config.yml
を書きます。概略は以下です(すでに guess コマンドを実行済みです)。
exec:
type: mapreduce
config_files:
- path/to/core-site.xml
- path/to/mapred-site.xml
- path/to/hdfs-site.xml
in:
type: s3
access_key_id: my_access_key
secret_access_key: my_secret_access_key
bucket: my_bucket
path_prefix: csv_files/path/prefix
parser:
charset: UTF-8
newline: CRLF
type: csv
delimiter: ','
quote: ''
escape: ''
skip_header_lines: 1
columns:
- name: date_code
type: timestamp
format: '%Y%m%d'
...(省略)...
out:
type: td
apikey: my_treasuredata_apikey
database: my_db
table: my_table
time_column: date_code
Embulk 設定ファイル config.yml
を用意したら、コマンド実行:
$ bin/embulk run -b ./mapred_executor_bundle config.yml
2015-06-22 15:56:52.494 -0700: Embulk v0.6.11
2015-06-22 15:56:54.564 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.565 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.565 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:54.630 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.631 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.631 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:54.664 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.665 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.665 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:56.713 -0700 [INFO] (transaction): Logging initialized @8990ms
2015-06-22 15:56:57.535 -0700 [INFO] (transaction): Duplicating date_code:timestamp column to 'time' column for the data partitioning
2015-06-22 15:56:57.538 -0700 [INFO] (transaction): Create bulk_import session embulk_20150622_xxxx
2015-06-22 15:56:58.373 -0700 [WARN] (transaction): Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-06-22 15:57:01.224 -0700 [INFO] (transaction): Connecting to ResourceManager at /0.0.0.0:8032
2015-06-22 15:57:01.434 -0700 [WARN] (transaction): Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2015-06-22 15:57:05.654 -0700 [WARN] (transaction): No job jar file set. User classes may not be found. See Job or Job#setJar(String).
2015-06-22 15:57:05.889 -0700 [INFO] (transaction): number of splits:1
2015-06-22 15:57:06.047 -0700 [INFO] (transaction): Submitting tokens for job: job_1435013311362_0002
2015-06-22 15:57:06.167 -0700 [INFO] (transaction): Job jar is not present. Not adding any jar to the list of resources.
2015-06-22 15:57:06.459 -0700 [INFO] (transaction): Submitted application application_1435013311362_0002
2015-06-22 15:57:06.488 -0700 [INFO] (transaction): The url to track the job: http://localhost:8088/proxy/application_1435013311362_0002/
2015-06-22 15:57:06.492 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:11.505 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:16.514 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:21.596 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:26.601 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:31.612 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:36.619 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:41.676 -0700 [INFO] (transaction): map 100.0% reduce 0.0%
2015-06-22 15:57:46.684 -0700 [INFO] (transaction): map 100.0% reduce 0.0%
2015-06-22 15:57:46.865 -0700 [INFO] (transaction): Counters: 30
2015-06-22 15:57:46.865 -0700 [INFO] (transaction): Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=165741
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=35632445
HDFS: Number of bytes written=468
HDFS: Number of read operations=2
HDFS: Number of large read operations=0
HDFS: Number of write operations=3
Job Counters
Launched map tasks=1
Other local map tasks=1
Total time spent by all maps in occupied slots (ms)=17606
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=17606
Total vcore-seconds taken by all map tasks=17606
Total megabyte-seconds taken by all map tasks=18028544
Map-Reduce Framework
Map input records=1
Map output records=0
Input split bytes=55
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=560
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=191889408
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
2015-06-22 15:57:47.340 -0700 [INFO] (transaction): Performing bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:25.747 -0700 [INFO] (transaction): job id: 27xxxxxx
2015-06-22 15:58:25.747 -0700 [INFO] (transaction): Committing bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:25.748 -0700 [INFO] (transaction): valid records: 10
2015-06-22 15:58:25.748 -0700 [INFO] (transaction): error records: 0
2015-06-22 15:58:25.748 -0700 [INFO] (transaction): valid parts: 1
2015-06-22 15:58:25.748 -0700 [INFO] (transaction): error parts: 0
2015-06-22 15:58:35.683 -0700 [INFO] (transaction): Deleting bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:36.231 -0700 [INFO] (main): Committed.
2015-06-22 15:58:36.231 -0700 [INFO] (main): Next config diff: {"in":{"last_path":"my_csv_files.csv"},"out":{"last_session":"embulk_20150622_xxxx"}}
今回は S3 上に 10 行の CSV ファイルを 1 つだけ用意したので、Job Counters の Launched map tasks
が 1 でありかつ Map-Reduce Framework の Map input records
も 1 となっています。後述しますが partitioning の設定がされていないため、Reduce tasks は 0 です。実際に TreasureData 側にもデータが送られていることをコンソール画面より確認してみましょう。
どのように動いているか
現在 MapReduceExecutor には、入力データに対して timestamp partitioning をする場合としない場合の 2 つのモードがあります。このモードの違いによって、Embulk のどの plugin が Hadoop 上のどこで動くのかが異なってきます。
Partitioning を行わないモードの場合、Embulk の input/output plugin は共に hadoop の mapper 部分で実行され、reducer 部分では特に何もしません(reduce task は 0 です)。Map task の数は、入力されるファイルの数で決まります。Hadoop 上で job が実行される前に、Embulk が読むファイル名の配列(ファイルの数とファイル名)を MapReduceExecutor を取得します。各 Map task は、ファイル名配列の添え字を渡されることで、処理をするファイル名を取得することができます。
Timestamp partitioning モードの場合、Embulk の input plugin は mapper 部分で、output plugin は reducer 部分で実行されます。ファイルを読みながら map task が実行されているとき、partitioning key として指定された timestamp カラム値の範囲によって、page オブジェクトが作られ、page 単位で shuffle され、reduce task の output plugin で使われます。Map task の数は partitioning を行わないモードの時と同様に、読むファイルの数で決まります。一方、reducer task の数は、map task の数と同数であれば効率が良いというわけではないので、ユーザが設定で変更することができます。
まとめ
今回は Hadoop 2.6.0 を利用しましたが、Hadoop 2.4.x 系の上でも同様の設定ファイルで Embulk を動かすことができます。データは日々増えていくものなので、Embulk の処理の並列度を手軽に上げられる方法の 1 つとして MapReduceExecutor が利用できるかと思います。