11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Apache Sparkによる大規模データの分散処理による機械学習(回帰分析) by Amazon EMR

Last updated at Posted at 2017-02-14

EMRでApache Sparkを使用するに当たって、必要なデータを処理するためのコードと入力データを用意するだけで、面倒な環境構築を行わず、わずかばかりの設定を加えるだけですぐに使用することができます。

ここでは、最初にScalaによるEMR上でのSpark処理、次に他のサービスのトリガーをきっかけに動くJavaによるLambdaでEMRを呼び出す手順を注意点も含めて詳細なメモを書きます。

Scalaによる回帰分析を行う実行ファイルの作成

環境構築

scalaをインストール。

$ cd /tmp
$ curl -O http://downloads.typesafe.com/scala/2.11.8/scala-2.11.8.tgz
$ tar xzf scala-2.11.8.tgz
$ mkdir -p /usr/local/src
$ mv scala-2.11.8 /usr/local/src/scala

$ export PATH=$PATH:/usr/local/src/scala/bin
$ export SCALA_HOME=/usr/local/src/scala

$ which scala

Scalaのパッケージマネージャであるsbtをインストール。

$ brew install sbt
$ which sbt

プロジェクトの作成

プロジェクトを置くディレクトリを作成。

$ mkdir SparkExampleApp

ビルドの設定ファイルの作成。

$ vim build.sbt

以下のように記述する。

name := "Spark Sample Project"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0",
  "org.apache.spark" %% "spark-mllib" % "2.0.0",
  "org.apache.spark" %% "spark-sql" % "2.0.0",
  "com.databricks" %% "spark-csv" % "1.4.0"
)

実行ファイルの作成。

$ mkdir -p src/main/scala
$ vim src/main/scala/SparkExampleApp.scala

SparkExampleApp.scalaは以下のものを使用。

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.LinearRegression

object SparkExampleApp {

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("Spark Sample App")
      .getOrCreate()

    import spark.implicits._

    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIA****************")
    spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "****************************************")

    val filePath = args(0)
    val df = spark.sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(filePath)

    val assembler = new VectorAssembler()
      .setInputCols(Array("x"))
      .setOutputCol("features")

    val polynomialExpansion = new PolynomialExpansion()
      .setInputCol(assembler.getOutputCol)
      .setOutputCol("polyFeatures")
      .setDegree(4)

    val linearRegression = new LinearRegression()
      .setLabelCol("y")
      .setFeaturesCol(polynomialExpansion.getOutputCol)
      .setMaxIter(100)
      .setRegParam(0.0)

    val pipeline = new Pipeline()
      .setStages(Array(assembler, polynomialExpansion, linearRegression))


    val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
    val model = pipeline.fit(trainingData)

    val outputFilePath = args(1)
    model.transform(testData)
      .select("x", "prediction")
      .write
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .save(outputFilePath)

  }
}

jarファイルの作成のために以下のコマンドを実行。

$ sbt package

生成されたtarget/scala-2.11/spark-sample-project_2.11-1.0.jarをS3のsample-bucket/sample-emr/srcにアップロードしておく。

Apache Sparkの設定

http://spark.apache.org/downloads.html
上のURL上で以下のように入力してApache Sparkのデータをダウンロードしてくる。

  1. Choose a Spark release: 2.0.1(Oct 03 2016)
  2. Choose a package type: Pre-build for Hadoop 2.7 and later
  3. Choose a download type: Direct download
  4. Download Spark: spark-2.0.1-bin-hadoop2.7.tgz

ダウンロードしたファイルに移動する。

$ cd ~/Downloads/spark-2.0.1-bin-hadoop2.7

conf以下にspark-defaults.confというファイルを以下の内容で作成する。

$ vim conf/spark-defaults.conf
spark-defaults.conf
spark.jars.packages  com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1

実験用データの作成

$ ruby -e 'puts "x,y";Range.new(0,5).step(0.05).each {|i| puts "#{i},#{Math.sin(i)+Random.rand(-0.3..0.3)}"}' > data.csv

sample.jpg

グラフの表示

$ brew install gnuplot
$ gnuplot
gnuplot> set datafile separator ","
gnuplot> set terminal jpeg
gnuplot> set output "sample.jpg"
gnuplot> plot "data.csv" every ::1
$ qlmanage -p sample.jpg

実行

