LoginSignup
6
5

More than 5 years have passed since last update.

Lambdaからステップ付きのEMRクラスタを起動する

Last updated at Posted at 2016-07-14

概要

Lambdaのスケジュールドイベントを使ってEMRクラスタを起動したくなったので書いてみました。LambdaからEMRクラスタを起動し、ステップとしてS3DistCpとHiveスクリプトの実行ジョブを渡しています。

環境

  • Java8
  • EMR4.7.1

コード

public class SampleCreateEMRClusterHandler implements RequestHandler<Object, String> {

    public String handleRequest(Object event, Context context) {
        //Lambdaのロールを用いてクリデンシャル生成
        AWSCredentialsProvider cp = new EnvironmentVariableCredentialsProvider();
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(cp);
        emr.setRegion(Region.getRegion(Regions.US_WEST_2));//クラスタ起動するリージョン

        //インスタンスの設定
        JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig()
                .withEc2KeyName("emr") //アクセスに用いるキーペア
                .withEc2SubnetId("subnet-xxxxxx") //サブネット
                .withEmrManagedMasterSecurityGroup("sg-xxxxxx") //マスターノードのEMRマネージドセキュリティグループ
                .withEmrManagedSlaveSecurityGroup("sg-xxxxxx") //スレーブノードのEMRマネージドセキュリティグループ
                .withAdditionalMasterSecurityGroups(Arrays.asList("sg-xxxxxx")) //マスターノードの追加セキュリティグループ
                .withAdditionalSlaveSecurityGroups(Arrays.asList("sg-xxxxxx")) //スレーブノードの追加セキュリティグループ
                .withKeepJobFlowAliveWhenNoSteps(false) //ステップ終了時にクラスターを維持するかどうか
                .withInstanceGroups(buildInstanceGroupConfigs()); //インスタンスグループの設定

        //クラスタの設定
        RunJobFlowRequest request = new RunJobFlowRequest()
                .withName("EMR-sample") //クラスタ名
                .withReleaseLabel("emr-4.7.1")
                .withLogUri("s3://path/") //クラスターログの出力先
                .withServiceRole("EMR_DefaultRole")
                .withJobFlowRole("EMR_EC2_DefaultRole")
                .withVisibleToAllUsers(true) //全IAMユーザーに見えるようにする
                .withInstances(instanceConfig)
                .withApplications(buildApplications()) //使用するアプリケーション設定
                .withSteps(buildStepConfigs()); //ステップ設定

        RunJobFlowResult result = emr.runJobFlow(request); //クラスタ起動
        return "Process complete.";
    }

    /**
     * EMRクラスタにインストールするアプリケーションリストを生成
     */
    private List<Application> buildApplications() {
        List<Application> apps = new ArrayList<>();
        apps.add(new Application().withName("Hadoop"));
        apps.add(new Application().withName("Hive"));
        apps.add(new Application().withName("Pig"));
        apps.add(new Application().withName("Spark"));
        apps.add(new Application().withName("Zeppelin-Sandbox"));
        apps.add(new Application().withName("Tez"));
        return apps;
    }

    /**
     * EMRクラスタのインスタンスグループ設定リストを生成
     */
    private List<InstanceGroupConfig> buildInstanceGroupConfigs() {
        List<InstanceGroupConfig> result = new ArrayList<>();
        InstanceGroupConfig masterInstance = new InstanceGroupConfig()
                .withName("マスターノード")
                .withInstanceRole(InstanceRoleType.MASTER) //マスターノード
                .withInstanceCount(1) //インスタンス数
                .withInstanceType("m3.xlarge") //インスタンスタイプ
                .withMarket(MarketType.SPOT) //スポットインスタンスを使う(使いたい時だけ)
                .withBidPrice("0.3"); //スポットインスタンスの入札価格
        result.add(masterInstance);

        InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
                .withName("コアノード")
                .withInstanceRole(InstanceRoleType.CORE) //コアノード
                .withInstanceCount(2)
                .withInstanceType("m3.xlarge")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.3");
        result.add(coreInsetance);

        InstanceGroupConfig taskInsetance = new InstanceGroupConfig()
                .withName("タスクノード")
                .withInstanceRole(InstanceRoleType.TASK) //タスクノード(Optional)
                .withInstanceCount(2)
                .withInstanceType("m3.xlarge")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.3");
        result.add(taskInsetance);
        return result;
    }

    /**
     * EMRで実行するステップのリストを生成
     */
    private List<StepConfig> buildStepConfigs() {
        List<StepConfig> result = new ArrayList<>();
        //S3Distcpをステップに追加するサンプル
        String[] args = {
                "s3-dist-cp",
                "--src","入力元のs3 or hdfsのパス",
                "--dest", "出力先のs3 or hdfsのパス",
                "--groupBy","正規表現",
                "--targetSize","MB指定"
        };
        StepConfig s3DistCpStep = new StepConfig()
                .withName("S3DistCp")
                .withActionOnFailure(ActionOnFailure.CONTINUE) //失敗時にそのまま続行する(適宜設定)
                .withHadoopJarStep(
                        new HadoopJarStepConfig()
                                .withJar("command-runner.jar")
                                .withArgs(args)
                );
        result.add(s3DistCpStep);

        //Hiveスクリプトをステップに追加するサンプル
        //日時を変数として渡すケース
        String[] args_hive = {
                "--d","DATE=YYYYmmdd",
                "--d","HOUR=HH"
        };
        StepFactory stepFactory = new StepFactory();
        StepConfig hiveStep = new StepConfig()
                .withName("Hive Script")
                .withActionOnFailure(ActionOnFailure.CONTINUE)
                .withHadoopJarStep(stepFactory.newRunHiveScriptStep("Hiveスクリプトのパス", args_hive));
        result.add(hiveStep);

        return result;
    }

}

参考

6
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
5