今更ながらにHadoopを勉強中。
オライリーの象本の5章(MapReduceアプリケーション開発)のmapとreduceのJavaのサンプルを動かしてみました。
そして、ただ動かしても面白く無いので、Scalaを使ってやってみました。
サンプルの内容は、気象データのログをもとにして、年ごとの最高気温を集計するというものです。
環境情報
試した環境は下記の通りです。
環境 | バージョンなど |
---|---|
OS | Mac OS X Yosemite(10.10.4) |
Java | 1.7.0 |
Scala | 2.11.1 |
Gradle | 2.0 |
Hadoop | 2.7.0 |
Macならこれらのインストールはhomebrewを使って |
$ brew install java scala gradle hadoop
でOKです。
テストデータの取得と集計の目的
はじめにhadoopを試すためのテストデータを入手する必要があります。
ブラウザで米国気候データセンター(NCDC)にアクセスして、1901-2015までのディレクトリのいずれかの中から、中のgzファイルをダウンロードします。
ftp://ftp.ncdc.noaa.gov/pub/data/noaa/
スクリプトを作っている人もいるので、これを使えば一括ダウンロードできます。
しかし、テストだけなら1ファイルぐらいダウンロードすればOKだと思います。
1ファイルには1行ごとに下記のフォーマットのログが記述されていて、初めの太字が年、次の太字が気温(摂氏の10倍)を示しているとのことです。
009201001099999
**1992**010100004+70933-008667FM-12+0009ENJA V0200101N00931012001CN0400001N9
**-0036**1-00771096881ADDAA106000091AG14000GF108991081081008001999999MD1310051+9999OA149901801REMSYN011333 91135
Mapper
Mapperでは入力データから集計のためのKeyとValueのMapの組み合わせを作っていきます。
プログラムの処理としては引数のvalueで渡されてきた文字列から、MapのKeyとValueを作って、contextにwriteすれば良い様子。
missingメソッドは気温データが入っていなかった時の処理です。
package hadoop
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import java.io.IOException
import org.apache.hadoop.mrunit.mapreduce.MapDriver
class MaxTemperatureMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context
override def map(
key: LongWritable,
value: Text,
context: Context) {
val line = value.toString()
val year = line.substring(
MaxTemperatureMapper.YEAR_BEGIN_INDEX,
MaxTemperatureMapper.YEAR_END_INDEX)
val temp = line.substring(
MaxTemperatureMapper.TEMP_BEGIN_INDEX,
MaxTemperatureMapper.TEMP_END_INDEX)
if (!missing(temp)) {
val airTemperature = Integer.parseInt(temp)
context.write(new Text(year), new IntWritable(airTemperature))
}
}
def missing(temp: String):Boolean = {
return temp eq "+9999"
}
}
object MaxTemperatureMapper {
private val YEAR_BEGIN_INDEX = 15
private val YEAR_END_INDEX = 19
private val TEMP_BEGIN_INDEX = 87
private val TEMP_END_INDEX = 92
}
この処理で、下記の例のようなKeyを年、Valueを気温としたMapが出来るはず。
Key | Value |
---|---|
1901 | -30 |
1901 | 10 |
1901 | 5 |
1902 | 100 |
1902 | -10 |
Reducer
Reducerでは年ごとの最大値を求めます。
引数にはKeyごとのValueが渡されるため、その最大のValueを見つければよしです。
package hadoop
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import java.io.IOException
import collection.JavaConversions.iterableAsScalaIterable
class MaxTemperatureReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context
override def reduce(
key: Text,
values: java.lang.Iterable[IntWritable],
context: Context) {
var maxValue = Integer.MIN_VALUE
for (value: IntWritable <- values) {
maxValue = Math.max(maxValue, value.get())
}
context.write(key, new IntWritable(maxValue))
}
}
この処理で、最終的に、下記のような年ごとの気温の最大値を集計した結果が出来上がるはずです。
Key | Value |
---|---|
1901 | 10 |
1902 | 100 |
Driver
次に、ジョブを走らせるためのドライバアプリケーションを作成します。
Mainメソッドを持っていて、集計処理のエントリポイントになっています。
Mapper, Reducer, Combinerをセットしている。
Combinerは作成していないけれど、下記のサイトによると、CombinerとはReducerに渡す前にMapperの結果を中間処理するクラスとのこととのこと。
Hadoop Combiner
package hadoop
import org.apache.hadoop.conf.Configured
import org.apache.hadoop.util.Tool
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
class MaxTemperatureDriver extends Configured with Tool {
override def run(args: Array[String]): Int = {
if (args.length != 2) {
println("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName())
ToolRunner.printGenericCommandUsage(System.err)
-1
}
val job = new Job(this.getConf(), "Max temperature")
job.setJarByClass(this.getClass())
FileInputFormat.addInputPath(job, new Path(args(0)))
FileOutputFormat.setOutputPath(job, new Path(args(1)))
job.setMapperClass(classOf[MaxTemperatureMapper]);
job.setCombinerClass(classOf[MaxTemperatureReducer])
job.setReducerClass(classOf[MaxTemperatureReducer])
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
if (job.waitForCompletion(true)) 0 else 1
}
}
object MaxTemperatureDriver {
def main(args: Array[String]) {
val exitCode = ToolRunner.run(new MaxTemperatureDriver(), args)
System.exit(exitCode)
}
}
実際に実行してみる
上で作成したファイルを実際に実行してみます。
あらかじめ、事前準備としてどこかのテストデータの.gzファイルだけが格納されたディレクトリを作成しておく必要があります。
$ git clone git@github.com:khiraiwa/hadoop-mapreduce-test.git
$ cd hadoop-mapreduce-test
$ gradle jar
$ hadoop jar build/libs/hadoop-mapreduce-test-0.1.jar hadoop.MaxTemperatureDriver [テストデータのディレクトリのパス(絶対パスの必要あり?)] [結果出力ディレクトリ(相対、絶対両方可。存在しないディレクトリ名を指定)] -conf src/main/resources/hadoop-cluster.xml
すると、出力結果のディレクトリに結果ファイルができます。
それを見ると、どうやらうまくいっている様子。。。