はじめに
Hadoop StreamingでMapReduceをシェルで実装する方法を記述します。
環境
- CentOS 6.5
- CDH5
構成
ホスト名 | IPアドレス | ResourceManager | Namenode | NodeManager | Datanode | JobHistoryServer |
---|---|---|---|---|---|---|
hadoop-master | 192.168.122.101 | ○ | ○ | - | - | ○ |
hadoop-master2 | 192.168.122.102 | ○ | ○ | - | - | - |
hadoop-slave | 192.168.122.111 | - | - | ○ | ○ | - |
hadoop-slave2 | 192.168.122.112 | - | - | ○ | ○ | - |
hadoop-slave3 | 192.168.122.113 | - | - | ○ | ○ | - |
hadoop-client | 192.168.122.201 | - | - | - | - | - |
クラスタの構築方法は、CDH5でhadoopのクラスタを構築するをご参照ください。
MapReduceの実装
今回は、ファイルに含まれる単語の数をカウントするプログラムを作成します。
- mapperの実装
# !/bin/bash
while read -a words
do
for word in ${words[*]}
do
printf "%s\t1\n" $word
done
done
- reducerの実装
# !/bin/bash
current_key=""
total=0
while read data
do
key=$(echo $data | awk '{print $1}')
value=$(echo $data | awk '{print $2}')
if [ "$current_key" != "" ] && [ "$current_key" != "$key" ] ; then
printf '%s\t%d\n' "$current_key" "$total"
current_key=$key
total=$value
else
current_key=$key
total=$((total+value))
fi
done
printf '%s\t%d\n' "$current_key" "$total"
動作確認
クライアントで動作確認をします。
- 動作確認用に解析対象のファイルをHDFSへputします。
Java Ruby Python Java Ruby Python Java
$ sudo -u hdfs hadoop fs -put sample.txt input/sample.txt
$ sudo -u hdfs hadoop fs -ls input/
Found 1 items
-rw-r--r-- 3 hdfs hadoop 39 2014-09-20 06:59 input/sample.txt
- Hadoop Streamingを使用してMapReduceを実行します。
$ sudo -u hdfs hadoop \
jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files /tmp/sample_mapper.sh,/tmp/sample_reducer.sh \
-input input/sample.txt \
-output output/sample \
-mapper sample_mapper.sh \
-reducer sample_reducer.sh
Hadoop Streamingを使用する場合は、jarに hadoop-streaming.jar を指定します。
その他のオプションは以下の通りとなります。
オプション | 概要 |
---|---|
files | Map/Reduceのクラスタへコピーするファイルを指定します |
input | 解析の対象となるファイルを指定します |
output | 解析結果を保存するHDFS上のパスを指定します |
mapper | map処理を実装したファイル名を指定します |
reducer | reduce処理を実装したファイル名を指定します |
filesオプションで指定したファイルは、各ノードへコピーされ、そのコピーへのシンボリックリングが各タスクの作業用ディレクトリに作成されます。
上記の場合は、filesオプションで、 /tmp/sample_mapper.sh __/tmp/sample_reducer.sh__を指定しているため、2つのファイルが各ノードへコピーされます。
また、各タスクの作業用ディレクトリに sample_mapper.sh __sample_reducer.sh__という名前のシンボリックリンクが作成されます。
そのため、mapperオプションへ、 sample_mapper.sh reducerオプションへ、 __sample_reducer.sh__を指定すればよいことになります。
- 解析結果を確認します。
$ sudo -u hdfs hadoop fs -cat 'output/sample/part-*'
Java 3
Python 2
Ruby 2