LoginSignup
12
12

More than 5 years have passed since last update.

はじめに

ここでは Embulk の plugin の 1 つである MapReduceExecutor を利用して、Embulk を Hadoop 2.6 上で動かす方法を説明します。分散処理フレームワークである Hadoop 上で Embulk を動かせると、全体として性能を向上させるためのリソース管理が容易になったたり、一時的なエラーが発生した際のリトライなどを自動で行ってくれるなど、様々な恩恵を受けられることが期待できます。

Embulk は plugin 機構をもっており、データの読み込み先と書き出し先を(plugin があれば)自由に選択できることはよく知られておりますが、実は処理を実行する部分も plugin として実装できます。MapReduceExecutor はその 1 実装です。Embulk のデフォルトの実行 plugin は LocalExecutorPlugin です。

Hadoop 2.6 の設定

Apache Hadoop の公式サイトからダウンロードして起動します。動作環境は自分の手元の MacBook Pro です。今回ダウンロードしたのは 2.6.0 の binary 版です。これを展開し、適切に設定ファイルを編集します。例えば:

core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
hdfs-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
   <name>dfs.replication</name>
   <value>1</value>
  </property>

  <property>
    <name>dfs.name.dir</name>
    <value>file:///tmp/hadoopdata/hdfs/namenode</value>
  </property>

  <property>
    <name>dfs.data.dir</name>
    <value>file:///tmp/hadoopdata/hdfs/datanode</value>
  </property>
</configuration>

その後各種デーモンを起動させ、問題なくデーモンが動いているようであれば、手元で Hadoop jobs を実行できる状態になっているはずです。Namenode の起動が初めてであれば、HDFS のフォーマットを事前にしておく必要があります。

# デーモンの起動
$ sbin/start-dfs.sh; sbin/start-yarn.sh
# デーモンの停止
$ sbin/stop-dfs.sh; sbin/stop-yarn.sh

Embulk MapReduceExecutor の設定

Embulk の subcommand の bundle を利用すれば、独自の plugin 環境を作成できますので、今回はこちらを利用し、MapReduceExecutor をインストールします。まずは bundle ディレクトリ mapred_executor_bundle を作ります。

bundle subcommand の詳細: Using plugin bundle

$ bin/embulk bundle mapred_executor_bundle
2015-06-19 16:51:39.568 -0700: Embulk v0.6.11
Initializing mapred_executor_bundle...
  Creating mapred_executor_bundle/.bundle/config
  Creating mapred_executor_bundle/embulk/input/example.rb
  Creating mapred_executor_bundle/embulk/output/example.rb
  Creating mapred_executor_bundle/embulk/filter/example.rb
  Creating mapred_executor_bundle/Gemfile
Fetching: bundler-1.10.4.gem (100%)
Successfully installed bundler-1.10.4
1 gem installed
The Gemfile specifies no dependencies
Resolving dependencies...
Bundle complete! 0 Gemfile dependencies, 1 gem now installed.
Bundled gems are installed into ..

そのディレクトリの下に Gemfile ができていますので、こちらに MapReduceExecutor の plugin 名である embulk-executor-mapreduce を宣言します。

mapreduce_executor_bundle/Gemfile
source 'https://rubygems.org/'
gem 'embulk-executor-mapreduce', '0.1.1'            # 追加

この状態で、先ほどの bundle subcommand を再度実行すると、MapReduceExecutor がインストールされます。

$ bin/embulk bundle mapred_executor_bundle
2015-06-22 15:47:26.285 -0700: Embulk v0.6.11
Fetching gem metadata from https://rubygems.org/..
Fetching version metadata from https://rubygems.org/.
Resolving dependencies...
Installing embulk-executor-mapreduce 0.1.1
Using bundler 1.10.4
Bundle complete! 1 Gemfile dependency, 2 gems now installed.
Bundled gems are installed into ..

あとは Embulk の設定ファイルの exec 部分に type: mapreduce と追加することで MapReduceExecutor がロードされ、使われるようになります。以下が config.ymlexec 部分になります。

