はじめに
エアークローゼットのエンジニアアインです。
この記事はエアークローゼット Advent Calendar 2023 24日目の記事になります。
各マイクロサービス連携する
マイクロサービス内に各サービス連携するのはよくあります。以下の方法を通じて連携できる:
- API calling: HTTPまたはRPCで実施する方法。
- イベント駆動: イベント発火で各サービス連携する方法。
本記事は「イベント駆動」方法を中心として書かせていただきます。
SAGAパターン
各サービスが独自のデータベースを持っているため、service-Aがservice-Bを呼ぶと、異なるデータベース上でトランザクションを実行する必要があります。注意すべきは、異なるデータベース上でACIDトランザクションを実行できません。
SAGAで複数データベース上でトランザクション実行できます。
具体的には
それぞれのローカルトランザクションは、それが属するデータベースを更新し、同時にメッセージを発行するかイベントを発火として、ビジネスロジックの次のサービスのローカルトランザクションを開始します。
2つ方法でSAGAが実施できます。
- Choreography: 各ローカルトランザクションはdomain-eventを発火し、このイベントは他のサービスでのローカルトランザクションをトリガーします(これらのサービスは自分自身で処理が必要かどうかを判断します)。
- Orchestration: 各「Orchestration」は、特定のローカルトランザクションが実行されるように指定します。
Choreography-based SAGA
例としては「ECサイト」に2つドメイン Order, Customerがあり
- Order: ユーザが注文するドメイン知識。
- Customer: ユーザ情報または注文以外挙動に関係あるドメイン知識。
Choreography-based SAGA実施するフローは↓の画像と同じです。
- Orderサービスは、
POST /orders
のリクエストを受け取り、その後、PENDINGの状態で注文を作成します。 -
Order Created
イベントを発火します。 - Customerサービスはこのイベントを受信し、
Credit Reserved
処理を実施します。 - その後、Customerサービスは
Credit Reserved
イベントを発火します。 - Orderサービスは
Credit Reserved
イベントを受信し、その後、注文の状態をCOMPLETEまたはREJECTに更新します。
Orchestration-based SAGA
- Orderサービスは
POST /orders
のリクエストを受け取り、Create OrderのSaga Orchestration
を作成します。 -
Saga orchestrator
は、PENDINGの状態でOrderを作成します。 - その後、Reserve CreditのコマンドをCustomerサービスに送信します。
- Customerサービスはユーザの注文限度額を予約します。
- その後、Customerサービスはメッセージを返し、Orderサービスの側でどのSagaが実行されるかを特定します。
-
Saga orchestrator
は注文を承認または拒否します。
Trigger Event
上記の2つの定義を通して、Sagaパターンの実装方法がどのようであれ、ここで常に強調されている要素があることがわかります。それは「イベント」です。
イベントが発火されると、すべてのものの「源泉」となります:
- 他のサービスでビジネスロジックを強制的に実行する行為
- 特定のビジネスロジックを別のサービスに「発信して」、そのサービスが自らそれを実行する行為。
連携方法設計
主題に戻ります。ECS Run Taskを使用してマイクロサービス連携を実施した方法はどのようなものでしょうか?
ざっくりに言うとECSとは
コンテナ化されたアプリケーションをデプロイ、管理、およびスケールすることができるorchestration-serviceです。
ECSアプリのライフサイクルは
- ECRは、アプリケーションに対応するDockerイメージを保存します。
- Task Definitionはアプリケーションの
blue-print
であり(アプリにとって必須なパラメータを定義するJSONファイル)。
ECSを使用すると、サーバーまたはアプリケーションを構築するだけでなく、特定の「タスク」も実行できます。
それはサーバーと異なり、サーバーは「永遠に」実行され、エンジニアからの影響がある場合にのみ終了されるという点で異なります。
「タスク」は、自分の「任務」が完了すると自動的に終了します。これは、常にバッチや特定のビジネスプロセスを「待機」させる代わりに、運用コストを大幅に削減できることを示しています。
使用するアーキテクチャ
説明するとマイクロサービスの各サービスに対して、それぞれ専用のAWSイベントバス(aws-event-bus)を作成しています。
aws-event-bus
はAWSのサービスで、クライアントが以下のツール
- aws-cli
- aws-sdk
を利用して、aws-event-busが受け取られるイベントを発火できます。
各イベントはそれぞれ独自の「特性」を持っており、この特性はAWSによって「ルール」と見なされ、それに伴う概念としてevent-bus
に関連するものが「event-rule」です。
各バスにはそれに関連付けられた1つまたは複数のルールがあります。サービスがイベントを発火する場合、サービスはこのイベントがどのルールに属するかを明示的に指定する必要があります。
以下は、aws-sdkを使用してイベントをevent-bus
に発火する例のコードです。
import {EventBridge} from "@aws-sdk/client-eventbridge";
const client = new EventBridge({region: "ap-southeast-1"});
client.putEvents({
EventBusName: "Service-A-Bus",
DetailType: "Service-A-Rule-1", // event-rule定義
Detail: {
// 添付するデータ
},
});
Implement
概要として、システム内の各マイクロサービスを連携する方法は、上記のように「イベントの発火」を通じて行われます。
しかし、これだけの一般的な概要では、動作メカニズムを具体的に理解するのが難しいと思って、このセクションでは、コーディングの詳細に深く入りたいと考えます。
event-bus && event-rules
ここでは、event-bus
とevent-rule
を実施するためにaws-cdkを使用しています(簡単に言うと、これはAWSが提供するInfrastructure As Codeのツールの一つです)。
- 1 event-bus - n event-rules
- 1 stack - n event-buses (ここでの「stack」は、aws-cdkによって定義されたリソースをデプロイするためのユニットです)
となっております。
ここでのstackの使用は、リソースをユニットごとにまとめるのに役立ちます。これにより、リソースの管理がより明確で「整然」となり、リソースを「フラット」な構造で管理するよりも効果的になります。
stackを利用すれば
stack1
→ resource[1-1]
→ resource[1-2]
stack2
→ resource[2-1]
→ resource[2-2]
フラット構築で
resource[1]
resource[2]
...
resource[n]
ぱってみるとすぐ「整然」感じをわかると思います。
stackに対して
import {Stack} from "aws-cdk-lib";
import {Construct} from "constructs";
class TestStack extends Stack {
constructor(scope: Construct, id: string) {
super(scope, id, {
env: {
account: "aws-account-id",
region: "ap-southeast-1",
},
});
const eventBusB = new EventBusB(this, "EventBusB");
new EventRuleB1(this, "EventRuleB1", eventBusB.eventBus);
}
}
event-busに対して
import {EventBus} from "aws-cdk-lib/aws-events";
import {Construct} from "constructs";
class EventBusB extends Construct {
public readonly eventBus: EventBus;
constructor(scope: Construct, id: string) {
super(scope, id); // ここのscopeはevent-busが属するstackとなる
this.eventBus = new EventBus(this, "EventBusB", {
eventBusName: "EventBusB",
});
}
}
event-ruleに対して
import {EventBus, Rule} from "aws-cdk-lib/aws-events";
import {StateMachine} from "aws-cdk-lib/aws-stepfunctions";
import * as tasks from "aws-cdk-lib/aws-stepfunctions-tasks";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as targets from "aws-cdk-lib/aws-events-targets";
import {Construct} from "constructs";
class EventRuleB1 extends Construct {
constructor(scope: Construct, id: string, eventBus: EventBus) {
super(scope, id);
const ruleB1 = new Rule(this, "ruleB1", {
ruleName: "ruleB1",
description: "ruleB1 description",
eventBus, // event-ruleに紐づくevent-bus
eventPattern: {
detailType: ["RuleB1 Detail Type"], // event-ruleの特徴で、発火するイベントはこのdetailTypeをちゃんと指定するが必要
},
});
// イベント受け取り際に実施される処理を持ってるstate-machine定義する
const ruleB1StateMachine = new EventRuleB1StateMachine(
this,
"EventRuleB1StateMachine"
);
// event-ruleをstate-machineに紐づく
ruleB1.addTarget(
new targets.SfnStateMachine(ruleB1StateMachine.stateMachine, {
deadLetterQueue: null, // ここでは一旦「deadLetterQueue = null」で設定する、 deadLetterQueueは失敗した処理をリトライすると失敗結果を受け取るqueueです。
})
);
}
}
class EventRuleB1StateMachine extends Construct {
public readonly stateMachine: StateMachine;
constructor(scope: Construct, id: string) {
super(scope, id);
// lambda関数定義
// lambda関数がイベント受け取る際に実施するコアロジック持ってる
const checkStatusFn = new lambda.Function(this, "checkStatusFn", {
code: new lambda.InlineCode(
fs.readFileSync("lib/lambdas/check_status.py", {encoding: "utf-8"})
),
handler: "index.main",
timeout: cdk.Duration.seconds(30),
runtime: lambda.Runtime.PYTHON_3_9,
});
// 上記のlambda関数を呼び出すstep-function定義する
const stateMachineJob = new tasks.LambdaInvoke(
this,
"EventRuleB1StateMachineJob",
{
lambdaFunction: checkStatusFn,
invocationType: tasks.LambdaInvocationType.EVENT,
}
);
// state-machineにstep-functionを紐づく
this.stateMachine = new StateMachine(this, "EventRuleB1StateMachine", {
definition: stateMachineJob,
stateMachineName: "EventRuleB1StateMachine",
});
}
}
ここではまた別の2つ概念がある
- state-machine
- step-function
state-machineは、その中に多くの「state」を含む「work-flow」です。
StateMachine:
state-1 → state-2 → state-3 → state-4 → ... → state-n
step-functionは、state-machine内の各stateを実施するためのツールです。
event-rule, state-machine, step-function間の関連性については↓の画像を見てください。
step-functionはstate-machineを実施します。
state-machineはevent-ruleに関連付けられ、イベントが受信されるとトリガーされる処理として機能します。
これはかなり複雑な部分であり、上記のevent-rule定義
コード例をより注意深く読んで問題を理解していただけると嬉しいです。
イベント発火 (Emit Event)
これの本質はサービスがイベント生成と発火することになります。
aws-sdkで実施できます。
import {EventBridge} from "@aws-sdk/client-eventbridge";
const client = new EventBridge({region: "ap-southeast-1"});
client.putEvents({
EventBusName: "Service-A-Bus",
DetailType: "Service-A-Rule-1", // event-rule定義
Detail: {
// 添付するデータ
},
});
各サービスが独自のevent-busを持つというcase-studyがあります。これは非常に妥当なことであり、理由は
サービスはイベントがどこに生成されるかに関して心配する必要はありません。イベントを輸送する作業は、外部のライブラリが決定し、サービスは単にビジネスロジックを正しく実行することを保証するだけで十分です。
タスク実行
これのセクションは本記事の「コア部分」となって、前回のevent-ruleの定義に続き、event-ruleがstate-machineに関連付けられ、state-machine自体がイベントを受信したときに実行される処理であることがわかります。
_画像 5_を見てみると、他のサービスのタスクを実行することはevent-ruleが担当することがわかります。
したがって、次の結論が導き出されます:
タスクを実行する作業はstate-machineが担当します。
実際のコーディングは
class EventRuleB1StateMachine extends Construct {
public readonly stateMachine: StateMachine;
constructor(scope: Construct, id: string) {
super(scope, id);
// 別のサービスのタスクを実行するstep-function定義
const stateMachineJob = new ecsTaskRun.setupStateMachineDefinition({
scope: this,
task: {
command: ["node", "serviceATaskA1.js"],
name: "serviceATaskA1",
},
});
// state-machineにstep-functionを紐づく
this.stateMachine = new StateMachine(this, "EventRuleB1StateMachine", {
definition: stateMachineJob,
stateMachineName: "EventRuleB1StateMachine",
});
}
}
上記のコードをみるとめっちゃ簡単です。
const stateMachineJob = new ecsTaskRun.setupStateMachineDefinition({
scope: this,
task: {
command: ["node", "serviceATaskA1.js"],
name: "serviceATaskA1",
},
});
7行のみあるんですが、setupStateMachineDefinition
関数には最も複雑な部分が含まれています。
もっと具体的には、次のセクションに進んでください。
local環境
2つ環境 (local, aws-cloud)に分けた理由は、aws-cloud環境はすでにTerraformで設定されているためです。詳細はこの記事を参照してください。
local環境は以下のモデルに基づいてaws-cdkで実施されました。
前のセクションで述べたように、イベントがevent-ruleに渡されると、state-machine内の処理がトリガーされます(上記の_画像 8_を参照してください、ここから2番目のステップ以降の処理が動作します)。
このステップ2では、タスク定義(これはecs-taskを起動するためのblue-printである)を作成します。
このタスク定義は、対向のサービスのソースコード(ここではサービスBと呼びます)にマウントされます。このマウントは必要です。なぜなら、将来的にサービスBのECSタスクが起動されると、それはDockerコンテナとして実行されるため、サービスBのソースコードをマウントする必要があるからです。
タスク定義のマウントプロセスが完了すると、タスク定義は完全に定義された状態になります。この時点でecs-taskを「起動」します(ステップ4)。
ecs-taskが起動されると、サービスBの側である特定のユースケースをトリガーするために、コマンドを適用します、つまりここではecs-taskに対してコマンドを実行するだけです。
実施するコードは
import {
Vpc,
IpAddresses,
SubnetType,
CfnRouteTable,
CfnSubnetRouteTableAssociation,
InstanceType,
PlacementStrategy,
} from "aws-cdk-lib/aws-ec2";
import {Compatibility} from "aws-cdk-lib/aws-ecs";
import {EcsRunTask} from "aws-cdk-lib/aws-stepfunctions-tasks";
// 共通なvpc定義
const vpc = new Vpc(scope, "vpc-id", {
ipAddresses: IpAddresses.cidr("10.0.0.0/16"),
subnetConfiguration: [
{
cidrMask: 24,
name: "ingress",
subnetType: SubnetType.PUBLIC, // local環境から、public-subnetを使う
},
],
});
// public-subnetのroute-table定義
const publicRouteTable = new CfnRouteTable(scope, "public-route-table-id", {
vpcId: vpc.vpcId,
});
// 共通なecs-cluster定義
const ecsCluster = new Cluster(scope, "ecs-cluster", {
vpc: vpc,
});
ecsCluster.addCapacity("ecs-cluster-autoScaling-group", {
instanceType: new InstanceType("t2.micro"),
vpcSubnets: {subnetType: SubnetType.PUBLIC},
});
// task-definition定義
const taskDefinition = new TaskDefinition(scope, "task-definition", {
compatibility: Compatibility.EC2,
});
// 向こうサービスのソースコードをマウントするステップ
// task-definitionのassetとして認められる
const ecsContainer = task.definition.addContainer("ecs_id", {
image: ContainerImage.fromAsset("./serviceB_source_path", {
file: "./service_B_dockerfile_path",
}),
memoryLimitMiB: 512,
cpu: 128,
command: "node usecase_b1.js",
containerName: "serviceB_container",
});
// ecs-task起動してから実行する
new EcsRunTask(scope, "usecase_b1_task", {
integrationPattern: IntegrationPattern.RUN_JOB,
cluster: ecsCluster,
containerOverrides: [
{
containerDefinition: ecsContainer,
command: task.command,
environment: [
// 発火されるイベントに添付するデータをコマンドパラメーターとして向こうサービスに渡す
{
name: "eventDetail",
value: JsonPath.stringAt("$.detail"),
},
],
},
],
taskDefinition: task.definition,
launchTarget: new EcsEc2LaunchTarget({
placementStrategies: [
PlacementStrategy.spreadAcrossInstances(),
PlacementStrategy.packedByCpu(),
PlacementStrategy.randomly(),
],
}),
});
aws-cloud環境
aws-cloud環境では、local環境とはいくつかの違いがあります。その理由は、aws-cdkを使用して最初から完全なフローを構築する代わりに、aws-cloudでは事前に作成されたリソース(VPC、ECSクラスターなど)を使用しているからです。
ただし、local環境でもaws-cloudでも、引き続きタスク定義が必要です。もちろん、aws-cloud上で事前に定義されたタスク定義があります。
一般的に、誰もが考える自然な方法は、aws-cdkを使用して事前に定義されたタスク定義を取得し、次にlocal環境のようにnew EcsRunTask()
を実行するだけです。簡単に言うと、それは以下のようになります:
const importTaskdef = TaskDefinition.fromTaskDefinitionArn(
this,
"importedTaskdef",
"既に定義されてるtask-definitionのarn"
);
const step1State = new EcsRunTask(this, "Step1 RunEcsTask", {
cluster: clusterArn,
taskDefinition: importTaskdef,
});
ただし、ここでの問題はfromTaskDefinitionArn
がITaskDefinition
の型を返す一方で、EcsRunTask
がTaskDefinition
型を要求していることです。
そのため、ここではCustomState
を使用する必要があります。CustomStateの詳しくは https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_stepfunctions.CustomState.html を参考してください。
これは、state-machine内でステートを「再定義」する方法の一例です。つまり、以下のフローを再定義しています:
ASl (Amazon State Language)で実施できます。これは、state-machine内のステートを定義するためのJSONベースの言語です。詳しくは https://docs.aws.amazon.com/step-functions/latest/dg/concepts-amazon-states-language.html を参考してください。
CustomState活用するのは以下のコードまで見てください:
import {CustomState} from "aws-cdk-lib/aws-stepfunctions";
new CustomState(scope, "task", {
stateJson: {
Type: "Task",
Resource: "arn:aws:states:::ecs:runTask",
Parameters: {
LaunchType: "FARGATE",
Cluster: ecsCluster.clusterArn,
TaskDefinition: task.definitionArn,
NetworkConfiguration: {
AwsvpcConfiguration: {
Subnets: subnetIds,
SecurityGroups: securityGroupIds,
},
},
Overrides: {
ContainerOverrides: [
{
Name: "container-name",
Command: "node usecase_b1.js",
Memory: "512",
Cpu: "256",
Environment: [
{
Name: "eventDetail",
// detailデータをstring形式に変換する
"Value.$": "States.JsonToString($.detail)",
},
],
},
],
},
},
},
});
イベント発火してから各ユースケース実行される瞬間は↓のイメージになります。
画像 10
タスクは起動され、そしてタスクが完了すると自動的に終了されます。
まとめ
記事はかなり長く、難解ですが、この記事がマイクロサービスを展開しようとしている方々にとって有益な参考資料になることを願っています。特に、サービス間の連携や複数のサービスでトランザクションをどのように実施するかについて悩んでいる方々にとって役立つでしょう。
また次の記事でお会いしましょう。
また、エアークローゼットはエンジニア採用活動も行っておりますので、興味のある方はぜひご覧ください!