11
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Hadoop MapReduceアプリの勉強

Last updated at Posted at 2015-07-20

今更ながらにHadoopを勉強中。
オライリーの象本の5章(MapReduceアプリケーション開発)のmapとreduceのJavaのサンプルを動かしてみました。
そして、ただ動かしても面白く無いので、Scalaを使ってやってみました。

サンプルの内容は、気象データのログをもとにして、年ごとの最高気温を集計するというものです。

環境情報

試した環境は下記の通りです。

環境 バージョンなど
OS Mac OS X Yosemite(10.10.4)
Java 1.7.0
Scala 2.11.1
Gradle 2.0
Hadoop 2.7.0
Macならこれらのインストールはhomebrewを使って
$ brew install java scala gradle hadoop

でOKです。

テストデータの取得と集計の目的

はじめにhadoopを試すためのテストデータを入手する必要があります。
ブラウザで米国気候データセンター(NCDC)にアクセスして、1901-2015までのディレクトリのいずれかの中から、中のgzファイルをダウンロードします。

ftp://ftp.ncdc.noaa.gov/pub/data/noaa/

スクリプトを作っている人もいるので、これを使えば一括ダウンロードできます。
しかし、テストだけなら1ファイルぐらいダウンロードすればOKだと思います。

1ファイルには1行ごとに下記のフォーマットのログが記述されていて、初めの太字が年、次の太字が気温(摂氏の10倍)を示しているとのことです。

009201001099999
**1992**010100004+70933-008667FM-12+0009ENJA V0200101N00931012001CN0400001N9
**-0036**1-00771096881ADDAA106000091AG14000GF108991081081008001999999MD1310051+9999OA149901801REMSYN011333 91135

Mapper

Mapperでは入力データから集計のためのKeyとValueのMapの組み合わせを作っていきます。
プログラムの処理としては引数のvalueで渡されてきた文字列から、MapのKeyとValueを作って、contextにwriteすれば良い様子。
missingメソッドは気温データが入っていなかった時の処理です。

package hadoop

import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import java.io.IOException
import org.apache.hadoop.mrunit.mapreduce.MapDriver

class MaxTemperatureMapper extends Mapper[LongWritable, Text, Text, IntWritable] {
  type Context = Mapper[LongWritable, Text, Text, IntWritable]#Context
  override def map(
      key: LongWritable,
      value: Text,
      context: Context) {
    val line = value.toString()
    val year = line.substring(
        MaxTemperatureMapper.YEAR_BEGIN_INDEX,
        MaxTemperatureMapper.YEAR_END_INDEX)
    val temp = line.substring(
        MaxTemperatureMapper.TEMP_BEGIN_INDEX,
        MaxTemperatureMapper.TEMP_END_INDEX)
    if (!missing(temp)) {
      val airTemperature = Integer.parseInt(temp)
      context.write(new Text(year), new IntWritable(airTemperature))
    }
  }

  def missing(temp: String):Boolean = {
    return temp eq "+9999"
  }
}

object MaxTemperatureMapper {
  private val YEAR_BEGIN_INDEX = 15
  private val YEAR_END_INDEX = 19
  private val TEMP_BEGIN_INDEX = 87
  private val TEMP_END_INDEX = 92
}

この処理で、下記の例のようなKeyを年、Valueを気温としたMapが出来るはず。

Key Value
1901 -30
1901 10
1901 5
1902 100
1902 -10

Reducer

Reducerでは年ごとの最大値を求めます。
引数にはKeyごとのValueが渡されるため、その最大のValueを見つければよしです。

package hadoop

import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Writable
import java.io.IOException
import collection.JavaConversions.iterableAsScalaIterable

class MaxTemperatureReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
  type Context = Reducer[Text, IntWritable, Text, IntWritable]#Context
  override def reduce(
      key: Text,
      values: java.lang.Iterable[IntWritable],
      context: Context) {
      var maxValue = Integer.MIN_VALUE
      for (value: IntWritable <- values) {
        maxValue = Math.max(maxValue, value.get())
      }
      context.write(key, new IntWritable(maxValue))
  }
}

この処理で、最終的に、下記のような年ごとの気温の最大値を集計した結果が出来上がるはずです。

Key Value
1901 10
1902 100

Driver

次に、ジョブを走らせるためのドライバアプリケーションを作成します。
Mainメソッドを持っていて、集計処理のエントリポイントになっています。

Mapper, Reducer, Combinerをセットしている。
Combinerは作成していないけれど、下記のサイトによると、CombinerとはReducerに渡す前にMapperの結果を中間処理するクラスとのこととのこと。
Hadoop Combiner

package hadoop

import org.apache.hadoop.conf.Configured
import org.apache.hadoop.util.Tool
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text

class MaxTemperatureDriver extends Configured with Tool {
  override def run(args: Array[String]): Int = {
    if (args.length != 2) {
      println("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName())
      ToolRunner.printGenericCommandUsage(System.err)
      -1
    }
    val job = new Job(this.getConf(), "Max temperature")
    job.setJarByClass(this.getClass())

    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))

    job.setMapperClass(classOf[MaxTemperatureMapper]);
    job.setCombinerClass(classOf[MaxTemperatureReducer])
    job.setReducerClass(classOf[MaxTemperatureReducer])

    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])

    if (job.waitForCompletion(true)) 0 else 1
  }
}

object MaxTemperatureDriver {
  def main(args: Array[String]) {
    val exitCode = ToolRunner.run(new MaxTemperatureDriver(), args)
    System.exit(exitCode)
  }
}

実際に実行してみる

上で作成したファイルを実際に実行してみます。
あらかじめ、事前準備としてどこかのテストデータの.gzファイルだけが格納されたディレクトリを作成しておく必要があります。

$ git clone git@github.com:khiraiwa/hadoop-mapreduce-test.git
$ cd hadoop-mapreduce-test
$ gradle jar
$ hadoop jar build/libs/hadoop-mapreduce-test-0.1.jar hadoop.MaxTemperatureDriver [テストデータのディレクトリのパス(絶対パスの必要あり?)] [結果出力ディレクトリ(相対、絶対両方可。存在しないディレクトリ名を指定)] -conf src/main/resources/hadoop-cluster.xml

すると、出力結果のディレクトリに結果ファイルができます。
それを見ると、どうやらうまくいっている様子。。。

11
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?