EMRでApache Sparkを使用するに当たって、必要なデータを処理するためのコードと入力データを用意するだけで、面倒な環境構築を行わず、わずかばかりの設定を加えるだけですぐに使用することができます。
ここでは、最初にScalaによるEMR上でのSpark処理、次に他のサービスのトリガーをきっかけに動くJavaによるLambdaでEMRを呼び出す手順を注意点も含めて詳細なメモを書きます。
Scalaによる回帰分析を行う実行ファイルの作成
環境構築
scalaをインストール。
$ cd /tmp
$ curl -O http://downloads.typesafe.com/scala/2.11.8/scala-2.11.8.tgz
$ tar xzf scala-2.11.8.tgz
$ mkdir -p /usr/local/src
$ mv scala-2.11.8 /usr/local/src/scala
$ export PATH=$PATH:/usr/local/src/scala/bin
$ export SCALA_HOME=/usr/local/src/scala
$ which scala
Scalaのパッケージマネージャであるsbtをインストール。
$ brew install sbt
$ which sbt
プロジェクトの作成
プロジェクトを置くディレクトリを作成。
$ mkdir SparkExampleApp
ビルドの設定ファイルの作成。
$ vim build.sbt
以下のように記述する。
name := "Spark Sample Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0",
"org.apache.spark" %% "spark-mllib" % "2.0.0",
"org.apache.spark" %% "spark-sql" % "2.0.0",
"com.databricks" %% "spark-csv" % "1.4.0"
)
実行ファイルの作成。
$ mkdir -p src/main/scala
$ vim src/main/scala/SparkExampleApp.scala
SparkExampleApp.scalaは以下のものを使用。
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature._
import org.apache.spark.ml.regression.LinearRegression
object SparkExampleApp {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Spark Sample App")
.getOrCreate()
import spark.implicits._
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "AKIA****************")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "****************************************")
val filePath = args(0)
val df = spark.sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(filePath)
val assembler = new VectorAssembler()
.setInputCols(Array("x"))
.setOutputCol("features")
val polynomialExpansion = new PolynomialExpansion()
.setInputCol(assembler.getOutputCol)
.setOutputCol("polyFeatures")
.setDegree(4)
val linearRegression = new LinearRegression()
.setLabelCol("y")
.setFeaturesCol(polynomialExpansion.getOutputCol)
.setMaxIter(100)
.setRegParam(0.0)
val pipeline = new Pipeline()
.setStages(Array(assembler, polynomialExpansion, linearRegression))
val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))
val model = pipeline.fit(trainingData)
val outputFilePath = args(1)
model.transform(testData)
.select("x", "prediction")
.write
.format("com.databricks.spark.csv")
.option("header", "false")
.save(outputFilePath)
}
}
jarファイルの作成のために以下のコマンドを実行。
$ sbt package
生成されたtarget/scala-2.11/spark-sample-project_2.11-1.0.jar
をS3のsample-bucket/sample-emr/src
にアップロードしておく。
Apache Sparkの設定
http://spark.apache.org/downloads.html
上のURL上で以下のように入力してApache Sparkのデータをダウンロードしてくる。
- Choose a Spark release: 2.0.1(Oct 03 2016)
- Choose a package type: Pre-build for Hadoop 2.7 and later
- Choose a download type: Direct download
- Download Spark: spark-2.0.1-bin-hadoop2.7.tgz
ダウンロードしたファイルに移動する。
$ cd ~/Downloads/spark-2.0.1-bin-hadoop2.7
conf以下にspark-defaults.confというファイルを以下の内容で作成する。
$ vim conf/spark-defaults.conf
spark.jars.packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.1
実験用データの作成
$ ruby -e 'puts "x,y";Range.new(0,5).step(0.05).each {|i| puts "#{i},#{Math.sin(i)+Random.rand(-0.3..0.3)}"}' > data.csv
グラフの表示
$ brew install gnuplot
$ gnuplot
gnuplot> set datafile separator ","
gnuplot> set terminal jpeg
gnuplot> set output "sample.jpg"
gnuplot> plot "data.csv" every ::1
$ qlmanage -p sample.jpg
実行
S3にバケットを作成しておき、以下のようなバケットポリシーを作成しておく。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "GetObject",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::sample-bucket/*"
},
{
"Sid": "ListBucket",
"Effect": "Allow",
"Principal": "*",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::sample-bucket"
}
]
}
Sparkの実行
$ ./bin/spark-submit --class SparkExampleApp --master local ~/workspace/SparkExampleApp/target/scala-2.11/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output.csv
実行後の結果は以下のようになる。
EMRの実行
1.マネジメントコンソールからEMRを選択。
2.[クラスターを作成]を選択して以下のように記述する。
クラスター名: SampleCluster
ログ記録: チェック
S3フォルダー: s3://sample-bucket/sample-emr/logs
起動モード: ステップ実行
ステップタイプ: Spark アプリケーション
[設定]を選択し、ステップを追加の設定を行う。
3.ステップを追加で以下のように入力して、[追加]を選択。
名前: SparkApplication
デプロイモード: クラスター
Spark-submitオプション: --class SparkExampleApp
アプリケーションの場所: s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar
引数: s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv
失敗時の操作: クラスターを終了。
ベンダー: Amazon
リリース: emr-5.0.3
アプリケーション: Hadoop 2.7.3, Spark 2.0.1
インスタンスタイプ: m3.xlarge
インスタンス数: 2
アクセス権限: デフォルト
EMRロール: EMR_DefaultRole
EC2 インスタンスプロファイル: EMR_EC2_DefaultRole
4.[クラスターを作成]を選択。
5.デプロイモードはクライアントとクラスターのうちのクラスターを選択。
6.失敗時の操作は、次へ、キャンセルして待機、クラスターを終了からクラスターを終了を選択。
7.起動モードはクラスター、ステップ実行のうちのステップ実行を選択。
8.ステップタイプはストリーミングプログラミング、Hiveプログラム、Pigプログラム、Spark アプリケーション、カスタムJARからSpark アプリケーションを選択。
しばらくすると、EMRのクラスターが終了し、出力先にファイルが生成されていることが確認できる。
JavaによるLambdaからのEMR呼び出し
Eclipseでプロジェクト作成
Apache Mavenを利用してプロジェクト管理する場合。
1.File > New > Other...からMaven > Maven Project
デフォルト設定のまま進めるので、3回[Next]を選択。
Group Id: com.sample.lambda
Artifact Id: com.sapmle.lambda
Version: 0.0.1-SNAPSHOT
Package: com.sample.lambda
[Finish]を選択。
2.pom.xmlを選択してOverviewタブ中の以下を編集。
Name: lambda-java
3.Dependenciesタブを選択。
[Add...]を選択し、以下のように記述して、aws-lambda-java-core: 1.0.0
とaws-lambda-java-events
とaws-java-sdk-core
、aws-java-sdk-emr
を追加する。
Group Id: com.amazonaws
Artifact Id: aws-lambda-java-core
Version: 1.0.0
Group Id: com.amazonaws
Artifact Id: aws-lambda-java-events
Version: 1.0.0
Group Id: com.amazonaws
Artifact Id: aws-java-sdk-core
Version: 1.11.49
Group Id: com.amazonaws
Artifact Id: aws-java-sdk-emr
Version: 1.11.49
次に重要なのが
pom.xmlを右クリックして、Maven > Add Plugin を選択し、以下のように入力する。
Group Id: org.apache.maven.plugins
Artifact Id: maven-shade-plugin
バージョン: 2.3
プロジェクトをビルドするために、Package Exploreからcom.sample.lambdaを右クリックして、Run As > Maven Build を実行する。
作成したpom.xmlは以下のようになる。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sample.lambda</groupId>
<artifactId>com.sample.lambda</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-emr</artifactId>
<version>1.11.49</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.11.49</version>
</dependency>
</dependencies>
<name>lambda-java</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
</plugin>
</plugins>
</build>
</project>
4.src/main/java中のcom.sample.lambdaのSampleCreateEMRClusterHandler.javaを以下のように作成する。
package com.sample.lambda;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Application;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.InstanceGroupConfig;
import com.amazonaws.services.elasticmapreduce.model.InstanceRoleType;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
public class SampleCreateEMRClusterHandler implements RequestHandler<Object, String> {
public String handleRequest(Object event, Context context) {
AWSCredentialsProvider cp = new EnvironmentVariableCredentialsProvider();
AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(cp);
emr.setRegion(Region.getRegion(Regions.US_EAST_1));
JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig()
.withKeepJobFlowAliveWhenNoSteps(false)
.withInstanceGroups(buildInstanceGroupConfigs());
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("SampleCluster")
.withReleaseLabel("emr-5.0.3")
.withLogUri("s3://sample-bucket/sample-emr/logs/")
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withVisibleToAllUsers(true)
.withInstances(instanceConfig)
.withApplications(buildApplications())
.withSteps(buildStepConfigs());
RunJobFlowResult result = emr.runJobFlow(request);
return "Process complete.";
}
private List<Application> buildApplications() {
List<Application> apps = new ArrayList<Application>();
apps.add(new Application().withName("Hadoop"));
apps.add(new Application().withName("Spark"));
return apps;
}
private List<InstanceGroupConfig> buildInstanceGroupConfigs() {
List<InstanceGroupConfig> result = new ArrayList<InstanceGroupConfig>();
InstanceGroupConfig masterInstance = new InstanceGroupConfig()
.withName("MasterNode")
.withInstanceRole(InstanceRoleType.MASTER)
.withInstanceCount(1)
.withInstanceType("m3.xlarge");
result.add(masterInstance);
InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
.withName("CoreNode")
.withInstanceRole(InstanceRoleType.CORE)
.withInstanceCount(1)
.withInstanceType("m3.xlarge");
result.add(coreInsetance);
return result;
}
private List<StepConfig> buildStepConfigs() {
List<StepConfig> result = new ArrayList<StepConfig>();
final String[] args = {
"spark-submit",
"--deploy-mode", "cluster",
"--class", "SparkExampleApp",
"s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar",
"s3n://sample-bucket/sample-emr/data.csv",
"s3n://sample-bucket/sample-emr/output/output.csv"
};
final StepConfig sparkStep = new StepConfig()
.withName("SparkProcess")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs(args));
result.add(sparkStep);
return result;
}
}
以上実行コードの完成。プロジェクトを再度ビルドする。
Package Explorerのcom.sample.lambdaを右クリックし、Run As
のMaven build...
を選択する。
以下のように入力して、[Run]を選択する。
Goals: package shade:shade
(追記) TerminalからのMavenによるビルド方法
Mavenをインストールしていない場合は以下の手順でインストールする。
Mavenを以下のURLからダウンロードしてくる。
http://maven.apache.org/download.cgi
mv apache-maven-3.3.9 /usr/local/
.zshrcに以下を追加
export M3_HOME=/usr/local/apache-maven-3.3.9
M3=$M3_HOME/bin
export PATH=$M3:$PATH
$ . ~/.zshrc
プロジェクト直下にterminalで移動し、以下を実行する。
bashを使用している場合は、~/.zshrc
を~/.bashrc
に置き換えて考える。
$ mvn package
target以下にcom.sapmle.lambda-0.0.1-SNAPSHOT.jarが生成される。こちらがLambdaにアップロードするものとなる。
IAMロールの作成
マネジメントコンソールからIdentity and Access Management
[ロール]から[新しいロールの作成]を選択。
手順 1: ロール名の設定
以下のように入力して[次のステップ]を選択。
ロール名: sampleRole
手順 2: ロールタイプの選択
AWS Lambdaを選択。
手順 3: 信頼性の確立
信頼関係はLambdaに設定されて飛ばされる。
手順 4: ポリシーのアタッチ
AmazonElasticMapReduceRole
を選択して、[次のステップ]を選択。
手順 5: 確認
内容を確認したら、[ロールの作成]を選択。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1492490223000",
"Effect": "Allow",
"Action": [
"elasticmapreduce:*",
"iam:*"
],
"Resource": [
"*"
]
}
]
}
Lambdaの作成
マネジメントコンソールからLamdaを選択。
[Create a Lambda function]を選択。
Select blueprint
Blank functionを選択。
Configure triggers
ここでは何も設定しないが、必要に応じてLambdaを呼び出す元となるサービスを指定する。
[Next]を選択。
Configure function
Name: SampleLambda
Runtime*: Java8
Code entry type: Upload a .ZIP or JAR file
Function package: [先ほど作成したJARファイルを指定]
Handler*: com.sample.lambda.SampleCreateEMRClusterHandler::handleRequest
Role*: Choose an existing role
Existing role: sampleRole
[Next]を選択。
Review
最後に設定内容を確認して、実際にTestを実行してみてエラーが出ないか確認してみる。
SNSによるLambdaのトリガー処理
SNSから先程作成したLambdaをトリガーするTopicを生成する。
ここでは、Node.jsから呼び出す例を取り上げます。
var AWS = require('aws-sdk');
AWS.config.update({
accessKeyId: 'AKIAXXXXXXXXXXXXXXXX',
secretAccessKey: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
region: 'us-east-1'
});
publishSNSTopic();
const publishSNSTopic = () => {
var sns = new AWS.SNS({
apiVersion: "2010-03-31",
region: "us-east-1"
});
sns.publish({
Message: 'Start Job by triggering lambda for EMR',
Subject: "Trigger Lambda for EMR",
TopicArn: "arn:aws:sns:us-east-1:xxxxxxxxxxxx:<トピック名>"
}, function(err, data) {
if (err) console.log("Failure to publish topic by SNS");
});
}
$ node index.js
注意点
- URI schemeはs3ではなくs3nを指定。
s3: S3ブロックファイルシステム
s3n: S3ネイティブファイルシステム
以前はs3nを使っていたが、今はs3を推奨。
サービス提供者のドキュメントを読んでそれに合わせることが大事。
-
EMRでログ記録はチェックするようにする
エラーが出た時のデバッグの根拠になるから。 -
EMR: 出力先にファイルがあるとエラーになる。(output.csv/)
-
com/amazonaws/auth/AWSCredentialsProvider: class java.lang.NoClassDefFoundError
maven-shade-plugin
を忘れずに追加することが重要。
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/java-create-jar-pkg-maven-and-eclipse.html
- https://docs.aws.amazon.com/ja_jp/lambda/latest/dg/java-create-jar-pkg-maven-no-ide.html
java.lang.ClassNotFoundException: com.amazonaws.ClientConfigurationFactory
aws-java-sdk-core
とaws-java-sdk-emr
のバージョンを1.11.49
に統一
-
spark-submit
コマンド実行時の注意点
$ spark-submit --deploy-mode cluster --class SparkExampleApp s3n://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv
じゃなくて
$ spark-submit --deploy-mode cluster --class SparkExampleApp s3://sample-bucket/sample-emr/src/spark-sample-project_2.11-1.0.jar s3n://sample-bucket/sample-emr/data.csv s3n://sample-bucket/sample-emr/output/output.csv
原因は、EMRでS3を呼び出すときに、昔はs3nを使っており、今はs3を推奨しており、ScalaのAWSをライブラリはs3nで認識するのに対して、EMRはs3じゃないと受け付けないことが原因。
参考
Scalaをsbtでビルド
http://qiita.com/kanuma1984/items/6f599c815cc8f9232228
EMRでSpark
http://www.atmarkit.co.jp/ait/articles/1609/27/news018.html
JavaによるLambda作成
http://docs.aws.amazon.com/ja_jp/ElasticMapReduce/latest/DeveloperGuide/calling-emr-with-java-sdk.html
Lambda+Java
http://qiita.com/Keisuke69/items/fc39a2f464d14480a432
Lambda+EMR
http://qiita.com/yskazuma/items/b67b1f3f8c39a7a19051
http://qiita.com/RyujiKawazoe/items/534f4b069ebea2f7d26c