Help us understand the problem. What is going on with this article?

【AWS】ECR, ECS(Fargate), StepFunctions, CloudWatch でバッチ処理~後編~

はじめに

本記事は、AWS FargateタスクとActivityステートマシンを利用して
・ソースの管理用リポジトリ(ECR)
・Activityをポーリングするワーカー
・アクティビティの処理内容
を実装しつつ、CloudWatchで定期実行するバッチ処理を作成するための概要(&備忘録)の後編です。

前編こちらです。

筆者自身、まだ理解が追いついていないと感じる部分はありますが、初学者の方の一助にでもなれば幸いです。

誤字・脱字や前編での疑問点などあればコメントいただけると幸いです。

改めて、手順の全体は以下の通り。

  1. ECR リポジトリの作成
  2. アクティビティステートマシンの作成
    1. アクティビティの作成
    2. アクティビティステートマシン用のタスク定義ファイルを作成
    3. アクティビティステートマシンの作成
  3. docker イメージの準備
    1. ソースのbuildとタグ付け
    2. リポジトリへpush
  4. ECS クラスターの作成
  5. ECS タスク定義の作成
    1. コンテナに登録するタスク定義ファイルの作成
    2. Fargateタスク定義の登録
  6. CloudWatch Event 設定
    1. Fargateタスク起動用ルールの作成
    2. Fargateタスク起動用ターゲットの作成と割り当て
    3. アクティビティステートマシン用ルールの作成
    4. アクティビティステートマシン用ターゲットの作成と割り当て

上記の 2-3. アクティビティステートマシンの作成までを前編で実施済みなので、以降は 3. dockerイメージの準備からとします。

事前準備

Lambdaファンクションの作成

サンプルは以下のようなコードです。コピペしてそのまま使えます。
必要なSDKをインストールするために pom.xml を編集する必要があるかと思いますが、今回は割愛します。

サンプルコード
package AwsServices.samples;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskRequest;
import com.amazonaws.services.stepfunctions.model.GetActivityTaskResult;
import com.amazonaws.services.stepfunctions.model.SendTaskFailureRequest;
import com.amazonaws.services.stepfunctions.model.SendTaskSuccessRequest;
import com.fasterxml.jackson.databind.JsonNode;
import com.amazonaws.util.json.Jackson;

public class GreeterActivities {

    static final com.amazonaws.regions.Regions TARGET_REGION = Regions.AP_NORTHEAST_1;
    static final String ACTIVITY_HELLO_WORLD = "HelloWorldActivity.properties";

    public void main(final String[] args) throws Exception {
        GreeterActivities greeterActivities = new GreeterActivities();
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setSocketTimeout((int)TimeUnit.SECONDS.toMillis(0)); // 0 = タイムアウトなし

        // StepFunctions設定ファイルの読み込み
        Properties properties = new Properties();
        try {
            properties.load(this.getClass().getResourceAsStream("/env/" + ACTIVITY_HELLO_WORLD));
        } catch (IOException e) {
            System.out.println("Failed : cannot read of properties of StepFunctions Activity.\n" + e.getMessage() + "\n");
        }

        AWSStepFunctions client = AWSStepFunctionsClientBuilder.standard()
                .withRegion(TARGET_REGION).withCredentials(new EnvironmentVariableCredentialsProvider())
                .withClientConfiguration(clientConfiguration)
                .build();

        while (true) {
            // ワーカーの実装
            GetActivityTaskResult getActivityTaskResult = client.getActivityTask(
                            new GetActivityTaskRequest().withActivityArn(properties.getProperty("activity.arn")));

            if (getActivityTaskResult.getTaskToken() != null) {
                try {
                    JsonNode json = Jackson.jsonNodeOf(getActivityTaskResult.getInput());
                    String greetingResult = greeterActivities.helloWorld(json.get("world").textValue());
                    client.sendTaskSuccess(
                            new SendTaskSuccessRequest().withOutput(greetingResult).withTaskToken(getActivityTaskResult.getTaskToken()));
                } catch (Exception e) {
                    client.sendTaskFailure(new SendTaskFailureRequest().withTaskToken(
                            getActivityTaskResult.getTaskToken()));
                }
            } else {
                Thread.sleep(1000);
            }
        }
    }

        // タスクの実際の処理内容
    private String helloWorld(String world) {
        return "{\"Hello\": \"" + world + "\"}";
    }
}

