LoginSignup
14
11

More than 5 years have passed since last update.

Amazon EMRでCustomJarを作成して実行する方法

Last updated at Posted at 2014-07-21

今更といえば今更だが、意外と情報が少なかったので、Amazon EMRにて、CustomJarを作成して実行する方法をまとめてみる。
実装する処理は、みんな大好きWord Countです。

手順概要

  1. MapReduceの処理を実装したjarファイルを作る
  2. 作成したJarファイルをAmazon S3へアップロードする
  3. ログファイル出力ディレクトリ、入力ファイルを用意する
  4. EMRクラスターを立ち上げる
  5. ジョブを実行する

Hadoopの処理を実装したjarファイルを作る

Jobの内容を設定するMainメソッド、Mapperクラス、Reducerクラスを実装し、Jarを作成する。

Jarを作るために、今回はMavenを使用する。
ポイントとなるのは、hadoop-core以外で追加で使用するJarがある場合には、「maven-assembly-plugin」などを使って、依存するJarを一つにまとめた、Fat jarを作るということ。
Word Countだけなら、実はこれは必要無いが、一応使ってみる。
下記のような設定をして
「maven package」
などのコマンドを実行すると、プロジェクトのtargetフォルダ配下に、
「emr-customjar-1.0-SNAPSHOT-jar-with-dependencies.jar」
などといった、依存関係を全て含めたJarファイルが出来る。

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>2kb</groupId>
    <artifactId>emr-customjar</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>word count</name>
    <url>http://maven.apache.org</url>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <!-- EMRがサポートしているバージョンはここに記載されていた http://aws.amazon.com/jp/elasticmapreduce/faqs/ -->
            <version>1.0.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <!-- 依存関係をまとめて一つのjarにするためMavenプラグイン -->
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <!-- 依存関係をまとめて一つのjarにする -->
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!-- packageスコープで実行されるようにする -->
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

あとは、Mainクラス、Mapperクラス、Reducerクラスをそれぞれ作成する。

package emrtest;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountMain {

    /**
     * Jobを設定して実行する
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {

        System.out.println("Masterノード start");

        // スレーブノードで実行するJobを設定する
        Job job = new Job();
        job.setJarByClass(WordCountMain.class);
        job.setJobName("wordcount");

        // Reducerへの出力キー、バリューの型を指定する
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Mapper、Reducerのクラスを指定する
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // もしReducerが必要なければ、このように指定する job.setNumReduceTasks(0);

        // データを読み込み、Mapperへ渡すデータ・フォーマットを指定する
        job.setInputFormatClass(TextInputFormat.class);
        // Reducerからデータを受け取り、出力を行う際のデータ・フォーマットを指定する
        job.setOutputFormatClass(TextOutputFormat.class);

        // 引数取得
        // arg[0] は、CLIから実行した場合はメインコントローラークラス名が設定される場合もあるようだったので注意。
        String inputPath = args[0];
        System.out.println("arg 0 : " + inputPath);
        String outputPath = args[1];
        System.out.println("arg 1 : " + outputPath);

        // 入力ファイル・出力ファイルのパスを設定
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Job実行
        boolean result = job.waitForCompletion(true);
        System.out.println("result : " + result);

        System.out.println("Masterノード end");
    }
}
package emrtest;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper
 * 
 * 継承の際のジェネリクス指定によって、mapメソッドの型を指定出来る
 * Mapper<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 初期化処理
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        System.out.println("Mapper setup");
    }

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 入力値を取り出す(1行データ)
        String line = value.toString();

        // 単語に分解する
        StringTokenizer tokenizer = new StringTokenizer(line);

        IntWritable one = new IntWritable(1);
        Text word = new Text();

        // 単語ごとに繰り返し
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());

            // 1単語ごとにReducerへ値を渡す。(単語, 集計数)。ここでは単純に1単語につき1を渡しているだけだが、Mapper側で一度集計してからReducerに渡してもいい。
            context.write(word, one);
        }
    }

    /**
     * 終了処理
     */
    @Override
    public void cleanup(Context context) throws IOException,InterruptedException {
        System.out.println("Mapper cleanup");
    }
}
package emrtest;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reducer
 * 
 * 継承の際のジェネリクス指定によって、reduceメソッドの型を指定出来る
 * Reducer<入力キーの型, 入力値の型, 出力キーの型, 出力値の型>
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Mapperから渡された値を集計
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 書き込み
        context.write(key, new IntWritable(sum));
    }

}

あとは maven packageコマンドなどでJarを作成する。

作成したJarファイルをAmazon S3へアップロードする

作成したJarをS3にアップする。

ログファイル出力ディレクトリ、入力ファイルを用意する

適当なログファイル出力ディレクトリを作成する。
また、入力ファイルを用意する。
入力ファイルはなんでもいいが、公式?の
http://shrub.appspot.com/elasticmapreduce/samples/wordcount/input/
あたりのファイルを適当にダウンロードして、自身のS3フォルダにアップしておく。

ちなみに、入力ファイルは、GZIPなどの形式でもいいらしい。
その場合、自動で判別して処理してくれるとのこと。

EMRクラスターを立ち上げる

コンソール画面や、Amazon CLIなどで、処理を実行するためのクラスターを立ち上げる。
今回は画面から行う。
下記の、MapRecduce実行画面に遷移する。
https://console.aws.amazon.com/elasticmapreduce/home?region=ap-northeast-1

「Create cluster」

Create Cluster画面
「Log folder S3 location」の場所に、作成したログディレクトリを指定する
「Applications to be installed」に、HiveとPigが指定されているが、今回は必要ないので消してもいい
「Master」「Core」などの項目に、使用したいインスタンス、数を指定する

「Create cluster」

Stepを実行する

作成したClusterに、Stepを追加し、実行する。

Cluster Details画面
「Add Step」

「Jar S3 location」に、先ほどアップしたJarを指定する。(s3://{バケット名}/{jarファイルパス})
「Argument」に、メインクラス名、インプットファイルパス、出力先パスを指定する。
例:
emrtest.WordCountMain
s3://{バケット名}/{inputファイル or ディレクトリパス}
s3://{バケット名}/{outputディレクトリパス}
(アウトプットディレクトリは、まだ存在していないパスを指定する必要がある)

「Add」

実行結果確認

あとは実行が終わるまで待つ。
なぜかログは、5分くらい待たないと出てこないので気長に待つ。

実行が終わると、出力先に指定したディレクトリに実行結果となる
part-r-00000
というようなファイルができているはず。

以上です。実行が終わったら、忘れずにクラスターを落としておきましょう。

参考

http://docs.aws.amazon.com/ja_jp/ElasticMapReduce/latest/DeveloperGuide/emr-what-is-emr.html
http://www.akirakoyasu.net/2012/01/16/executing-amazon-elastic-mapreduce-in-30-minutes/
http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/java.html
http://oss.infoscience.co.jp/hadoop/common/docs/current/mapred_tutorial.html#Source+Code
http://www.slideshare.net/youheiyamaguchi/ace-32403313
http://d.hatena.ne.jp/nanasess/20090330/1238422438

14
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
11