S3にバケットを作成しておき、以下のようなバケットポリシーを作成しておく。

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Sid": "GetObject",
			"Effect": "Allow",
			"Principal": "*",
			"Action": "s3:GetObject",
			"Resource": "arn:aws:s3:::sample-bucket/*"
		},
		{
			"Sid": "ListBucket",
			"Effect": "Allow",
			"Principal": "*",
			"Action": "s3:ListBucket",
			"Resource": "arn:aws:s3:::sample-bucket"
		}
	]
}

Sparkの実行

$ ./bin/spark-submit --class SparkExampleApp --master local ~/workspace/SparkExampleApp/target/scala-2.11/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output.csv

実行後の結果は以下のようになる。

sample.jpg

EMRの実行

1.マネジメントコンソールからEMRを選択。
2.[クラスターを作成]を選択して以下のように記述する。

クラスター名: SampleCluster
ログ記録: チェック
S3フォルダー: s3://sample-bucket/sample-emr/logs
起動モード: ステップ実行

ステップタイプ: Spark アプリケーション

[設定]を選択し、ステップを追加の設定を行う。

3.ステップを追加で以下のように入力して、[追加]を選択。

名前: SparkApplication
デプロイモード: クラスター
Spark-submitオプション: --class SparkExampleApp
アプリケーションの場所: s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar
引数: s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv
失敗時の操作: クラスターを終了。

ベンダー: Amazon
リリース: emr-5.0.3
アプリケーション: Hadoop 2.7.3, Spark 2.0.1

インスタンスタイプ: m3.xlarge
インスタンス数: 2

アクセス権限: デフォルト
EMRロール: EMR_DefaultRole
EC2 インスタンスプロファイル: EMR_EC2_DefaultRole

4.[クラスターを作成]を選択。

5.デプロイモードはクライアントとクラスターのうちのクラスターを選択。
6.失敗時の操作は、次へ、キャンセルして待機、クラスターを終了からクラスターを終了を選択。

7.起動モードはクラスター、ステップ実行のうちのステップ実行を選択。
8.ステップタイプはストリーミングプログラミング、Hiveプログラム、Pigプログラム、Spark アプリケーション、カスタムJARからSpark アプリケーションを選択。

しばらくすると、EMRのクラスターが終了し、出力先にファイルが生成されていることが確認できる。

JavaによるLambdaからのEMR呼び出し

Eclipseでプロジェクト作成

Apache Mavenを利用してプロジェクト管理する場合。

1.File > New > Other...からMaven > Maven Project
デフォルト設定のまま進めるので、3回[Next]を選択。

Group Id: com.sample.lambda
Artifact Id: com.sapmle.lambda
Version: 0.0.1-SNAPSHOT
Package: com.sample.lambda

[Finish]を選択。

2.pom.xmlを選択してOverviewタブ中の以下を編集。
Name: lambda-java

3.Dependenciesタブを選択。
[Add...]を選択し、以下のように記述して、aws-lambda-java-core: 1.0.0aws-lambda-java-eventsaws-java-sdk-coreaws-java-sdk-emrを追加する。

Group Id: com.amazonaws
Artifact Id: aws-lambda-java-core
Version: 1.0.0
Group Id: com.amazonaws
Artifact Id: aws-lambda-java-events
Version: 1.0.0
Group Id: com.amazonaws
Artifact Id: aws-java-sdk-core
Version: 1.11.49
Group Id: com.amazonaws
Artifact Id: aws-java-sdk-emr
Version: 1.11.49

次に重要なのが

pom.xmlを右クリックして、Maven > Add Plugin を選択し、以下のように入力する。

Group Id: org.apache.maven.plugins
Artifact Id: maven-shade-plugin
バージョン: 2.3

プロジェクトをビルドするために、Package Exploreからcom.sample.lambdaを右クリックして、Run As > Maven Build を実行する。

作成したpom.xmlは以下のようになる。

pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.sample.lambda</groupId>
  <artifactId>com.sample.lambda</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
    	<groupId>com.amazonaws</groupId>
    	<artifactId>aws-lambda-java-core</artifactId>
    	<version>1.0.0</version>
    </dependency>
    <dependency>
    	<groupId>com.amazonaws</groupId>
    	<artifactId>aws-lambda-java-events</artifactId>
    	<version>1.0.0</version>
    </dependency>
    <dependency>
    	<groupId>com.amazonaws</groupId>
    	<artifactId>aws-java-sdk-emr</artifactId>
    	<version>1.11.49</version>
    </dependency>
    <dependency>
    	<groupId>com.amazonaws</groupId>
    	<artifactId>aws-java-sdk-core</artifactId>
    	<version>1.11.49</version>
    </dependency>
  </dependencies>
  <name>lambda-java</name>
  <build>
  	<plugins>
  		<plugin>
  			<groupId>org.apache.maven.plugins</groupId>
  			<artifactId>maven-shade-plugin</artifactId>
  			<version>2.3</version>
  		</plugin>
  	</plugins>
  </build>