config.yml
exec:
  type: mapreduce
  config_files:
    - path/to/core-site.xml
    - path/to/mapred-site.xml
    - path/to/hdfs-site.xml

とりあえずは、先ほど編集した Hadoop の設定ファイルの内容を MapReduceExecutor が読み出せるように config_files へファイル名を配列として列挙し指定すれば動くようになります。ちなみに type: mapreduce の部分をコメントアウトすると、通常の Executor (LocalExecutorPlugin) で処理されます。

実際に動かしてみる

せっかくなので、S3 上の CSV ファイルを Embulk を使い、TreasureData へアップロードしてみましょう。先ほどの bundle ディレクトリにある Gemfileembulk-input-s3embulk-output-td を追加宣言しましょう。(embulk-output-td を使うには事前に TreasureData のアカウントを作成し、その API key を知っておく必要があります。)

mapred_executor_bundle/Gemfile
source 'https://rubygems.org/'
gem 'embulk-executor-mapreduce', '0.1.1'
gem 'embulk-input-s3', '0.1.7'                 # 追加
gem 'embulk-output-td', '0.1.0'                # 追加

再度、bundle subcommand を実行すると plugins がインストールされます。インストールが成功したことを確認した後、Embulk を実行させる設定ファイル config.yml を書きます。概略は以下です(すでに guess コマンドを実行済みです)。

config.yml
exec:
  type: mapreduce
  config_files:
    - path/to/core-site.xml
    - path/to/mapred-site.xml
    - path/to/hdfs-site.xml
in:
  type: s3
  access_key_id: my_access_key
  secret_access_key: my_secret_access_key
  bucket: my_bucket
  path_prefix: csv_files/path/prefix
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: ''
    escape: ''
    skip_header_lines: 1
    columns:
    - name: date_code
      type: timestamp
      format: '%Y%m%d'
    ...(省略)...
out:
  type: td
  apikey: my_treasuredata_apikey
  database: my_db
  table: my_table
  time_column: date_code

Embulk 設定ファイル config.yml を用意したら、コマンド実行:

$ bin/embulk run -b ./mapred_executor_bundle config.yml
2015-06-22 15:56:52.494 -0700: Embulk v0.6.11
2015-06-22 15:56:54.564 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.565 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.565 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:54.630 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.631 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.631 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:54.664 -0700: Loaded plugin embulk-executor-mapreduce (0.1.1)
2015-06-22 15:56:54.665 -0700: Loaded plugin embulk-input-s3 (0.1.7)
2015-06-22 15:56:54.665 -0700: Loaded plugin embulk-output-td (0.1.0)
2015-06-22 15:56:56.713 -0700 [INFO] (transaction): Logging initialized @8990ms
2015-06-22 15:56:57.535 -0700 [INFO] (transaction): Duplicating date_code:timestamp column to 'time' column for the data partitioning
2015-06-22 15:56:57.538 -0700 [INFO] (transaction): Create bulk_import session embulk_20150622_xxxx
2015-06-22 15:56:58.373 -0700 [WARN] (transaction): Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-06-22 15:57:01.224 -0700 [INFO] (transaction): Connecting to ResourceManager at /0.0.0.0:8032
2015-06-22 15:57:01.434 -0700 [WARN] (transaction): Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2015-06-22 15:57:05.654 -0700 [WARN] (transaction): No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
2015-06-22 15:57:05.889 -0700 [INFO] (transaction): number of splits:1
2015-06-22 15:57:06.047 -0700 [INFO] (transaction): Submitting tokens for job: job_1435013311362_0002
2015-06-22 15:57:06.167 -0700 [INFO] (transaction): Job jar is not present. Not adding any jar to the list of resources.
2015-06-22 15:57:06.459 -0700 [INFO] (transaction): Submitted application application_1435013311362_0002
2015-06-22 15:57:06.488 -0700 [INFO] (transaction): The url to track the job: http://localhost:8088/proxy/application_1435013311362_0002/
2015-06-22 15:57:06.492 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:11.505 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:16.514 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:21.596 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:26.601 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:31.612 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:36.619 -0700 [INFO] (transaction): map 0.0% reduce 0.0%
2015-06-22 15:57:41.676 -0700 [INFO] (transaction): map 100.0% reduce 0.0%
2015-06-22 15:57:46.684 -0700 [INFO] (transaction): map 100.0% reduce 0.0%
2015-06-22 15:57:46.865 -0700 [INFO] (transaction): Counters: 30
2015-06-22 15:57:46.865 -0700 [INFO] (transaction): Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=165741
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=35632445
        HDFS: Number of bytes written=468
        HDFS: Number of read operations=2
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=3
    Job Counters
        Launched map tasks=1
        Other local map tasks=1
        Total time spent by all maps in occupied slots (ms)=17606
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=17606
        Total vcore-seconds taken by all map tasks=17606
        Total megabyte-seconds taken by all map tasks=18028544
    Map-Reduce Framework
        Map input records=1
        Map output records=0
        Input split bytes=55
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=560
        CPU time spent (ms)=0
        Physical memory (bytes) snapshot=0
        Virtual memory (bytes) snapshot=0
        Total committed heap usage (bytes)=191889408
    File Input Format Counters
        Bytes Read=0
    File Output Format Counters
        Bytes Written=0
