1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS CDKでSQSのメッセージ数に応じたFargateタスクのスケーリングを行う

Last updated at Posted at 2021-02-16

はじめに

定期的にAPIからデータを取得するシステムがあり、これを分散処理するにあたってSQSを利用しているが、
SQSのメッセージ数が多いほど処理するworkerを増やして、処理速度を上げたいということでいろいろ試行錯誤した結果です。
需要あるか知らないですが、一応共有です。

AWS CDKとは

AWS Cloud Development Kitのことで、インフラをコードで管理するIaC(Infrastructure as Code)のひとつ。
言語はTypeScript, JavaScript, Python, Java, C#が対応してるらしいです。(今のところTypeScriptが主流?JavaScript, Java, C#で書いてる人見たことない)
ちょっと気になってたので使ってみた。

CDKプロジェクト作るンゴ

プロジェクトルートで下記を実行
CDKプロジェクトができます
僕がTypeScript書けない芸人なので、今回はPythonで書きます。

$ cdk init --language python

コードご開帳

cdk/test_stack.py

from aws_cdk import (
    core,
    aws_sqs as sqs,
    aws_ecs as ecs,
    aws_ecs_patterns as ecs_patterns,
    aws_ec2 as ec2,
    aws_iam as iam,
    aws_logs as log,
    aws_kms as kms,
    aws_autoscaling as autoscaling,
)

class TestStack(core.Stack):

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        super().__init__(scope, id, **kwargs)

        # VPCを指定する
        vpc = ec2.Vpc.from_lookup(self, 'vpcId', vpc_id="vpc-XXXXXXX")

        # Security Groupの作成
        security_group = ec2.SecurityGroup(self, "SecurityGroup",
            vpc=vpc,
            security_group_name="SecurityGroup",
            description="test",
            allow_all_outbound=True,
        )
        security_group.add_ingress_rule(
            ec2.Peer.any_ipv4(),
            ec2.Port.tcp(80),
            "allow full accesss to http",
        )

        # カスタムKMSを取得する
        my_key = kms.Key.from_key_arn(self, "Key", key_arn="arn:aws:kms:ap-northeast-1:XXXXXXXXXXXX:key/...")

        # ECS IAMロールを取得する
        ecs_role = iam.Role.from_role_arn(self, "Role", role_arn="arn:aws:iam::XXXXXXXXXXXX:role/ecsTaskExecutionRole",
            mutable=False,
        )

        # ECSクラスターを作成する
        cluster = ecs.Cluster(self, "Cluster",
            vpc=vpc,
            cluster_name="ClusterName",
        )

        # Fargateタスク定義を生成
        task_definiton = aws_ecs.FargateTaskDefinition(self, "TaskDefinition",
            family="TaskDefinition",
            memory_limit_mib=512, # メモリ
            cpu=256, # vCPU
            task_role=ecs_role, # 実行ロール
        )

        # ロググループを指定する
        logger = log.LogGroup.from_log_group_name(self, "LogGroup", log_group_name="LogGroup")

        # コンテナ定義を追加
        repository = task_definiton.add_container(
            "container_name", # コンテナ名
            image=ecs.ContainerImage.from_ecr_repository(repository), # ECRイメージの取得
            logging=ecs.LogDrivers.aws_logs( # CloudWatchログの設定
                log_group=logger,
                stream_prefix="fargate"
            ),
        )

        # ポートマッピング 80 -> 80
        repository.add_port_mappings(ecs.PortMapping(container_port=80,host_port=80))

        # ALB + ロードバランサーの設定
        alb_service = ecs_patterns.ApplicationLoadBalancedFargateService(
            self,
            "FargateService",
            service_name="FargateService",
            cluster=cluster,
            task_definition=task_definiton,
            cpu=256,
            desired_count=1, # 希望するタスク数
            memory_limit_mib=512,
            public_load_balancer=True,
            security_groups=[security_group],
        )

        # キューの生成
        queue = sqs.Queue(self, "Queue",
            queue_name="Queue.fifo",
            fifo=True, # FIFO
            encryption=sqs.QueueEncryption.KMS, # KMSによる暗号化
            encryption_master_key=my_key, # 暗号化キー
            content_based_deduplication=True # 重複コンテンツの排除
        )

        # 1分間の平均値に対してメトリクスを設定
        metric = queue.metric_approximate_number_of_messages_visible(
            period=core.Duration.minutes(1),
            statistic="Average"
        )

        # オートスケールの最小、最大の台数を指定
        scaling = alb_service.service.auto_scale_task_count(
            min_capacity=1,
            max_capacity=6
        )

        # メトリクスに対してスケーリングを設定
        scaling_policy = scaling.scale_on_metric( 
            "ScaleOnSqs",
            adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY,
            metric=metric,
            scaling_steps=[ # キュー量に対するスケーリング台数の設定
                { 'upper': 100, 'change': +1 }, # 100を超えるとmin_capacity+1
                { 'upper': 200, 'change': +2 }, # 200を超えるとmin_capacity+2
                { 'upper': 300, 'change': +3 }, # 300を超えるとmin_capacity+3
                { 'upper': 400, 'change': +4 }, # 400を超えるとmin_capacity+4
                { 'upper': 500, 'change': +5 }, # 500を超えるとmin_capacity+5
            ]
        )
app.py
#!/usr/bin/env python3
import os
from aws_cdk import core

from cdk.test_stack import TestStack

app = core.App()
WorkerStack(app, "TestStack", 
    env={
        'account': os.environ['CDK_DEFAULT_ACCOUNT'], 
        'region': os.environ['CDK_DEFAULT_REGION'],
    },
)

app.synth()

CFnテンプレートの確認

$ cdk synth

デプロイ

$ cdk deploy

CDK便利ですね。
手動でポチポチしてインフラ作るのめんどくさくなってきました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?