LoginSignup
5
6

More than 5 years have passed since last update.

EMR+Lambdaでバッチ処理を自動化する(Lambdaアプリの作成)

Posted at

LambdaからEMRを起動するための クソ コードになります。

記述言語

  • java8

Lambdaが呼び出す場所

EMR+Lambdaでバッチ処理を自動化する(Lambdaの準備編)でLambdaが実行するコード指定に

パッケージ.クラス::メソッド

# 今回は, hoge.Hoge::run のようになります

があったんですが、実際の中身になります。

public class Hoge {

    /**
     * 実行
     * @param data
     * @param context
     * @return
     */
    public String run(final Map<String, String> data, final Context context) {

        //EMR起動(後述)

        "complete!"
    }

第1引数がMap<String, String>なのは、Jsonが来る想定になっています。Jsonがネストする場合、Map<String, Object>が正解かも。

Stringで受けてしまうとエラーが起こるっぽいので注意

EMRの起動について

EMRを使ったことがある方は既にお分かりの方が多いと思いますが、EMRを設定するためには

  • EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定
  • EMRが立ち上げるEC2インスタンスの設定
  • それぞれのセキュリティの設定
  • EMRで何をするかのStepの設定

の4つ程度に分かれるとおもいます。
それぞれどのように設定していくか、紹介します。

EMRが利用するソフトウェア(Hadoop, Sparkなど)の設定

RunJobFlowRequest job = new RunJobFlowRequest();
job.withName("EMRの名前");

// 現在私は、emr-5.0.0を使いたいので "emr-5.0.0" と、入れています。
job.withReleaseLabel("EMRのバージョン");

job.withLogUri("ログを排出するS3のパス s3://~~~");
job.withServiceRole("EMRに設定するIamRole");
job.withJobFlowRole("EMRが起動するEC2に設定するIamRole");
job.withVisibleToAllUsers(true);
job.withInstances("EMRが立ち上げるEC2インスタンスの設定(後述)");
job.withApplications("EMRに入れるソフトウェア");
job.withConfigurations("EMRに入れるソフトウェアの設定");
    }

  • EMRに入れるソフトウェア

    私の場合は、Hadoop + Spark + Gangliaを入れたかったので以下のようにしています。

    private static List<Application> buildApplications() {
        List<Application> apps = new ArrayList<>();
        apps.add(new Application().withName("Hadoop"));
        apps.add(new Application().withName("Spark"));
        apps.add(new Application().withName("Ganglia"));
        return apps;
    }
    
  • EMRに入れるソフトウェアの設定

    Sparkのおまじないとして、以下を入れています。

    [{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}]
    

    java上では以下のように記述します。

    final Configuration configuration = new Configuration();
    configuration.setClassification("spark");
    
    final HashMap<String, String> map = new HashMap<>();
    map.put("maximizeResourceAllocation", "true");
    configuration.setProperties(map);
    
    return configuration;
    
    

EMRが立ち上げるEC2インスタンスの設定、およびセキュリティ

final JobFlowInstancesConfig instanceConfig = new JobFlowInstancesConfig();
instanceConfig.withEc2KeyName("key pair名");
instanceConfig.withEc2SubnetId("EC2を立ち上げるサブネット名");
instanceConfig.withEmrManagedMasterSecurityGroup("マスターのSecurityGroup");
instanceConfig.withEmrManagedSlaveSecurityGroup("コア、タスクのSecurityGroup");
instanceConfig.withAdditionalMasterSecurityGroups("マスターへの追加のセキュリティグループ");
instanceConfig.withAdditionalSlaveSecurityGroups("コア、タスクの追加のセキュリティグループ");
instanceConfig.withKeepJobFlowAliveWhenNoSteps("Jobがない時、EMRを落とすかどうか 落とすならfalse");
instanceConfig.withInstanceGroups("インスタンス自体の設定(後述)");
instanceConfig.withTerminationProtected("インスタンスの保護");
  • インスタンス自体の設定に関して

    final List<InstanceGroupConfig> result = new ArrayList<>();
    InstanceGroupConfig masterInstance = new InstanceGroupConfig()
                .withName("マスターノード")
                .withInstanceRole(InstanceRoleType.MASTER)
                .withInstanceCount(1)
                .withInstanceType("インスタンスタイプ") //インスタンスタイプ
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(masterInstance);
    
    InstanceGroupConfig coreInsetance = new InstanceGroupConfig()
                .withName("コアノード")
                .withInstanceRole(InstanceRoleType.CORE)
                .withInstanceCount(1)
                .withInstanceType("インスタンスタイプ")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(coreInsetance);
    
        InstanceGroupConfig taskInsetance = new InstanceGroupConfig()
                .withName("タスクノード")
                .withInstanceRole(InstanceRoleType.TASK)
                .withInstanceCount(4)
                .withInstanceType("インスタンスタイプ")
                .withMarket(MarketType.SPOT)
                .withBidPrice("0.14");
        result.add(taskInsetance);
        return result;
    

EMRで何をするかのStepの設定

私の場合は、Sparkアプリケーションで分散集計処理をするので
以下のようなコマンドをStepとして実行させます。

spark-submit --deploy-mode cluster --class my.Job s3://spark-app.jar 引数1 引数2 ...

これをJavaのコードにする場合は、以下となります。

final String[] args = {
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "--class", "my.Job",
                    "s3://spark-app.jar", 
                    "引数1", "引数2"
            };

final StepConfig stepConfig = new StepConfig()
                    .withName("step名")
                    .withActionOnFailure("終了時の振る舞い")
                    .withHadoopJarStep(new HadoopJarStepConfig()
                            .withJar("command-runner.jar")
                            .withArgs(args));

return stepConfig;

EMR起動

先ほど作ったEMRの設定をするインスタンスに,Stepを追加します

// RunJobFlowRequest job = new RunJobFlowRequest();
// ・・・

job.withSteps(steps);

以下のコードでEMRが起動します

final AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
emr.setRegion(Region.getRegion(Regions.AP_NORTHEAST_1));

emr.runJobFlow(job);
5
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
6