はじめに
Javaで作られたWEBの画面上からSparkを起動したかったので、今回はAWS SDK for Javaを使ってSparkSubmitしてみたいと思います。
これだ!という記事が見つからなかったのでQiitaに残しておこうかなと思います。
今回作ったのはCSVデータをParquetに変換するだけのJOBです。
まずはEMRでクラスタを起動してマネジメントコンソールから実行してみます。
次に以下を自動化してみたいと思います。
- EMRの起動
- JOBの実行
- EMRクラスタ停止
ソースはこちらにあります。
https://github.com/uzresk/spark-samples.git
CSVをParquetに変換するJOBを作る
引数にCSVのバケットURLとParquetの出力先URLを受け取ります。
Schemaは自動変換、ヘッダは有としてDataframeを読み込み、Parquetに変換しています。
Java
mvn packageしてjarファイルを作ります。
package jp.gr.java_conf.uzresk.samples.spark.jobs;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
public class Csv2Parquet {
public static void main(String[] args) {
String input = args[0];
String output = args[1];
SparkSession spark = SparkSession.builder().appName("Csv2ParquetJavaApplication").getOrCreate();
Dataset<Row> df = spark.read().option("header", "true").option("inferSchema", "true").csv(input);
df.write().mode(SaveMode.Overwrite).parquet(output);
}
}
Scala
おまけでScalaでも書いてみました。sbtでpackageしてください。
package jp.gr.java_conf.uzresk.samples.spark.jobs
import org.apache.spark.sql.{SaveMode, SparkSession}
object Csv2Parquet {
def main(args: Array[String]) {
val input = args(0)
val output = args(1)
val spark = SparkSession.builder.appName("Csv2ParquetJavaAppliation").getOrCreate
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(input)
df.write.mode(SaveMode.Overwrite).parquet(output)
}
}
EMRクラスタを起動してJOBを手動でSubmitする
クラスタ起動時に気にするポイントを何点か。
1.ソフトウェアはSparkだけで大丈夫です。
2.コアノードの数はCSVデータの大きさなどに合わせて調整してください
今回は2GB程度のデータで試しました。コアノード2つで2分もあれば変換が終わります。
3.ログの設定をしておきましょう
クラスタの起動ボタンを押したら、ステップを追加していきます。
1.ステップタブを開き、「ステップの追加」ボタンを押します。
2.ステップを追加していきます
3.クラスタが起動していれば、すぐに「保留中」→「実行中」と処理が動いていきます
失敗した場合は矢印をクリックすることで標準エラー出力が確認できます。ここはログの設定を行っていないのとなにも出てきませんので注意してください。
起動しているクラスタにSTEPを追加する
StepConfigというところにJOBの定義を行い、クラスタIDを指定してSTEPを追加します。
画面で設定した内容と同じなのであまり迷うところはないかと思います。
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
public class SparkSubmitSingleStepRunningCluster {
private static final String CLUSTER_ID = "j-1VOWW7554GD9Z";
public static void main(String[] args) {
new SparkSubmitSingleStepRunningCluster().startJob();
}
private void startJob() {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(createCsv2ParquetConfig());
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest(CLUSTER_ID, stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
System.out.println(result.getStepIds());
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
}
クラスタの起動→STEP実行→クラスタの停止を自動化
こんな感じで書きます。
EMRのクラスタ起動のパラメータがとにかく多いのですがそれ以外は先ほどの例と同じですね。
package jp.gr.java_conf.uzresk.samples.spark.emr;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.AmazonClientConfigurationBuilder;
import jp.gr.java_conf.uzresk.samples.spark.emr.utils.Resource;
import java.util.ArrayList;
import java.util.List;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.CORE;
import static com.amazonaws.services.elasticmapreduce.model.InstanceRoleType.MASTER;
public class SparkSubmitIncludingStartupCluster {
public static void main(String[] args) {
ClientConfiguration cc = AmazonClientConfigurationBuilder.clientConfiguration().orElse(new ClientConfiguration());
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withClientConfiguration(cc)
.withRegion(Regions.AP_NORTHEAST_1)
.build();
new SparkSubmitIncludingStartupCluster().launch(emr);
}
private void launch(AmazonElasticMapReduce emr) {
RunJobFlowRequest request = createRequest();
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println(result.getJobFlowId() + " is starting");
}
private RunJobFlowRequest createRequest() {
return new RunJobFlowRequest()
.withName(Resource.getString("emr.cluster-name"))
.withReleaseLabel(Resource.getString("emr.release-label"))
.withSteps(createStepConfig())
.withApplications(createApplicationList())
.withTags(new Tag("Name", Resource.getString("emr.cluster-name")))
.withEbsRootVolumeSize(Resource.getInt("emr.ebs-root-volume-size"))
.withServiceRole(Resource.getString("emr.service-role"))
.withAutoScalingRole(Resource.getString("emr.auto-scaling-role"))
.withJobFlowRole(Resource.getString("emr.job-flow-role"))
.withLogUri(Resource.getString("emr.log-uri"))
.withVisibleToAllUsers(true)
.withInstances(createJobFlowInstancesConfig(createMasterConfig(), createCoreConfig()));
}
private List<StepConfig> createStepConfig() {
List<StepConfig> configs = new ArrayList<>();
configs.add(createDebugConfig());
configs.add(createCsv2ParquetConfig());
return configs;
}
private StepConfig createDebugConfig() {
String COMMAND_RUNNER = "command-runner.jar";
String DEBUGGING_COMMAND = "state-pusher-script";
String DEBUGGING_NAME = "Setup Hadoop Debugging";
return new StepConfig()
.withName(DEBUGGING_NAME)
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(
new HadoopJarStepConfig().withJar(COMMAND_RUNNER).withArgs(DEBUGGING_COMMAND));
}
private StepConfig createCsv2ParquetConfig() {
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("spark-submit",
"--executor-memory", "1g",
"--deploy-mode", "cluster",
"--class", Resource.getString("job.class"),
Resource.getString("job.jar"),
Resource.getString("csv2parquet.csv-file"),
Resource.getString("csv2parquet.parquet-path"));
return new StepConfig()
.withName("Csv2Parquet")
.withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
.withHadoopJarStep(sparkStepConf);
}
private ArrayList<Application> createApplicationList() {
ArrayList<Application> applications = new ArrayList<>();
applications.add(new Application().withName("Spark"));
return applications;
}
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
private JobFlowInstancesConfig createJobFlowInstancesConfig(
InstanceGroupConfig masterConfiguration, InstanceGroupConfig coreConfiguration) {
return new JobFlowInstancesConfig()
.withInstanceGroups(masterConfiguration, coreConfiguration)
.withEc2SubnetId(Resource.getString("emr.subnet-id"))
.withEmrManagedMasterSecurityGroup(Resource.getString("emr.master-sg"))
.withEmrManagedSlaveSecurityGroup(Resource.getString("emr.slave-sg"))
.withEc2KeyName(Resource.getString("emr.key-name"))
.withKeepJobFlowAliveWhenNoSteps(fales); // JOB終了時にインスタンスを停止する
}
}
最終行にある、withKeepJobFlowAliveWhenNoStepsをfalseにしておくとJOB実行後クラスタが自動で停止します。
スポットインスタンスを利用する
スポットインスタンスを利用することでバッチ処理をさらにお安く実行できます。経験上半額以下にはなりますね。
今回使っているのはm4.xlargeのオンデマンドインスタンスの価格は$0.258です。
スポットインスタンスの価格はおおよそ0.06~0.10あたりを推移しているようでしたので、$0.10あたりで価格を設定して起動します。(今回は確実に動かしたかったので)
SDKを利用する場合はMASTER,COREそれぞれ2行追加するだけで動かすことができます。
private InstanceGroupConfig createMasterConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) // Spotインスタンスを利用する
.withInstanceCount(Resource.getInt("emr.master.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.master.volume-size")).withVolumeType(Resource.getString("emr.master.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(MASTER)
.withInstanceType(Resource.getString("emr.master.instance-type"))
.withName(Resource.getString("emr.master.name"));
}
private InstanceGroupConfig createCoreConfig() {
return new InstanceGroupConfig()
.withBidPrice("0.10") // $0.10
.withMarket(MarketType.SPOT) // Spotインスタンスを利用する
.withInstanceCount(Resource.getInt("emr.core.instance-count"))
.withEbsConfiguration(
new EbsConfiguration()
.withEbsBlockDeviceConfigs(
new EbsBlockDeviceConfig()
.withVolumeSpecification(
new VolumeSpecification().withSizeInGB(Resource.getInt("emr.core.volume-size")).withVolumeType(Resource.getString("emr.core.volume-type")))
.withVolumesPerInstance(1))
.withEbsOptimized(true))
.withInstanceRole(CORE)
.withInstanceType(Resource.getString("emr.core.instance-type"))
.withName(Resource.getString("emr.core.name"));
}
EMRの「まとめ」タブを確認することでスポットインスタンスで動いていることを確認できるはずです。