/src/main/resources/env 以下には HelloWorldActivity.properties を作成します。

access.key=$IAM_ACCESS_KEY
secret.key=$IAM_ACCESS_SECRET
activity.arn=arn:aws:states:$REGION:$ACCOUNT_ID:activity:HelloWorldActivity

3. dockerイメージの準備

上記のLambdaファンクションのタスク本体を含めてdockerイメージを作成するため、Dockerfileを作成しましょう。

Dockerfileの例
FROM openjdk:8-jre-alpine
WORKDIR /opt/app/
ADD ./XXXXXXXX-0.0.1-SNAPSHOT.jar /opt/app/

EXPOSE $PORT
RUN sh -c 'touch /opt/app/XXXXXXXX-0.0.1-SNAPSHOT.jar'
ENTRYPOINT [ "sh", "-c", "java -Dlog4j.configurationFile=log4j2.xml -cp /opt/app/XXXXXXXX-0.0.1-SNAPSHOT.jar com.personal.XXXXXXXX. GreeterActivities.main" ]

XXXXXXXXとなっているプロジェクト名や、EXPOSEするポートは環境に合わせて定義して下さい。
Dockerfileの各パラメータなどについては、本記事では扱いません。
タスクとして作成したJavaのソースをmavenビルドし、出来上がったjarファイルをDockerfileと同じディレクトリに格納しましょう。

3-1. ソースのビルドとタグ付け(コマンド)

タグ付けとビルドはコマンドで行ってしまいます。

docker build -t $REPOSITORY:$TAG .

ここで使用した$REPOSITORY前編で作成したリポジトリのARNを使用します。
$TAGはバージョンを使用すると管理が楽ですが、特に気にしない場合はlatestなどとしておきましょう。

末尾の.(dot)はソース及びDockerfileのあるディレクトリを指定します。

3-2. リポジトリへのPush(コマンド)

こちらもコマンドで実施します。

docker push $IMAGE_NAME

3-1でのビルド後、docker imagesコマンドなどでイメージの一覧が確認できますが、
$IMAGE_NAME$REPOSITORY:$TAGに相当します。
これによりdockerコマンドが適切なリポジトリへPushしてくれます。

4. ECSクラスターの作成

Pushしたコンテナイメージを稼働させるためのクラスタを作成します。

aws ecs create-cluster --cluster-name "hello-world-cluster"

クラスタ名は任意です。ここでは仮にhello-world-clusterとしておきます。

5. ECS タスク定義の作成と登録

5-1. コンテナに登録するタスク定義ファイルの作成

コンテナが実際に実行するタスクの定義を、JSONで記述していきます。

タスク定義の例
{
  "family": "MyFamily",
  "taskRoleArn": "arn:aws:iam::XXXXXXXXXXXX:role/MY_ROLE",
  "executionRoleArn": "arn:aws:iam::XXXXXXXXXXXX:role:role/MY_ROLE",
  "networkMode": "awsvpc",
  "containerDefinitions": [
    {
      "name": "MyContainerName",
      "image": "$REPOSITORY_URI",
      "portMappings": [
        {
          "hostPort": 1234,
          "protocol": "tcp",
          "containerPort": 1234
        },
        {
          "hostPort": 54321,
          "protocol": "tcp",
          "containerPort": 54321
        }
      ],
      "essential": true,
      "entryPoint": [
        "コンテナが実行する docker run コマンドの --entrypoint オプションに渡す内容"
      ],
      "command": [
        ""
      ],
      "environment": [
        {
          "name": "ENV",
          "value": "development"
        }
      ],
      "logConfiguration": {
        "logDriver": "awslogs",
        "options": {
          "awslogs-group": "/logs/MyContainerLogs",
          "awslogs-region": "ap-northeast-1",
          "awslogs-stream-prefix": "logs"
        }
      }
    }
  ],
  "requiresCompatibilities": [
    "コンテナの起動タイプ。「EC2」または「FARGATE」を使用"
  ],
  "cpu": "2048",
  "memory": "4096"
}

各パラメータの定義はこちらを参照ください。

タスク定義ファイルのJSONスケルトンだけが欲しい場合は、↓のコマンドで取得可能です。

aws ecs register-task-definition --generate-cli-skeleton

5-2. タスク定義の登録

