前回やったこと
MapperとReducerを使って、米国気候データセンター(NCDC)のデータから各年の最高気温を求めてみました。
今回やったこと
今回は多段MapReduceについて調べてみました。
多段とは、MapReduceの結果に対して、更にMapReduceをかけていくことです。
前回のMapReduceを一回目として、二回目ではその結果をInputとして、そこから3件のみをOutputとしてみました。SQLのlimit 3みたいな感じです。
※この処理自体に意味は全くないのですが練習のためやってみました。
ちなみに、前回と同じくScalaでやってみました。
環境情報
試した環境は下記の通りです。
環境 | バージョンなど |
---|---|
OS | Mac OS X Yosemite(10.10.4) |
Java | 1.7.0 |
Scala | 2.11.1 |
Gradle | 2.0 |
Hadoop | 2.7.0 |
MapReduceを多段で実行する方法
Jobクラスのインスタンスに対して、waitForCompletionメソッドを呼んでMapReduceを実行するので、更に別のJobを用意してwaitForCompletionメソッドを呼んでやればいいです。
例えば、下記のようにcreateMaxTempJob(), createLimit3Job()のようなjobを返すファクトリメソッドを用意してやって順番に実行します。
val jobs: List[Job] = List(createMaxTempJob(args), createLimit3Job(args))
for (job: Job <- jobs) {
if (!job.waitForCompletion(true)) 1
}
このcreateMaxTempJob(), createLimit3Job()の実装は下記のようにしています。
二回目のMapの入力は一回目のReduceの出力のKey, Valueです。
/**
* 一回目のMapReduceを実行
*/
def createMaxTempJob(args: Array[String]): Job = {
val job = new Job(this.getConf(), "Max temperature")
job.setJarByClass(this.getClass())
// ディレクトリを指定し、Textファイルから入力
FileInputFormat.addInputPath(job, new Path(args(0)))
// 結果はKeyにText、ValueにMaxTempWritableという独自クラスを指定して、
// それを次回のInputが容易なようにシリアライズして出力します。
val sequenceFileOutputFormat = new SequenceFileOutputFormatHelper()
sequenceFileOutputFormat.setOutputPath(job, new Path(args(1) + "-max"))
job.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text, MaxTempWritable]]);
// Mapperのクラスを指定
job.setMapperClass(classOf[MaxTemperatureMapper])
// Mapの処理のKeyとValueの出力クラスを指定
job.setMapOutputKeyClass(classOf[Text])
job.setMapOutputValueClass(classOf[IntWritable])
// Reducerのクラスを指定
job.setReducerClass(classOf[MaxTemperatureReducer2])
// Reduceの処理のKeyとValueの出力クラスを指定
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[MaxTempWritable])
job
}
/**
* 二回目のMapReduceを実行
*/
def createLimit3Job(args: Array[String]): Job = {
val job = new Job(this.getConf(), "Limit 3 max temperature")
job.setJarByClass(this.getClass())
// 1回目でシリアライズされたSequenceFileから入力。
val sequenceFileInputFormat = new SequenceFileInputFormatHelper()
sequenceFileInputFormat.setInputPaths(job, new Path(args(1) + "-max"));
job.setInputFormatClass(classOf[SequenceFileInputFormat[Text,MaxTempWritable]]);
// ディレクトリを指定し、Text形式で出力
FileOutputFormat.setOutputPath(job, new Path(args(1) + "-limit3"))
// Map処理では何も行わないためMapperを指定
job.setMapperClass(classOf[Mapper[_, _, _, _]]);
// Mapの処理のKeyとValueの出力クラスを指定
job.setMapOutputKeyClass(classOf[Text])
job.setMapOutputValueClass(classOf[MaxTempWritable])
// Reducerにレコードを3つに絞るクラスを指定
job.setReducerClass(classOf[Limit3Reducer])
// Reduceの処理のKeyとValueの出力クラスを指定
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[IntWritable])
job
}
一回目のReducerは下記のようにしていて、Keyはすべて同じ値、Valueに実際のKeyとValueを格納した独自クラスをcontextに書き出しています。
override def reduce(
key: Text,
values: java.lang.Iterable[IntWritable],
context: Context) {
var maxValue = Integer.MIN_VALUE
for (value: IntWritable <- values) {
maxValue = Math.max(maxValue, value.get())
}
context.write(new Text("key"), new MaxTempWritable(key, new IntWritable(maxValue)))
}
Keyを同じにすると、二回目のReducerには全てのMaxTempWritableが渡されるため、ここで3件のみcontextに書き出して終了しています。
override def reduce (
key: Text,
values: java.lang.Iterable[MaxTempWritable],
context: Context) {
var cnt = 0
breakable {
for (w: MaxTempWritable <- values) {
if (cnt >= 3) break
context.write(w.key, w.value)
cnt += 1
}
}
}
以上の内容を実際に手元で実行する場合は下記を実行します。
※入力用テストデータには前回のものを想定しています。
$ git clone git@github.com:khiraiwa/hadoop-mapreduce-test.git
$ cd hadoop-mapreduce-test
$ gradle jar
$ hadoop jar build/libs/hadoop-mapreduce-test-0.1.jar hadoop2.MaxTempLimit3Driver [テストデータのディレクトリの絶対パス] [結果出力ディレクトリの絶対パス] -conf src/main/resources/hadoop-cluster.xml
実行すると、-maxのSuffixがついたディレクトリにシリアライズされた中間ファイルが、-limit3のSuffixがついたディレクトリに最終的なファイルが出力されます。
その他
今回、参考書籍としてHadoop Hacksを用いて#20を見たのですが、Valueのセカンダリソートにjob.setSortComparatorClass()を用いており実行できず。。。
setSortComparatorClassはKeyのソートを行うクラスを指定するメソッドのようで謎です。。。