AWSでバッチを構築する際、運用負荷を考えるとサーバレスを検討したくなると思います。
さらに、リソースの無駄をなくすためにイベントドリブンなアーキテクチャにもしたいですよね。
AWSでサーバレスでイベントドリブンなバッチのアーキテクチャとして有名なのが、S3をイベントトリガとしたLambda実行だと思います。
(前提として、ファイルのアップロードを処理実行のトリガとします。)
ただ、Lambdaには様々な制限があり、Lambdaでは実現できないケースも多々あるかと思います。
今回はそのようなケースにFargateを用いてサーバレスでイベントドリブンなバッチを構築していきたいと思います。
構築はTerraformでガッと構築したいと思います。
コードはこちら
アーキテクチャ
アーキテクチャは以下のようになります。
Fargateをタスク実行のためのトリガとしては、Cloud Watch Eventsもあるのですが、その場合にはタスク定義のコンテナの上書きが指定することができず、S3トリガからのイベント情報などを扱うことができません。
そこで、今回はFargateのタスク実行はLambdaからtask_runを実行することにします。
S3-Lambda
S3をトリガとしたLambdaの実行は以下のように作成することができます。
まずはLambda関数の作成です。実行ポリシーも一緒に作っちゃいます。
resource "aws_iam_role" "default" {
name = var.service_name
description = "IAM Rolw for ${var.run_task}"
assume_role_policy = file("${var.run_task}-role.json")
}
resource "aws_iam_policy" "default" {
name = var.service_name
description = "IAM Policy for ${var.run_task}"
policy = file("${var.run_task}-policy.json")
}
resource "aws_iam_role_policy_attachment" "default" {
role = aws_iam_role.default.name
policy_arn = aws_iam_policy.default.arn
}
resource "aws_cloudwatch_log_group" "default" {
name = "/aws/lambda/${var.run_task}"
retention_in_days = 7
}
data archive_file "default" {
type = "zip"
source_dir = "../${var.run_task}"
output_path = "${var.run_task}.zip"
}
resource "aws_lambda_function" "default" {
filename = "${var.run_task}.zip"
function_name = var.run_task
role = aws_iam_role.default.arn
handler = "main.main"
source_code_hash = data.archive_file.default.output_base64sha256
runtime = "python3.7"
environment {
variables = {
CLUSTER_NAME = var.service_name
SUBNET_ID = var.subnet_id
SECURITY_GROUP_ID = var.security_group_id
TASK_DEFINITION = aws_ecs_task_definition.default.revision
}
}
}
S3とLambdaの連携は以下の通り
resource "aws_s3_bucket" "default" {
bucket = var.run_task
}
resource "aws_s3_bucket_notification" "default" {
bucket = aws_s3_bucket.default.id
lambda_function {
lambda_function_arn = aws_lambda_function.default.arn
events = ["s3:ObjectCreated:*"]
}
}
resource "aws_lambda_permission" "default" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.default.arn
principal = "s3.amazonaws.com"
source_arn = aws_s3_bucket.default.arn
}
Fargate
ここからはFargateの作成です。
resource "aws_cloudwatch_log_group" "fargate" {
name = "/ecs/${var.service_name}"
}
resource "aws_ecs_cluster" "fargate" {
name = var.service_name
}
resource "aws_iam_role" "fargate" {
name = "fargate-${var.service_name}"
description = "IAM Rolw for ${var.service_name}"
assume_role_policy = file("${var.service_name}-role.json")
}
resource "aws_iam_policy" "fargate" {
name = "fargate-${var.service_name}"
description = "IAM Policy for ${var.service_name}"
policy = file("${var.service_name}-policy.json")
}
resource "aws_iam_role_policy_attachment" "fargate" {
role = aws_iam_role.fargate.name
policy_arn = aws_iam_policy.fargate.arn
}
resource "aws_ecs_task_definition" "default" {
family = var.service_name
container_definitions = <<EOF
[
{
"name": "${var.service_name}",
"image": "${var.ecr_image_arn}",
"essential": true,
"memory": 256
}
]
EOF
network_mode = "awsvpc"
execution_role_arn = aws_iam_role.fargate.arn
cpu = 1024
memory = 2048
requires_compatibilities = ["FARGATE"]
}
重要なのはタスク定義の作成で、Fargateを利用する場合はnetwork_mode
をawsvpcに指定する必要があり、
requires_compatibilities
でFargateの互換性を定義する必要があります。
また、その際にはcpu
とmemory
の指定も必須になるので指定するようにお願いします。
このようにしてインフラ構成が完成しました。
task_run
最後に、Fargateのタスク実行のためのスクリプトです。
Fargateのコンテナは docker run イメージ名 python main.py hoge
のようにして、第一引数をFargateのコンテナ内で扱えるようにしてあります。
import os
import boto3
def main(event, _):
"""
hoge
"""
filename = event['Records'][0]['s3']['bucket']['name']
client = boto3.client('ecs')
response = client.run_task(
cluster=os.environ['CLUSTER_NAME'],
count=1,
launchType='FARGATE',
networkConfiguration={
'awsvpcConfiguration': {
'subnets': [
os.environ['SUBNET_ID']
],
'securityGroups': [
os.environ['SECURITY_GROUP_ID']
],
'assignPublicIp': 'ENABLED'
}
},
overrides={
'containerOverrides': [
{
'name': 'hogehoge',
'command': [
'python',
'main.py',
'ほげ{}'.format(filename)
]
},
]
},
taskDefinition=os.environ['TASK_DEFINITION']
)
print(response)
このようにすると、S3からのイベント情報をFargateに持っていくことができ、サーバレスでイベントドリブンなアーキテクチャが完成します。