それでは、実際に作成したタスク定義をコンテナへ登録しましょう。

登録用コマンド
aws ecs register-task-definition --cli-input-json file://$TASK_DEFINITION_FILE

# 確認
aws ecs list-task-definitions

6. CloudWatch Event 設定

6-1. Fargateタスク起動用ルールの作成

FARGATEタスクを起動させるためのCloudWatchイベントを定義します。

aws events put-rule --schedule-expression "cron(0 20 10 * ? *)" --role-arn "arn:aws:iam::XXXXXXXXXXXX:role/MY_ROLE" --name MyTestRule

スケジュール(実行タイミング)に cron を使用していますが、引数の数はAWS独自で、通常のLinux系コマンドにある cron よりは1つ多いです。

6-2. Fargateタスク起動用ターゲットの作成と割り当て

イベントルールが定義できたら、イベントの呼び出し対象となるターゲットを定義します。

{
  "Rule": "MyTestRule", # 登録するルール名
  "Targets": [
    {
      "Id": "target_01",
      "Arn": "arn:aws:ecs:ap-northeast-1:XXXXXXXXXXXX:cluster/hello-world-cluster",
      "RoleArn": "arn:aws:iam::XXXXXXXXXXXX:role/My_ROLE",
      "EcsParameters": {
        "TaskDefinitionArn": "arn:aws:ecs:ap-northeast-1:XXXXXXXXXXXX:task-definition/TestHelloWorldTask",
        "TaskCount": 1,
        "LaunchType": "FARGATE",
        "NetworkConfiguration": {
          "awsvpcConfiguration": {
            "Subnets": [
              "subnet-0a123456789az123"
            ],
            "SecurityGroups": [
              "sg-0d00abcd1e1f1g999",
              "sg-0123x4x999x9999xy"
            ],
            "AssignPublicIp": "ENABLED"
          }
        }
      }
    }
  ]
}

上記も例によってkeyに対するvalueを適当な値に置き換えています。
作成したJSONを使用して、ルールにターゲットを割り当てます。

aws events put-targets --cli-input-json file://$TARGET_DEFINITION_FILE

ここまで、アクティビティ、ステートマシン、イベントルール、ターゲットなど複数の要素が出てきて複雑化してしまっていますが、
ポーリングなどに関する実行イメージは クラスメソッドさんのこの記事が分かりやすいと思います。

6-3. アクティビティステートマシン用ルールの作成

Fargateタスクに関する設定は概ね完了したので、Fargateタスクがポーリングするアクティビティ用ステートマシンに関するスケジュールを登録していきます。

アクティビティステートマシンを起動するためのEventを作成
aws events put-rule --name $RULE_NAME --schedule-expression $SCHEDULE --role-arn "arn:aws:iam::XXXXXXXXXXXX:role/MY_ROLE"

6-4. アクティビティステートマシン用ターゲットの作成と割り当て

イベントの作成ができたので、最後に該当のイベントにターゲットとしてアクティビティステートマシンを登録します。
これにより、スケジューリングしたタイミングに、アクティビティステートマシンをアプリケーションからのポーリングを待つ状態で起動させることができます。

aws events put-targets --rule vImportSqlServerDataByFargate \
    --targets "Id"="vImportHoshoDataByFargate"\
    ,"Arn"="arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:stateMachine:$STATE_MACHINE_NAME","RoleArn"="arn:aws:iam::XXXXXXXXXXXX:role/MY_ROLE"

以上でセットアップは全て完了です。

  • それぞれのルールが定義したタイミングで起動している
  • アプリケーションがステートマシンをポーリングできている
  • ステートマシンが応答を返し、アプリケーションが実行できている

といった点を確認できればハッピーエンドです :clap:

最後に

本記事は筆者の実行の履歴ベースで執筆しました。
「◯◯を使えばもっと楽だよ」「◯◯は要らないのでは」などご指摘あればお願いします。

長くなりましたが、最後までご覧いただきありがとうございました。m(_ _)m

補足

1. ロググループの作成

手動実行による起動時、タスクに定義されたロググループが作成されていない場合、手動であらかじめ作成しておく必要があります。
定義されたロググループが存在しない場合、aws ecs run-taskコマンドは失敗するようになっているので注意。

aws logs create-log-group --log-group-name $LOG_GROUP_NAME
Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away