概要
Lambdaのスケジュールドイベントを使ってEMRクラスタを起動したくなったので書いてみました。LambdaからEMRクラスタを起動し、ステップとしてS3DistCpとHiveスクリプトの実行ジョブを渡しています。
環境
- Java8
- EMR4.7.1
コード
public class SampleCreateEMRClusterHandler implements RequestHandler<Object, String> {
public String handleRequest(Object event, Context context) {
//Lambdaのロールを用いてクリデンシャル生成
AWSCredentialsProvider cp = new EnvironmentVariableCredentialsProvider();
AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(cp);
emr.setRegion(Region.getRegion(Regions.US_WEST_2));//クラスタ起動するリージョン
//インスタンスの設定
JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig()
.withEc2KeyName("emr") //アクセスに用いるキーペア
.withEc2SubnetId("subnet-xxxxxx") //サブネット
.withEmrManagedMasterSecurityGroup("sg-xxxxxx") //マスターノードのEMRマネージドセキュリティグループ
.withEmrManagedSlaveSecurityGroup("sg-xxxxxx") //スレーブノードのEMRマネージドセキュリティグループ
.withAdditionalMasterSecurityGroups(Arrays.asList("sg-xxxxxx")) //マスターノードの追加セキュリティグループ
.withAdditionalSlaveSecurityGroups(Arrays.asList("sg-xxxxxx")) //スレーブノードの追加セキュリティグループ
.withKeepJobFlowAliveWhenNoSteps(false) //ステップ終了時にクラスターを維持するかどうか
.withInstanceGroups(buildInstanceGroupConfigs()); //インスタンスグループの設定
//クラスタの設定
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("EMR-sample") //クラスタ名
.withReleaseLabel("emr-4.7.1")
.withLogUri("s3://path/") //クラスターログの出力先
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withVisibleToAllUsers(true) //全IAMユーザーに見えるようにする
.withInstances(instanceConfig)
.withApplications(buildApplications()) //使用するアプリケーション設定
.withSteps(buildStepConfigs()); //ステップ設定
RunJobFlowResult result = emr.runJobFlow(request); //クラスタ起動
return "Process complete.";
}
/**
* EMRクラスタにインストールするアプリケーションリストを生成
*/
private List<Application> buildApplications() {
List<Application> apps = new ArrayList<>();
apps.add(new Application().withName("Hadoop"));
apps.add(new Application().withName("Hive"));
apps.add(new Application().withName("Pig"));
apps.add(new Application().withName("Spark"));
apps.add(new Application().withName("Zeppelin-Sandbox"));
apps.add(new Application().withName("Tez"));
return apps;
}
/**
* EMRクラスタのインスタンスグループ設定リストを生成
*/
private List<InstanceGroupConfig> buildInstanceGroupConfigs() {
List<InstanceGroupConfig> result = new ArrayList<>();
InstanceGroupConfig masterInstance = new InstanceGroupConfig()
.withName("マスターノード")
.withInstanceRole(InstanceRoleType.MASTER) //マスターノード
.withInstanceCount(1) //インスタンス数
.withInstanceType("m3.xlarge") //インスタンスタイプ
.withMarket(MarketType.SPOT) //スポットインスタンスを使う(使いたい時だけ)
.withBidPrice("0.3"); //スポットインスタンスの入札価格
result.add(masterInstance);
InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
.withName("コアノード")
.withInstanceRole(InstanceRoleType.CORE) //コアノード
.withInstanceCount(2)
.withInstanceType("m3.xlarge")
.withMarket(MarketType.SPOT)
.withBidPrice("0.3");
result.add(coreInsetance);
InstanceGroupConfig taskInsetance = new InstanceGroupConfig()
.withName("タスクノード")
.withInstanceRole(InstanceRoleType.TASK) //タスクノード(Optional)
.withInstanceCount(2)
.withInstanceType("m3.xlarge")
.withMarket(MarketType.SPOT)
.withBidPrice("0.3");
result.add(taskInsetance);
return result;
}
/**
* EMRで実行するステップのリストを生成
*/
private List<StepConfig> buildStepConfigs() {
List<StepConfig> result = new ArrayList<>();
//S3Distcpをステップに追加するサンプル
String[] args = {
"s3-dist-cp",
"--src","入力元のs3 or hdfsのパス",
"--dest", "出力先のs3 or hdfsのパス",
"--groupBy","正規表現",
"--targetSize","MB指定"
};
StepConfig s3DistCpStep = new StepConfig()
.withName("S3DistCp")
.withActionOnFailure(ActionOnFailure.CONTINUE) //失敗時にそのまま続行する(適宜設定)
.withHadoopJarStep(
new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs(args)
);
result.add(s3DistCpStep);
//Hiveスクリプトをステップに追加するサンプル
//日時を変数として渡すケース
String[] args_hive = {
"--d","DATE=YYYYmmdd",
"--d","HOUR=HH"
};
StepFactory stepFactory = new StepFactory();
StepConfig hiveStep = new StepConfig()
.withName("Hive Script")
.withActionOnFailure(ActionOnFailure.CONTINUE)
.withHadoopJarStep(stepFactory.newRunHiveScriptStep("Hiveスクリプトのパス", args_hive));
result.add(hiveStep);
return result;
}
}
参考