</project>

4.src/main/java中のcom.sample.lambdaのSampleCreateEMRClusterHandler.javaを以下のように作成する。

SampleCreateEMRClusterHandler.java
package com.sample.lambda;

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.InstanceGroupConfig;
import com.amazonaws.services.elasticmapreduce.model.InstanceRoleType;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class SampleCreateEMRClusterHandler implements RequestHandler<Object, String> {

    public String handleRequest(Object event, Context context) {
        AWSCredentialsProvider cp = new EnvironmentVariableCredentialsProvider();
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(cp);
        emr.setRegion(Region.getRegion(Regions.US_EAST_1));

        JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig()
                .withKeepJobFlowAliveWhenNoSteps(false)
                .withInstanceGroups(buildInstanceGroupConfigs());

        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("SampleCluster")
                .withReleaseLabel("emr-5.0.3")
                .withLogUri("s3://sample-bucket/sample-emr/logs/")
                .withServiceRole("EMR_DefaultRole")
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withVisibleToAllUsers(true)
                .withInstances(instanceConfig)
                .withApplications(buildApplications())
                .withSteps(buildStepConfigs());

        RunJobFlowResult result = emr.runJobFlow(request);
        return "Process complete.";
    }

    private List<Application> buildApplications() {
        List<Application> apps = new ArrayList<Application>();
        apps.add(new Application().withName("Hadoop"));
        apps.add(new Application().withName("Spark"));
        return apps;
    }

    private List<InstanceGroupConfig> buildInstanceGroupConfigs() {
        List<InstanceGroupConfig> result = new ArrayList<InstanceGroupConfig>();
        InstanceGroupConfig masterInstance = new InstanceGroupConfig()
                .withName("MasterNode")
                .withInstanceRole(InstanceRoleType.MASTER)
                .withInstanceCount(1)
                .withInstanceType("m3.xlarge");
        result.add(masterInstance);

        InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
                .withName("CoreNode")
                .withInstanceRole(InstanceRoleType.CORE)
                .withInstanceCount(1)
                .withInstanceType("m3.xlarge");
        result.add(coreInsetance);

        return result;
    }

    private List<StepConfig> buildStepConfigs() {
        List<StepConfig> result = new ArrayList<StepConfig>();
        
        final String[] args = {
                "spark-submit",
                "--deploy-mode", "cluster",
                "--class", "SparkExampleApp",
                "s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar", 
                "s3n://sample-bucket/sample-emr/data.csv",
                "s3n://sample-bucket/sample-emr/output/output.csv"
        };

        final StepConfig sparkStep = new StepConfig()
                .withName("SparkProcess")
                .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
                .withHadoopJarStep(new HadoopJarStepConfig()
                        .withJar("command-runner.jar")
                        .withArgs(args));
        result.add(sparkStep);
        
        return result;
    }

}

以上実行コードの完成。プロジェクトを再度ビルドする。

Package Explorerのcom.sample.lambdaを右クリックし、Run AsMaven build...を選択する。
以下のように入力して、[Run]を選択する。
Goals: package shade:shade

(追記) TerminalからのMavenによるビルド方法

Mavenをインストールしていない場合は以下の手順でインストールする。

Mavenを以下のURLからダウンロードしてくる。
http://maven.apache.org/download.cgi

mv apache-maven-3.3.9 /usr/local/

.zshrcに以下を追加

export M3_HOME=/usr/local/apache-maven-3.3.9
M3=$M3_HOME/bin
export PATH=$M3:$PATH
$ . ~/.zshrc

プロジェクト直下にterminalで移動し、以下を実行する。

bashを使用している場合は、~/.zshrc~/.bashrcに置き換えて考える。

$ mvn package

target以下にcom.sapmle.lambda-0.0.1-SNAPSHOT.jarが生成される。こちらがLambdaにアップロードするものとなる。

IAMロールの作成

