3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWSでサーバレス・イベントドリブンなバッチ

Last updated at Posted at 2019-12-20

AWSでバッチを構築する際、運用負荷を考えるとサーバレスを検討したくなると思います。
さらに、リソースの無駄をなくすためにイベントドリブンなアーキテクチャにもしたいですよね。
AWSでサーバレスでイベントドリブンなバッチのアーキテクチャとして有名なのが、S3をイベントトリガとしたLambda実行だと思います。
(前提として、ファイルのアップロードを処理実行のトリガとします。)

basic.jpg
ただ、Lambdaには様々な制限があり、Lambdaでは実現できないケースも多々あるかと思います。

今回はそのようなケースにFargateを用いてサーバレスでイベントドリブンなバッチを構築していきたいと思います。
構築はTerraformでガッと構築したいと思います。
コードはこちら

アーキテクチャ

アーキテクチャは以下のようになります。
fargate.jpg
Fargateをタスク実行のためのトリガとしては、Cloud Watch Eventsもあるのですが、その場合にはタスク定義のコンテナの上書きが指定することができず、S3トリガからのイベント情報などを扱うことができません。
そこで、今回はFargateのタスク実行はLambdaからtask_runを実行することにします。

S3-Lambda

S3をトリガとしたLambdaの実行は以下のように作成することができます。

まずはLambda関数の作成です。実行ポリシーも一緒に作っちゃいます。

resource "aws_iam_role" "default" {
  name               = var.service_name
  description        = "IAM Rolw for ${var.run_task}"
  assume_role_policy = file("${var.run_task}-role.json")
}

resource "aws_iam_policy" "default" {
  name        = var.service_name
  description = "IAM Policy for ${var.run_task}"
  policy      = file("${var.run_task}-policy.json")
}

resource "aws_iam_role_policy_attachment" "default" {
  role       = aws_iam_role.default.name
  policy_arn = aws_iam_policy.default.arn
}

resource "aws_cloudwatch_log_group" "default" {
  name              = "/aws/lambda/${var.run_task}"
  retention_in_days = 7
}

data archive_file "default" {
  type        = "zip"
  source_dir  = "../${var.run_task}"
  output_path = "${var.run_task}.zip"
}

resource "aws_lambda_function" "default" {
  filename         = "${var.run_task}.zip"
  function_name    = var.run_task
  role             = aws_iam_role.default.arn
  handler          = "main.main"
  source_code_hash = data.archive_file.default.output_base64sha256
  runtime          = "python3.7"
  environment {
    variables = {
      CLUSTER_NAME      = var.service_name
      SUBNET_ID         = var.subnet_id
      SECURITY_GROUP_ID = var.security_group_id
      TASK_DEFINITION   = aws_ecs_task_definition.default.revision
    }
  }
}

S3とLambdaの連携は以下の通り

resource "aws_s3_bucket" "default" {
  bucket = var.run_task
}

resource "aws_s3_bucket_notification" "default" {
  bucket = aws_s3_bucket.default.id

  lambda_function {
    lambda_function_arn = aws_lambda_function.default.arn
    events              = ["s3:ObjectCreated:*"]
  }
}

resource "aws_lambda_permission" "default" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.default.arn
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.default.arn
}

Fargate

ここからはFargateの作成です。

resource "aws_cloudwatch_log_group" "fargate" {
  name = "/ecs/${var.service_name}"
}

resource "aws_ecs_cluster" "fargate" {
  name = var.service_name
}

resource "aws_iam_role" "fargate" {
  name               = "fargate-${var.service_name}"
  description        = "IAM Rolw for ${var.service_name}"
  assume_role_policy = file("${var.service_name}-role.json")
}

resource "aws_iam_policy" "fargate" {
  name        = "fargate-${var.service_name}"
  description = "IAM Policy for ${var.service_name}"
  policy      = file("${var.service_name}-policy.json")
}

resource "aws_iam_role_policy_attachment" "fargate" {
  role       = aws_iam_role.fargate.name
  policy_arn = aws_iam_policy.fargate.arn
}

resource "aws_ecs_task_definition" "default" {
  family                = var.service_name
  container_definitions = <<EOF
[
  {
    "name": "${var.service_name}",
    "image": "${var.ecr_image_arn}",
    "essential": true,
    "memory": 256
  }
]
EOF
  network_mode = "awsvpc"
  execution_role_arn = aws_iam_role.fargate.arn
  cpu = 1024
  memory = 2048
  requires_compatibilities = ["FARGATE"]
}

重要なのはタスク定義の作成で、Fargateを利用する場合はnetwork_modeをawsvpcに指定する必要があり、
requires_compatibilitiesでFargateの互換性を定義する必要があります。
また、その際にはcpumemoryの指定も必須になるので指定するようにお願いします。

このようにしてインフラ構成が完成しました。

task_run

最後に、Fargateのタスク実行のためのスクリプトです。
Fargateのコンテナは docker run イメージ名 python main.py hogeのようにして、第一引数をFargateのコンテナ内で扱えるようにしてあります。

import os
import boto3


def main(event, _):
    """
    hoge
    """
    filename = event['Records'][0]['s3']['bucket']['name']
    client = boto3.client('ecs')
    response = client.run_task(
        cluster=os.environ['CLUSTER_NAME'],
        count=1,
        launchType='FARGATE',
        networkConfiguration={
            'awsvpcConfiguration': {
                'subnets': [
                    os.environ['SUBNET_ID']
                ],
                'securityGroups': [
                    os.environ['SECURITY_GROUP_ID']
                ],
                'assignPublicIp': 'ENABLED'
            }
        },
        overrides={
            'containerOverrides': [
                {
                    'name': 'hogehoge',
                    'command': [
                        'python',
                        'main.py',
                        'ほげ{}'.format(filename)
                    ]
                },
            ]
        },
        taskDefinition=os.environ['TASK_DEFINITION']
    )
    print(response)

このようにすると、S3からのイベント情報をFargateに持っていくことができ、サーバレスでイベントドリブンなアーキテクチャが完成します。

参考

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?