9
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Hadoopの多段MapReduceの勉強

Last updated at Posted at 2015-07-27

Hadoop MapReduceアプリの勉強の続き。

前回やったこと

MapperとReducerを使って、米国気候データセンター(NCDC)のデータから各年の最高気温を求めてみました。

今回やったこと

今回は多段MapReduceについて調べてみました。

多段とは、MapReduceの結果に対して、更にMapReduceをかけていくことです。

前回のMapReduceを一回目として、二回目ではその結果をInputとして、そこから3件のみをOutputとしてみました。SQLのlimit 3みたいな感じです。
※この処理自体に意味は全くないのですが練習のためやってみました。

ちなみに、前回と同じくScalaでやってみました。

環境情報

試した環境は下記の通りです。

環境 バージョンなど
OS Mac OS X Yosemite(10.10.4)
Java 1.7.0
Scala 2.11.1
Gradle 2.0
Hadoop 2.7.0

MapReduceを多段で実行する方法

Jobクラスのインスタンスに対して、waitForCompletionメソッドを呼んでMapReduceを実行するので、更に別のJobを用意してwaitForCompletionメソッドを呼んでやればいいです。

例えば、下記のようにcreateMaxTempJob(), createLimit3Job()のようなjobを返すファクトリメソッドを用意してやって順番に実行します。

    val jobs: List[Job] = List(createMaxTempJob(args), createLimit3Job(args))
    for (job: Job <- jobs) {
      if (!job.waitForCompletion(true)) 1
    }

このcreateMaxTempJob(), createLimit3Job()の実装は下記のようにしています。
二回目のMapの入力は一回目のReduceの出力のKey, Valueです。

  /**
   * 一回目のMapReduceを実行
   */
  def createMaxTempJob(args: Array[String]): Job = {
    val job = new Job(this.getConf(), "Max temperature")
    job.setJarByClass(this.getClass())

    // ディレクトリを指定し、Textファイルから入力
    FileInputFormat.addInputPath(job, new Path(args(0)))

    // 結果はKeyにText、ValueにMaxTempWritableという独自クラスを指定して、
    // それを次回のInputが容易なようにシリアライズして出力します。
    val sequenceFileOutputFormat = new SequenceFileOutputFormatHelper()
    sequenceFileOutputFormat.setOutputPath(job, new Path(args(1) + "-max"))
    job.setOutputFormatClass(classOf[SequenceFileOutputFormat[Text, MaxTempWritable]]);

    // Mapperのクラスを指定
    job.setMapperClass(classOf[MaxTemperatureMapper])

    // Mapの処理のKeyとValueの出力クラスを指定
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[IntWritable])

    // Reducerのクラスを指定
    job.setReducerClass(classOf[MaxTemperatureReducer2])

    // Reduceの処理のKeyとValueの出力クラスを指定
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[MaxTempWritable])
    job
  }

  /**
   * 二回目のMapReduceを実行
   */
  def createLimit3Job(args: Array[String]): Job = {
    val job = new Job(this.getConf(), "Limit 3 max temperature")
    job.setJarByClass(this.getClass())

    // 1回目でシリアライズされたSequenceFileから入力。
    val sequenceFileInputFormat = new SequenceFileInputFormatHelper()
    sequenceFileInputFormat.setInputPaths(job, new Path(args(1) + "-max"));
    job.setInputFormatClass(classOf[SequenceFileInputFormat[Text,MaxTempWritable]]);

    // ディレクトリを指定し、Text形式で出力
    FileOutputFormat.setOutputPath(job, new Path(args(1) + "-limit3"))

    // Map処理では何も行わないためMapperを指定
    job.setMapperClass(classOf[Mapper[_, _, _, _]]);

    // Mapの処理のKeyとValueの出力クラスを指定
    job.setMapOutputKeyClass(classOf[Text])
    job.setMapOutputValueClass(classOf[MaxTempWritable])

    // Reducerにレコードを3つに絞るクラスを指定
    job.setReducerClass(classOf[Limit3Reducer])

    // Reduceの処理のKeyとValueの出力クラスを指定
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job
  }

一回目のReducerは下記のようにしていて、Keyはすべて同じ値、Valueに実際のKeyとValueを格納した独自クラスをcontextに書き出しています。

  override def reduce(
      key: Text,
      values: java.lang.Iterable[IntWritable],
      context: Context) {
    var maxValue = Integer.MIN_VALUE
    for (value: IntWritable <- values) {
      maxValue = Math.max(maxValue, value.get())
    }
    context.write(new Text("key"), new MaxTempWritable(key, new IntWritable(maxValue)))
  }

Keyを同じにすると、二回目のReducerには全てのMaxTempWritableが渡されるため、ここで3件のみcontextに書き出して終了しています。

  override def reduce (
      key: Text,
      values: java.lang.Iterable[MaxTempWritable],
      context: Context) {
    var cnt = 0
    breakable {
      for (w: MaxTempWritable <- values) {
        if (cnt >= 3) break
        context.write(w.key, w.value)
        cnt += 1
      }
    }
  }

以上の内容を実際に手元で実行する場合は下記を実行します。
※入力用テストデータには前回のものを想定しています。

$ git clone git@github.com:khiraiwa/hadoop-mapreduce-test.git
$ cd hadoop-mapreduce-test
$ gradle jar
$ hadoop jar build/libs/hadoop-mapreduce-test-0.1.jar hadoop2.MaxTempLimit3Driver [テストデータのディレクトリの絶対パス] [結果出力ディレクトリの絶対パス] -conf src/main/resources/hadoop-cluster.xml

実行すると、-maxのSuffixがついたディレクトリにシリアライズされた中間ファイルが、-limit3のSuffixがついたディレクトリに最終的なファイルが出力されます。

その他

今回、参考書籍としてHadoop Hacksを用いて#20を見たのですが、Valueのセカンダリソートにjob.setSortComparatorClass()を用いており実行できず。。。

setSortComparatorClassはKeyのソートを行うクラスを指定するメソッドのようで謎です。。。

9
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?