マネジメントコンソールからIdentity and Access Management
[ロール]から[新しいロールの作成]を選択。

手順 1: ロール名の設定

以下のように入力して[次のステップ]を選択。
ロール名: sampleRole

手順 2: ロールタイプの選択

AWS Lambdaを選択。

手順 3: 信頼性の確立

信頼関係はLambdaに設定されて飛ばされる。

手順 4: ポリシーのアタッチ

AmazonElasticMapReduceRoleを選択して、[次のステップ]を選択。

手順 5: 確認

内容を確認したら、[ロールの作成]を選択。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1492490223000",
            "Effect": "Allow",
            "Action": [
                "elasticmapreduce:*",
                "iam:*"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

Lambdaの作成

マネジメントコンソールからLamdaを選択。
[Create a Lambda function]を選択。

Select blueprint

Blank functionを選択。

Configure triggers

ここでは何も設定しないが、必要に応じてLambdaを呼び出す元となるサービスを指定する。
[Next]を選択。

Configure function

Name: SampleLambda
Runtime*: Java8

Code entry type: Upload a .ZIP or JAR file
Function package: [先ほど作成したJARファイルを指定]

Handler*: com.sample.lambda.SampleCreateEMRClusterHandler::handleRequest
Role*: Choose an existing role
Existing role: sampleRole

[Next]を選択。

Review

最後に設定内容を確認して、実際にTestを実行してみてエラーが出ないか確認してみる。

SNSによるLambdaのトリガー処理

SNSから先程作成したLambdaをトリガーするTopicを生成する。
ここでは、Node.jsから呼び出す例を取り上げます。

node.js
var AWS = require('aws-sdk');
AWS.config.update({
        accessKeyId: 'AKIAXXXXXXXXXXXXXXXX',
        secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        region: 'us-east-1'
});

publishSNSTopic();

const publishSNSTopic = () => {
    var sns = new AWS.SNS({
        apiVersion: "2010-03-31",
        region: "us-east-1"
    });

    sns.publish({
        Message: 'Start Job by triggering lambda for EMR',
        Subject: "Trigger Lambda for EMR",
        TopicArn: "arn:aws:sns:us-east-1:xxxxxxxxxxxx:<トピック名>"
    }, function(err, data) {
        if (err) console.log("Failure to publish topic by SNS");
    });
}
$ node index.js

注意点

  • URI schemeはs3ではなくs3nを指定。
    s3: S3ブロックファイルシステム
    s3n: S3ネイティブファイルシステム

以前はs3nを使っていたが、今はs3を推奨。
サービス提供者のドキュメントを読んでそれに合わせることが大事。

  • EMRでログ記録はチェックするようにする
    エラーが出た時のデバッグの根拠になるから。

  • EMR: 出力先にファイルがあるとエラーになる。(output.csv/)

  • com/amazonaws/auth/AWSCredentialsProvider: class java.lang.NoClassDefFoundError

maven-shade-pluginを忘れずに追加することが重要。
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/java-create-jar-pkg-maven-and-eclipse.html
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/java-create-jar-pkg-maven-no-ide.html

  • java.lang.ClassNotFoundException: com.amazonaws.ClientConfigurationFactory

aws-java-sdk-coreaws-java-sdk-emrのバージョンを1.11.49に統一

  • spark-submitコマンド実行時の注意点
$ spark-submit --deploy-mode cluster --class SparkExampleApp s3n://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv

じゃなくて

$ spark-submit --deploy-mode cluster --class SparkExampleApp s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv

原因は、EMRでS3を呼び出すときに、昔はs3nを使っており、今はs3を推奨しており、ScalaのAWSをライブラリはs3nで認識するのに対して、EMRはs3じゃないと受け付けないことが原因。

参考

Scalaをsbtでビルド
http://qiita.com/kanuma1984/items/6f599c815cc8f9232228

EMRでSpark
http://www.atmarkit.co.jp/ait/articles/1609/27/news018.html

JavaによるLambda作成
http://docs.aws.amazon.com/ja_jp/ElasticMapReduce/latest/DeveloperGuide/calling-emr-with-java-sdk.html

Lambda+Java
http://qiita.com/Keisuke69/items/fc39a2f464d14480a432

Lambda+EMR
http://qiita.com/yskazuma/items/b67b1f3f8c39a7a19051
http://qiita.com/RyujiKawazoe/items/534f4b069ebea2f7d26c

11
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?