2015-06-22 15:57:47.340 -0700 [INFO] (transaction): Performing bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:25.747 -0700 [INFO] (transaction):     job id: 27xxxxxx
2015-06-22 15:58:25.747 -0700 [INFO] (transaction): Committing bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:25.748 -0700 [INFO] (transaction):     valid records: 10
2015-06-22 15:58:25.748 -0700 [INFO] (transaction):     error records: 0
2015-06-22 15:58:25.748 -0700 [INFO] (transaction):     valid parts: 1
2015-06-22 15:58:25.748 -0700 [INFO] (transaction):     error parts: 0
2015-06-22 15:58:35.683 -0700 [INFO] (transaction): Deleting bulk import session 'embulk_20150622_xxxx'
2015-06-22 15:58:36.231 -0700 [INFO] (main): Committed.
2015-06-22 15:58:36.231 -0700 [INFO] (main): Next config diff: {"in":{"last_path":"my_csv_files.csv"},"out":{"last_session":"embulk_20150622_xxxx"}}

今回は S3 上に 10 行の CSV ファイルを 1 つだけ用意したので、Job Counters の Launched map tasks が 1 でありかつ Map-Reduce Framework の Map input records も 1 となっています。後述しますが partitioning の設定がされていないため、Reduce tasks は 0 です。実際に TreasureData 側にもデータが送られていることをコンソール画面より確認してみましょう。

どのように動いているか

現在 MapReduceExecutor には、入力データに対して timestamp partitioning をする場合としない場合の 2 つのモードがあります。このモードの違いによって、Embulk のどの plugin が Hadoop 上のどこで動くのかが異なってきます。

Partitioning を行わないモードの場合、Embulk の input/output plugin は共に hadoop の mapper 部分で実行され、reducer 部分では特に何もしません(reduce task は 0 です)。Map task の数は、入力されるファイルの数で決まります。Hadoop 上で job が実行される前に、Embulk が読むファイル名の配列(ファイルの数とファイル名)を MapReduceExecutor を取得します。各 Map task は、ファイル名配列の添え字を渡されることで、処理をするファイル名を取得することができます。

Timestamp partitioning モードの場合、Embulk の input plugin は mapper 部分で、output plugin は reducer 部分で実行されます。ファイルを読みながら map task が実行されているとき、partitioning key として指定された timestamp カラム値の範囲によって、page オブジェクトが作られ、page 単位で shuffle され、reduce task の output plugin で使われます。Map task の数は partitioning を行わないモードの時と同様に、読むファイルの数で決まります。一方、reducer task の数は、map task の数と同数であれば効率が良いというわけではないので、ユーザが設定で変更することができます。

まとめ

今回は Hadoop 2.6.0 を利用しましたが、Hadoop 2.4.x 系の上でも同様の設定ファイルで Embulk を動かすことができます。データは日々増えていくものなので、Embulk の処理の並列度を手軽に上げられる方法の 1 つとして MapReduceExecutor が利用できるかと思います。

12
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
12