はじめに
この記事はミロゴス Advent Calendar 2023 8日目の投稿です。
弊社ではデータをDynamoDBにストアすることが多く、そのデータを活用するにあたって、CSVファイルをS3に置くためのLambdaを実装、定期的に実行しています。
前回、この実装や実行をStep Functions(以下、SFn)を利用することで、Lambdaを使わずに出力する方法を投稿しました。
今回は前回の構成をベースにCDKを作成します。なお、Pythonでの実装となります。
全体構成
先に作成する構成をCDKにしたPythonのプログラムを記載します。長いので折りたたんでいます。
from enum import Enum
from aws_cdk import Duration
from aws_cdk import aws_cloudwatch as cw
from aws_cdk import aws_dynamodb as ddb
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from aws_cdk import aws_scheduler as scheduler
from aws_cdk.aws_stepfunctions import DefinitionBody, StateMachine
from constructs import Construct
from cdk_context import IContext
class ExportType(Enum):
FULL = "FULL_EXPORT"
class ExportFormat(Enum):
DDB_JSON = "DYNAMODB_JSON"
ION = "ION"
class ExportConstruct(Construct):
def __init__(
self,
scope: Construct,
construct_id: str,
contexts: IContext,
table: ddb.Table,
bucket: s3.Bucket,
export_format: ExportFormat = ExportFormat.DDB_JSON,
export_type: ExportType = ExportType.FULL,
**kwargs,
) -> None:
"""DDBテーブルをS3エクスポート機能を使って、指定のバケットに出力する構成を構築するコンストラクト
Args:
scope (Construct): Parent of this stack, usually an ``App`` or a ``Stage``, but could be any construct.
construct_id (str): The construct ID of this stack.
contexts (IContext): 設定値用の基底クラス
table (ddb.Table): エクスポートするDDBテーブル
bucket (s3.Bucket): エクスポート先のバケット
export_format (ExportFormat, optional): エクスポートのフォーマット Defaults to ExportFormat.DDB_JSON.
export_type (ExportType, optional): エクスポートタイプ. Defaults to ExportType.FULL.
"""
super().__init__(scope, construct_id, **kwargs)
self.contexts = contexts
statemachine = StateMachine(
self,
"statemachine",
state_machine_name="export-statemachine",
definition_body=DefinitionBody.from_file("asl_full_export.json"),
definition_substitutions={
"TableArn": table.table_arn,
"BucketName": bucket.bucket_name,
"ExportFormat": export_format.value,
"ExportType": export_type.value,
},
tracing_enabled=True,
)
table.grant_read_data(statemachine)
table.grant(statemachine, "dynamodb:ExportTableToPointInTime")
statemachine.role.add_to_principal_policy(
statement=iam.PolicyStatement(
actions=["dynamodb:DescribeExport"],
resources=[f"{table.table_arn}/export/*"],
effect=iam.Effect.ALLOW,
),
)
bucket.grant_read_write(statemachine)
statemachine.role.add_to_principal_policy(
statement=iam.PolicyStatement(
actions=["s3:AbortMultipartUpload"],
resources=[bucket.arn_for_objects("*")],
effect=iam.Effect.ALLOW,
),
)
self.statemachine = statemachine
self.alarms: list[cw.Alarm] = self._create_alarm()
self.set_cron_expression(unique_id="origin", schedule_expression=contexts.trigger_schedule_cron)
def _create_alarm(self) -> list[cw.Alarm]:
failed_alarm = self.statemachine.metric_failed(period=Duration.minutes(1), statistic="sum").create_alarm(
self,
"statemachine-failed-alarm",
alarm_name="export-statemachine-failed-alarm",
threshold=1,
evaluation_periods=1,
)
throttle_alarm = self.statemachine.metric_throttled(period=Duration.minutes(1), statistic="sum").create_alarm(
self,
"statemachine-throttle-alarm",
alarm_name="export-statemachine-throttle-alarm",
threshold=1,
evaluation_periods=1,
)
timeout_alarm = self.statemachine.metric_timed_out(period=Duration.minutes(1), statistic="sum").create_alarm(
self,
"statemachine-timeout-alarm",
alarm_name="export-statemachine-timeout-alarm",
threshold=1,
evaluation_periods=1,
)
return [failed_alarm, throttle_alarm, timeout_alarm]
def set_cron_expression(self, unique_id: str, *, schedule_expression: str = "cron(0 0 1 1 ? 2024)") -> None:
"""スケジューラーを設定する
Args:
unique_id (str): 一意にするためのID
schedule_expression (str, optional): 実行時間. Defaults to "cron(0 0 1 1 ? 2024)".
"""
export_role = iam.Role(
self,
f"{unique_id}-role",
assumed_by=iam.ServicePrincipal("scheduler.amazonaws.com"),
inline_policies={
"execute-export-statemachine": iam.PolicyDocument(
statements=[
iam.PolicyStatement(
actions=["states:StartExecution"],
resources=[self.statemachine.state_machine_arn],
effect=iam.Effect.ALLOW,
),
],
),
},
)
scheduler.CfnSchedule(
self,
f"{unique_id}-scheduler",
name=f"export-{unique_id}-schedule",
flexible_time_window={"mode": "OFF"},
schedule_expression=schedule_expression,
target=scheduler.CfnSchedule.TargetProperty(
arn=self.statemachine.state_machine_arn,
role_arn=export_role.role_arn,
),
schedule_expression_timezone="Asia/Tokyo",
state="DISABLED",
)
各リソースについて
ExportConstructクラスについて
SFnを使ってDDBからS3に出力する構成は今後も何度か作りそうなので再利用性を上げるためクラス化しています。
独自にリソースを定義していく手もありますが、constructs.Constructを継承することでカスタムコンストラクトを作成する手もあります。
カスタムコンストラクトをプライベートパッケージリポジトリにPushしておけば、pip installで呼び出すこともできます。
なお、この記事ではパッケージリポジトリに関しては記載しません。(GitHub Packagesよ、PyPIをどうか... なお、Close済み)
ExportConstructクラスの初期化は下記のように宣言しています。
初期化時の引数には判断しやすいように、ExportTypeとExportFormatをEnumの形で定義しています。
from aws_cdk import aws_dynamodb as ddb
from aws_cdk import aws_s3 as s3
from export_construct import ExportConstruct, ExportType
ExportConstruct(
self,
"csv-exporter",
contexts=contexts,
table=ddb.Table(
self,
"table",
partition_key=ddb.Attribute(name="hoge", type=ddb.AttributeType.STRING),
billing_mode=ddb.BillingMode.PAY_PER_REQUEST,
point_in_time_recovery=True,
),
bucket=s3.Bucket(self, "bucket", bucket_name="hogeBucket"),
export_format=ExportFormat.DDB_JSON
export_type=ExportType.FULL,
)
IContextについて
ExportConstructクラスのcontexts引数にIContext型をタイプヒンティングに設定しています。
contextsは環境ごとの設定パラメータを持たせています。以前はcdk.jsonやcdk.context.jsonに持たせていました。
パラメータの持たせ方はAWSJのBLEA開発チームの方法を参考にしています。
https://speakerdeck.com/konokenj/blea-cdk-dev-practice-2023?slide=31
特にPythonは動的型付けなので環境パラメータの型を間違えて設定してしまい、エラーになることが時々ありました。環境パラメータをクラス変数とし、タイプヒンティングを使うことで以前のようなミスは減りました。
親クラスのIContextでパラメータを宣言し、環境別の子クラスに実際のパラメータを持たせています。
from dataclasses import dataclass, field
@dataclass
class IContext:
trigger_schedule_cron: str
@dataclass
class DevContext(IContext):
trigger_schedule_cron: str = "cron(0 7 1 1 ? 2024)"
@dataclass
class PrdContext(IContext):
trigger_schedule_cron: str = "cron(0 7 1 1 ? *)"
Step Functionsの定義
下記でSFnを定義しています。ASLのファイル(asl_full_export.json)からStatemachineを定義し、definition_substitutionsにCDKで作成するリソースなどの情報を渡しています。
statemachine = StateMachine(
self,
"statemachine",
state_machine_name="export-statemachine",
definition_body=DefinitionBody.from_file("asl_full_export.json"),
definition_substitutions={
"TableArn": table.table_arn,
"BucketName": bucket.bucket_name,
"ExportFormat": export_format.value,
"ExportType": export_type.value,
},
tracing_enabled=True,
)
asl_full_export.jsonは前回のASLと同じものですが、下記のParametersの箇所のみ変更し、CDKから値を受け取れるようにしています。
"ExportTableToPointInTime": {
"Parameters": {
"S3Bucket": "${BucketName}",
"TableArn": "${TableArn}",
"ExportFormat": "${ExportFormat}",
"ExportType": "${ExportType}"
},
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:exportTableToPointInTime",
"Next": "Wait Export Task"
},
5日目の記事でStatemachineを定義するのにあたって、SFn Workflow Studioを使った簡単なワークフローの作成方法を記事にしてくれています。
参考にどうぞ。
IAM Role
このカスタムコンストラクトの構成を動作させるためのIAM Roleを定義しています。
前記事ではSFnにRoleを設定したり、CLIを叩くため実行ユーザのRoleなどに権限が分散していましたが、カスタムコンストラクト化にあたってクラス内で定義しています。
# DDBのS3エクスポートに対する、SFnへの権限付与
table.grant_read_data(statemachine)
table.grant(statemachine, "dynamodb:ExportTableToPointInTime")
statemachine.role.add_to_principal_policy(
statement=iam.PolicyStatement(
actions=["dynamodb:DescribeExport"],
resources=[f"{table.table_arn}/export/*"],
effect=iam.Effect.ALLOW,
),
)
# S3に対する、SFnへの権限付与
bucket.grant_read_write(statemachine)
statemachine.role.add_to_principal_policy(
statement=iam.PolicyStatement(
actions=["s3:AbortMultipartUpload"],
resources=[bucket.arn_for_objects("*")],
effect=iam.Effect.ALLOW,
),
)
アラーム
ExportConstructクラス内でSFnのアラームを作成します。そのアラームをクラス変数として宣言しています。インスタンス変数からアクセスできるようにすることで、関連するリソースの定義をExportConstructクラス外でも任意に定義できるようになり利便性が上がります。
self.alarms: list[cw.Alarm] = self._create_alarm()
def _create_alarm(self) -> list[cw.Alarm]:
failed_alarm = self.statemachine.metric_failed(period=Duration.minutes(1), statistic="sum").create_alarm(
self,
"statemachine-failed-alarm",
alarm_name="export-statemachine-failed-alarm",
threshold=1,
evaluation_periods=1,
)
...
return [failed_alarm, throttle_alarm, timeout_alarm]
ここではExportConstructのクラス変数alarmsをSNSのTopicに渡すことでアラート発報の定義をしています。
self.exporter = ExportConstruct(...)
for alarm in self.exporter.alarms:
alarm.add_alarm_action(
cw_actions.SnsAction(
topic = sns.Topic(self, "alarm-topic")
)
)
定期実行
定期的にデータを出力するため、EventBridge Scheduler(以下、EvB Scheduler)がSFnを起動します。
cdk 2.113.0(2023/12/05最新版)ではEvB SchedulerにL2コンストラクトがなく、Cfn*のL1コンストラクトしかありません。
https://docs.aws.amazon.com/cdk/api/v2/python/aws_cdk.aws_scheduler.html
また、カスタムコンストラクト実装時は2.102.0を使っていました。
aws_cdk.aws_scheduler_alphaを使うことで、ScheduleExpressionの書き方がcron式そのままではなく、それぞれの値を引数で渡せるためCron式を意識せずに設定できるメリットがあります。
しかし、aws_scheduler_alpha.targetの型はaws_cdk.aws_scheduler_targets_alphaの子クラスである必要がありました。
2.102.0時点のaws_cdk.aws_scheduler_targets_alphaはLambdaInvokeしかなかったため、ここではL1のCfnScheduleを使用しています。
def set_cron_expression(self, unique_id: str, *, schedule_expression: str = "cron(0 0 1 1 ? 2023)") -> None:
export_role = iam.Role(
self,
f"{unique_id}-role",
assumed_by=iam.ServicePrincipal("scheduler.amazonaws.com"),
inline_policies={
"execute-export-statemachine": iam.PolicyDocument(
statements=[
iam.PolicyStatement(
actions=["states:StartExecution"],
resources=[self.statemachine.state_machine_arn],
effect=iam.Effect.ALLOW,
),
],
),
},
)
scheduler.CfnSchedule(
self,
f"{unique_id}-scheduler",
name=f"{unique_id}-schedule",
flexible_time_window={"mode": "OFF"},
schedule_expression=schedule_expression,
target=scheduler.CfnSchedule.TargetProperty(
arn=self.statemachine.state_machine_arn,
role_arn=export_role.role_arn,
),
schedule_expression_timezone="Asia/Tokyo",
state="DISABLED",
)
なお、2.113.0ではLambdaInvoke以外にも、今回の要件で利用できたStepFunctionsStartExecutionを始め以下の8つのTargetが追加されています。
- CodeBuildStartBuild
- EventBridgePutEvents
- InspectorStartAssessmentRun
- LambdaInvoke
- ScheduleTargetBase
- SnsPublish
- SqsSendMessage
- StepFunctionsStartExecution
まとめ
DDBからS3へ定期的にデータを出力するタスクをSFnで管理する構成をCDKで作成しました。
また、CDKでの定義にあたって再利用性を上げるために、カスタムコンストラクトとして実装しました。その実装の一例をご紹介しました。
LambdaでS3に出力していた時から考えると、実行するプログラムを書かずに済み、CDKでクラスを再利用するだけで構成を複製することができ、かなり便利になりました。
今年のre:Invent期間前にもSFnの新機能が色々と追加されており、どんどんLambdaの実装量が減っていきますね。