今更といえば今更だが、意外と情報が少なかったので、Amazon EMRにて、CustomJarを作成して実行する方法をまとめてみる。
実装する処理は、みんな大好きWord Countです。
#手順概要
- MapReduceの処理を実装したjarファイルを作る
- 作成したJarファイルをAmazon S3へアップロードする
- ログファイル出力ディレクトリ、入力ファイルを用意する
- EMRクラスターを立ち上げる
- ジョブを実行する
#Hadoopの処理を実装したjarファイルを作る
Jobの内容を設定するMainメソッド、Mapperクラス、Reducerクラスを実装し、Jarを作成する。
Jarを作るために、今回はMavenを使用する。
ポイントとなるのは、hadoop-core以外で追加で使用するJarがある場合には、「maven-assembly-plugin」などを使って、依存するJarを一つにまとめた、Fat jarを作るということ。
Word Countだけなら、実はこれは必要無いが、一応使ってみる。
下記のような設定をして
「maven package」
などのコマンドを実行すると、プロジェクトのtargetフォルダ配下に、
「emr-customjar-1.0-SNAPSHOT-jar-with-dependencies.jar」
などといった、依存関係を全て含めたJarファイルが出来る。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>2kb</groupId>
<artifactId>emr-customjar</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>word count</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<!-- EMRがサポートしているバージョンはここに記載されていた http://aws.amazon.com/jp/elasticmapreduce/faqs/ -->
<version>1.0.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<!-- 依存関係をまとめて一つのjarにするためMavenプラグイン -->
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<!-- 依存関係をまとめて一つのjarにする -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<!-- packageスコープで実行されるようにする -->
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
あとは、Mainクラス、Mapperクラス、Reducerクラスをそれぞれ作成する。
package emrtest;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCountMain {
/**
* Jobを設定して実行する
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
System.out.println("Masterノード start");
// スレーブノードで実行するJobを設定する
Job job = new Job();
job.setJarByClass(WordCountMain.class);
job.setJobName("wordcount");
// Reducerへの出力キー、バリューの型を指定する
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Mapper、Reducerのクラスを指定する
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0);
// データを読み込み、Mapperへ渡すデータ・フォーマットを指定する
job.setInputFormatClass(TextInputFormat.class);
// Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する
job.setOutputFormatClass(TextOutputFormat.class);
// 引数取得
// arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。
String inputPath = args[0];
System.out.println("arg 0 : " + inputPath);
String outputPath = args[1];
System.out.println("arg 1 : " + outputPath);
// 入力ファイル・出力ファイルのパスを設定
FileInputFormat.setInputPaths(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
// Job実行
boolean result = job.waitForCompletion(true);
System.out.println("result : " + result);
System.out.println("Masterノード end");
}
}
package emrtest;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* Mapper
*
* 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る
* Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 初期化処理
*/
@Override
public void setup(Context context) throws IOException, InterruptedException {
System.out.println("Mapper setup");
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 入力値を取り出す(1行データ)
String line = value.toString();
// 単語に分解する
StringTokenizer tokenizer = new StringTokenizer(line);
IntWritable one = new IntWritable(1);
Text word = new Text();
// 単語ごとに繰り返し
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
// 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。
context.write(word, one);
}
}
/**
* 終了処理
*/
@Override
public void cleanup(Context context) throws IOException,InterruptedException {
System.out.println("Mapper cleanup");
}
}
package emrtest;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* Reducer
*
* 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る
* Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// Mapperから渡された値を集計
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
// 書き込み
context.write(key, new IntWritable(sum));
}
}
あとは maven packageコマンドなどでJarを作成する。
作成したJarファイルをAmazon S3へアップロードする
作成したJarをS3にアップする。
#ログファイル出力ディレクトリ、入力ファイルを用意する
適当なログファイル出力ディレクトリを作成する。
また、入力ファイルを用意する。
入力ファイルはなんでもいいが、公式?の
http://shrub.appspot.com/elasticmapreduce/samples/wordcount/input/
あたりのファイルを適当にダウンロードして、自身のS3フォルダにアップしておく。
ちなみに、入力ファイルは、GZIPなどの形式でもいいらしい。
その場合、自動で判別して処理してくれるとのこと。
#EMRクラスターを立ち上げる
コンソール画面や、Amazon CLIなどで、処理を実行するためのクラスターを立ち上げる。
今回は画面から行う。
下記の、MapRecduce実行画面に遷移する。
https://console.aws.amazon.com/elasticmapreduce/home?region=ap-northeast-1
↓
「Create cluster」
↓
Create Cluster画面
「Log folder S3 location」の場所に、作成したログディレクトリを指定する
「Applications to be installed」に、HiveとPigが指定されているが、今回は必要ないので消してもいい
「Master」「Core」などの項目に、使用したいインスタンス、数を指定する
↓
「Create cluster」
#Stepを実行する
作成したClusterに、Stepを追加し、実行する。
Cluster Details画面
「Add Step」
↓
「Jar S3 location」に、先ほどアップしたJarを指定する。(s3://{バケット名}/{jarファイルパス})
「Argument」に、メインクラス名、インプットファイルパス、出力先パスを指定する。
例:
emrtest.WordCountMain
s3://{バケット名}/{inputファイル or ディレクトリパス}
s3://{バケット名}/{outputディレクトリパス}
(アウトプットディレクトリは、まだ存在していないパスを指定する必要がある)
↓
「Add」
#実行結果確認
あとは実行が終わるまで待つ。
なぜかログは、5分くらい待たないと出てこないので気長に待つ。
実行が終わると、出力先に指定したディレクトリに実行結果となる
part-r-00000
というようなファイルができているはず。
以上です。実行が終わったら、忘れずにクラスターを落としておきましょう。
#参考
http://docs.aws.amazon.com/ja_jp/ElasticMapReduce/latest/DeveloperGuide/emr-what-is-emr.html
http://www.akirakoyasu.net/2012/01/16/executing-amazon-elastic-mapreduce-in-30-minutes/
http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/java.html
http://oss.infoscience.co.jp/hadoop/common/docs/current/mapred_tutorial.html#Source+Code
http://www.slideshare.net/youheiyamaguchi/ace-32403313
http://d.hatena.ne.jp/nanasess/20090330/1238422438