はじめに
定期的にAPIからデータを取得するシステムがあり、これを分散処理するにあたってSQSを利用しているが、
SQSのメッセージ数が多いほど処理するworkerを増やして、処理速度を上げたいということでいろいろ試行錯誤した結果です。
需要あるか知らないですが、一応共有です。
AWS CDKとは
AWS Cloud Development Kitのことで、インフラをコードで管理するIaC(Infrastructure as Code)のひとつ。
言語はTypeScript, JavaScript, Python, Java, C#が対応してるらしいです。(今のところTypeScriptが主流?JavaScript, Java, C#で書いてる人見たことない)
ちょっと気になってたので使ってみた。
CDKプロジェクト作るンゴ
プロジェクトルートで下記を実行
CDKプロジェクトができます
僕がTypeScript書けない芸人なので、今回はPythonで書きます。
$ cdk init --language python
コードご開帳
cdk/test_stack.py
from aws_cdk import (
core,
aws_sqs as sqs,
aws_ecs as ecs,
aws_ecs_patterns as ecs_patterns,
aws_ec2 as ec2,
aws_iam as iam,
aws_logs as log,
aws_kms as kms,
aws_autoscaling as autoscaling,
)
class TestStack(core.Stack):
def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
super().__init__(scope, id, **kwargs)
# VPCを指定する
vpc = ec2.Vpc.from_lookup(self, 'vpcId', vpc_id="vpc-XXXXXXX")
# Security Groupの作成
security_group = ec2.SecurityGroup(self, "SecurityGroup",
vpc=vpc,
security_group_name="SecurityGroup",
description="test",
allow_all_outbound=True,
)
security_group.add_ingress_rule(
ec2.Peer.any_ipv4(),
ec2.Port.tcp(80),
"allow full accesss to http",
)
# カスタムKMSを取得する
my_key = kms.Key.from_key_arn(self, "Key", key_arn="arn:aws:kms:ap-northeast-1:XXXXXXXXXXXX:key/...")
# ECS IAMロールを取得する
ecs_role = iam.Role.from_role_arn(self, "Role", role_arn="arn:aws:iam::XXXXXXXXXXXX:role/ecsTaskExecutionRole",
mutable=False,
)
# ECSクラスターを作成する
cluster = ecs.Cluster(self, "Cluster",
vpc=vpc,
cluster_name="ClusterName",
)
# Fargateタスク定義を生成
task_definiton = aws_ecs.FargateTaskDefinition(self, "TaskDefinition",
family="TaskDefinition",
memory_limit_mib=512, # メモリ
cpu=256, # vCPU
task_role=ecs_role, # 実行ロール
)
# ロググループを指定する
logger = log.LogGroup.from_log_group_name(self, "LogGroup", log_group_name="LogGroup")
# コンテナ定義を追加
repository = task_definiton.add_container(
"container_name", # コンテナ名
image=ecs.ContainerImage.from_ecr_repository(repository), # ECRイメージの取得
logging=ecs.LogDrivers.aws_logs( # CloudWatchログの設定
log_group=logger,
stream_prefix="fargate"
),
)
# ポートマッピング 80 -> 80
repository.add_port_mappings(ecs.PortMapping(container_port=80,host_port=80))
# ALB + ロードバランサーの設定
alb_service = ecs_patterns.ApplicationLoadBalancedFargateService(
self,
"FargateService",
service_name="FargateService",
cluster=cluster,
task_definition=task_definiton,
cpu=256,
desired_count=1, # 希望するタスク数
memory_limit_mib=512,
public_load_balancer=True,
security_groups=[security_group],
)
# キューの生成
queue = sqs.Queue(self, "Queue",
queue_name="Queue.fifo",
fifo=True, # FIFO
encryption=sqs.QueueEncryption.KMS, # KMSによる暗号化
encryption_master_key=my_key, # 暗号化キー
content_based_deduplication=True # 重複コンテンツの排除
)
# 1分間の平均値に対してメトリクスを設定
metric = queue.metric_approximate_number_of_messages_visible(
period=core.Duration.minutes(1),
statistic="Average"
)
# オートスケールの最小、最大の台数を指定
scaling = alb_service.service.auto_scale_task_count(
min_capacity=1,
max_capacity=6
)
# メトリクスに対してスケーリングを設定
scaling_policy = scaling.scale_on_metric(
"ScaleOnSqs",
adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY,
metric=metric,
scaling_steps=[ # キュー量に対するスケーリング台数の設定
{ 'upper': 100, 'change': +1 }, # 100を超えるとmin_capacity+1
{ 'upper': 200, 'change': +2 }, # 200を超えるとmin_capacity+2
{ 'upper': 300, 'change': +3 }, # 300を超えるとmin_capacity+3
{ 'upper': 400, 'change': +4 }, # 400を超えるとmin_capacity+4
{ 'upper': 500, 'change': +5 }, # 500を超えるとmin_capacity+5
]
)
app.py
#!/usr/bin/env python3
import os
from aws_cdk import core
from cdk.test_stack import TestStack
app = core.App()
WorkerStack(app, "TestStack",
env={
'account': os.environ['CDK_DEFAULT_ACCOUNT'],
'region': os.environ['CDK_DEFAULT_REGION'],
},
)
app.synth()
CFnテンプレートの確認
$ cdk synth
デプロイ
$ cdk deploy
CDK便利ですね。
手動でポチポチしてインフラ作るのめんどくさくなってきました。