Java(Jakarta EE)によるドメイン駆動設計(DDD)のサンプルアプリケーション Eclipse Cargo Tracker を題材にして、自分なりにマイクロサービス化を考えてみました。
今回は Eclipse Cargo Tracker をマイクロサービスへどう分割するかという観点よりも、自分ならマイクロサービスとしてどう作るかという観点を重視しました。
はじめに
Eclipse Cargo Tracker のソースや下記ドキュメントを参考にすると、Cargo
を中心とした概ね次のようなモデルになっていると思います。
Cargo がいわゆるドメイン駆動設計における aggregate root(集約のルート)となっています。
Cargo : 貨物
RouteSpecification : 経路仕様
Delivery : 配送
Itinerary : 輸送日程
Leg : 運送行程
Voyage : 航海
今回はこのモデルの範囲でマイクロサービス化を考えていく事にします。
Eclipse Cargo Tracker の性質上、ある程度は作為的なモデルになっているのは仕方ないのかなとも思いますが、個人的には以下の点が気になりました。
-
Delivery
へ責務が集中している(ように見える) - (O/Rマッパーによる)永続化を意識し過ぎている(ような気がする)
マイクロサービスの選定
まずは、最低限必要となりそうなマイクロサービスを考えてみます。
マイクロサービス化の対象として、通常は下記のような(レイヤーの)ものが候補として考えられそうです。
- ドメイン駆動設計におけるアプリケーションサービス
- クリーンアーキテクチャにおけるアプリケーションビジネスルール(ユースケース)
Eclipse Cargo Tracker の場合だと、org.eclipse.cargotracker.application
パッケージに属する次のようなサービスが該当すると思います。
- BookingService
- CargoInspectionService
- HandlingEventService
BookingService はそのままマイクロサービス化できそうですが、他は微妙な気がします。
ここで、自分なりに Eclipse Cargo Tracker の本質を考えてみたところ、貨物の配送計画と実績(状況)の管理
との結論に至りました。
であれば、計画と実績を管理するマイクロサービスは必須となりそうです。
更に、計画はあくまで計画であって実際はどうなるか分からないのだから、計画と実績を切り離して考えるのはどうだろうって事で、次のようなマイクロサービスを考えてみました。
マイクロサービス名 | 役割 | 備考 |
---|---|---|
Cargo サービス | 配送計画の管理 | Eclipse Cargo Tracker の BookingService に相当する |
Delivery サービス | 配送実績(状況)の管理 | Eclipse Cargo Tracker の Delivery とイベント周りの処理に相当する |
なお、これらのサービスはお互いに依存しないよう切り離して考える事にします。
マイクロサービスの作成
次に、この 2つのマイクロサービスをどのように作っていくかを考えます。
上記モデルの RoutingStatus
と TransportStatus
という 2つの状態に着目してみると、それぞれを計画と実績の状態遷移と考える事ができそうでした。
そこで、今回は次のようなアプローチを試してみる事にしました。
- ステートマシンで表現した状態遷移をベースにマイクロサービス化する
とりあえず、マイクロサービスは次のように実装する事に決めました。
- (a) ステートマシンをイミュータブルなデータ型と副作用のない関数で実現
- (b) GraphQL で (a) の処理を API 化し、MongoDB へ永続化
(a) を コマンド -> 現在の状態 -> 結果<(新しい状態, 発生イベント)>
のような関数で表現し、この関数で作成された状態とイベントを MongoDB へ保存します。
ドメイン駆動設計はオブジェクト指向(特に Java の影響が強め)にどっぷり浸かった考え方のように(個人的に)思うので、今回は関数言語的な観点を適用し Rust で実装しています。
ちなみに、本来ならば State モナド(厳密には StateT)を使いたかったのですが、現時点の Rust で有望そうなライブラリが見当たらなかったので諦めました。(一応、Scala と State モナドによる実装版が こちら です1)
Cargo サービス
貨物の配送計画に関する状態遷移にのみ注目し、Cargo サービスを実装します。
Eclipse Cargo Tracker の RoutingStatus
と BookingService
等を参考に、次のようなステートマシンを考えました。
配送計画のステートマシン
これらの状態を遷移させるアクション(コマンド)は次のようになります。
- 作成(Create)
- 輸送日程の設定(AssignRoute)
- 到着地の変更(ChangeDestination)
- 期限の変更(ChangeDeadline)
- 終了(Close)
RouteSpec(経路仕様)と設定された Itinerary(輸送日程)が合致していれば Routed
、何らかの不整合があれば Misrouted
状態とします。
そのため、Routed と Misrouted 間は AssignRoute・ChangeDestination・ChangeDeadline の内容に応じて変化する事になります。
Location は単に文字列のコード UnLocode
、Voyage は単に文字列の番号 VoyageNo
として扱い、このステートマシンを Rust で実装すると次のようになりました。
型名は概ね Eclipse Cargo Tracker に合わせています。(多少は変えたりしてますが)
cargo/src/cargo.rs (一部抜粋)
pub type TrackingId = String;
pub type UnLocode = String;
pub type VoyageNo = String;
pub type Date = DateTime<Local>;
#[derive(Default, Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct RouteSpec {
pub origin: UnLocode,
pub destination: UnLocode,
pub deadline: Date,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Itinerary(pub Vec<Leg>);
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct LocationTime {
pub location: UnLocode,
pub time: Date,
}
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize)]
pub struct Leg {
pub voyage_no: VoyageNo,
pub load: LocationTime,
pub unload: LocationTime,
}
// 状態
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum Cargo {
Nothing,
Unrouted { tracking_id: TrackingId, route_spec: RouteSpec },
Routed { tracking_id: TrackingId, route_spec: RouteSpec, itinerary: Itinerary },
Misrouted { tracking_id: TrackingId, route_spec: RouteSpec, itinerary: Itinerary },
Closed { tracking_id: TrackingId, route_spec: RouteSpec, itinerary: Itinerary },
}
// イベント
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum Event {
Created { tracking_id: TrackingId, route_spec: RouteSpec },
AssignedRoute { tracking_id: TrackingId, itinerary: Itinerary },
ChangedDestination { tracking_id: TrackingId, destination: UnLocode },
ChangedDeadline { tracking_id: TrackingId, deadline: Date },
Closed { tracking_id: TrackingId },
}
// コマンド
#[derive(Debug, Clone)]
pub enum Command {
Create(TrackingId, RouteSpec),
AssignRoute(Itinerary),
ChangeDestination(UnLocode),
ChangeDeadline(Date),
Close,
}
(省略)
pub type CargoResult<T> = std::result::Result<T, CommandError>;
impl Cargo {
// アクションの実行(コマンドの適用)
pub fn action(&self, cmd: &Command) -> CargoResult<(Self, Event)> {
match self {
Self::Nothing => match cmd {
Command::Create(t, r) =>
if r.deadline <= now() {
past_deadline()
} else {
Ok((
Self::Unrouted { tracking_id: t.clone(), route_spec: r.clone() },
Event::Created { tracking_id: t.clone(), route_spec: r.clone() }
))
}
_ => invalid_state()
}
Self::Unrouted { tracking_id, route_spec } => match cmd {
Command::AssignRoute(it) =>
if it.0.is_empty() {
empty_itinerary()
} else {
Ok((
create_routed(
tracking_id.clone(),
route_spec.clone(),
it.clone(),
),
Event::AssignedRoute {
tracking_id: tracking_id.clone(),
itinerary: it.clone(),
}
))
}
Command::ChangeDestination(l) =>
if l == &route_spec.destination {
no_change("destination")
} else {
Ok((
Self::Unrouted {
tracking_id: tracking_id.clone(),
route_spec: RouteSpec { destination: l.clone(), ..route_spec.clone() }
},
Event::ChangedDestination {
tracking_id: tracking_id.clone(),
destination: l.clone()
}
))
}
Command::ChangeDeadline(d) =>
if d == &route_spec.deadline {
no_change("deadline")
} else if d <= &now() {
past_deadline()
} else {
Ok((
Self::Unrouted {
tracking_id: tracking_id.clone(),
route_spec: RouteSpec { deadline: d.clone(), ..route_spec.clone() }
},
Event::ChangedDeadline {
tracking_id: tracking_id.clone(),
deadline: d.clone()
}
))
}
_ => invalid_state()
}
(省略)
Self::Closed { .. } => invalid_state()
}
}
(省略)
}
これを GraphQL で API 化します。
Eclipse Cargo Tracker の DefaultCargoInspectionService
内で判定している isMisdirected
や isUnloadedAtDestination
に相当する機能2も考慮し、次のような GraphQL スキーマ(API)にしてみました。
GraphQL スキーマ例
(省略: RouteSpec 等の型定義)
interface Cargo {
trackingId: ID!
routeSpec: RouteSpec!
}
interface Routing {
itinerary: Itinerary!
}
type UnroutedCargo implements Cargo {
trackingId: ID!
routeSpec: RouteSpec!
}
type RoutedCargo implements Cargo & Routing {
trackingId: ID!
routeSpec: RouteSpec!
itinerary: Itinerary!
}
(省略: MisroutedCargo と ClosedCargo の型定義)
type Query {
find(trackingId: ID!): Cargo
"""
指定の Location コードが到着地かどうかの判定(isUnloadedAtDestination のチェックに相当する)
"""
isDestination(trackingId: ID!, location: ID!): Boolean
"""
指定の Location コードと Voyage 番号が輸送日程に沿っているかどうかの判定(isMisdirected のチェックに相当する)
"""
isOnRoute(trackingId: ID!, location: ID!, voyageNo: ID): Boolean
}
type Mutation {
create(origin: ID!, destination: ID!, deadline: Date!): Cargo
assignToRoute(trackingId: ID!, legs: [LegInput!]!): Cargo
close(trackingId: ID!): Cargo
changeDestination(trackingId: ID!, destination: ID!): Cargo
changeDeadline(trackingId: ID!, deadline: Date!): Cargo
}
それぞれの状態毎に型を定義し(例. UnroutedCargo
)、状態毎の内容の差異は GraphQL のインターフェースを使って対応するようにしてみました。
これを axum と Juniper を使って Rust で実装したのが下記コードです。
MongoDB には最新の状態とイベントの履歴を保持するように保存しています。
永続化に関してはイミュータブルなデータを JSON/BSON と相互変換しているだけなので割とシンプルだと思いますが、GraphQL モデルとの変換部分はもう少しスマートにしたいところです。
なお、ここでは Location のコードや Voyage の番号の存在チェック等は実施せず、それらはこのサービスを利用する外部モジュール側(UI、別サービス等)の責務としています。
cargo/src/main.rs (一部抜粋)
type Revision = u32;
// MongoDB へ永続化する内容(イベント部分)
#[derive(Clone, Debug, Deserialize, Serialize)]
struct StoredEvent {
event: cargo::Event,
created_at: DateTime<Utc>
}
// MongoDB へ永続化する内容
#[derive(Clone, Debug, Deserialize, Serialize)]
struct StoredState {
_id: String,
rev: Revision,
state: cargo::Cargo,
events: Vec<StoredEvent>
}
(省略)
// MongoDB への保存処理(更新時)
async fn save_with_action(ctx: &Store, tracking_id: &str, cmd: &Command) -> FieldResult<Option<Cargo>> {
let r = ctx.0.find_one(doc! { "_id": tracking_id }, None).await?;
if let Some(s) = r {
let (c, e) = s.state.action(&cmd)?;
let flt = doc! { "$and": [ { "_id": tracking_id }, { "rev": s.rev } ] };
let upd = doc! {
"$inc": { "rev": 1 },
"$set": {
"state": to_bson(&c)?
},
"$push": {
"events": to_bson(&StoredEvent { event: e, created_at: Utc::now() })?
}
};
let opts = FindOneAndUpdateOptions::builder()
.return_document(ReturnDocument::After)
.build();
let r = ctx.0.find_one_and_update(flt, upd, Some(opts)).await?;
if let Some(s) = r {
Ok(Some(s.state))
} else {
Err(AppError::InvalidRevision(s.rev).into())
}
} else {
Ok(None)
}
}
struct Query;
// GraphQL の Query
#[juniper::graphql_object(context = Store)]
impl Query {
async fn find(ctx: &Store, tracking_id: ID) -> FieldResult<Option<CargoInterfaceValue>> {
let s = load_state(ctx, tracking_id.to_string()).await?;
Ok(s.map(|c| c.into()))
}
async fn is_destination(ctx: &Store, tracking_id: ID, location: ID) -> FieldResult<Option<bool>> {
let s = load_state(ctx, tracking_id.to_string()).await?;
Ok(s.map(|c| c.is_destination(location.to_string())))
}
async fn is_on_route(ctx: &Store, tracking_id: ID, location: ID, voyage_no: Option<ID>) -> FieldResult<Option<bool>> {
let s = load_state(ctx, tracking_id.to_string()).await?;
let v = voyage_no.map(|n| n.to_string());
Ok(s.and_then(|c| c.is_on_route(location.to_string(), v)))
}
}
struct Mutation;
// GraphQL の Mutation
#[juniper::graphql_object(context = Store)]
impl Mutation {
(省略)
async fn assign_to_route(ctx: &Store, tracking_id: ID, legs: Vec<LegInput>) -> FieldResult<Option<CargoInterfaceValue>> {
let cmd = Command::AssignRoute(
Itinerary(legs.iter().map(|l| l.into()).collect())
);
let r = save_with_action(ctx, tracking_id.to_string().as_str(), &cmd).await?;
Ok(r.map(|c| c.into()))
}
(省略)
}
type Schema = RootNode<'static, Query, Mutation, EmptySubscription<Store>>;
type Context = Arc<(Store, Schema)>;
#[tokio::main]
async fn main() {
(省略)
let schema = Schema::new(Query, Mutation, EmptySubscription::new());
let opt = ClientOptions::parse(mongo_endpoint).await.unwrap();
let mongo = Client::with_options(opt).unwrap();
let ctx: Context = Arc::new((
Store(mongo.database(&db_name).collection(&col_name)),
schema
));
let app = Router::new()
.route("/", post(graphql_handler))
.with_state(ctx);
println!("server start: {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}
async fn graphql_handler(State(ctx): State<Context>, Json(input): Json<GraphQLRequest>) -> Json<GraphQLResponse> {
let res = input.execute(&ctx.1, &ctx.0).await;
res.into()
}
一応、これで完成です。
動作内容
例えば、次のような JSON を POST します。
リクエスト例
{
"query": "mutation { create(origin: \"AAA\", destination: \"CCC\", deadline: \"2023-01-30T13:00:00Z\") { __typename trackingId routeSpec { origin destination deadline } } }"
}
すると、このようなレスポンスが返ってきます。
レスポンス例
{"data":{"create":{"__typename":"UnroutedCargo","trackingId":"68376df5-24e5-44ac-957b-79b04d8990df","routeSpec":{"origin":"AAA","destination":"CCC","deadline":"2023-01-30T13:00:00Z"}}}}
create と assignToRoute を実施した後の MongoDB の保存内容はこのようになりました。
MongoDB の保存内容例
{
"_id": "68376df5-24e5-44ac-957b-79b04d8990df",
"rev": {
"$numberLong": "2"
},
"state": {
"Routed": {
"tracking_id": "68376df5-24e5-44ac-957b-79b04d8990df",
"route_spec": {
"origin": "AAA",
"destination": "CCC",
"deadline": "2023-01-30T22:00:00+09:00"
},
"itinerary": [
{
"voyage_no": "0100S",
"load": {
"location": "AAA",
"time": "2023-01-15T19:00:00+09:00"
},
"unload": {
"location": "BBB",
"time": "2023-01-20T20:00:00+09:00"
}
},
{
"voyage_no": "0200A",
"load": {
"location": "BBB",
"time": "2023-01-22T07:30:00+09:00"
},
"unload": {
"location": "CCC",
"time": "2023-01-27T05:00:00+09:00"
}
}
]
}
},
"events": [
{
"event": {
"Created": {
"tracking_id": "68376df5-24e5-44ac-957b-79b04d8990df",
"route_spec": {
"origin": "AAA",
"destination": "CCC",
"deadline": "2023-01-30T22:00:00+09:00"
}
}
},
"created_at": "2023-01-10T04:57:18.374646Z"
},
{
"event": {
"AssignedRoute": {
"tracking_id": "68376df5-24e5-44ac-957b-79b04d8990df",
"itinerary": [
{
"voyage_no": "0100S",
"load": {
"location": "AAA",
"time": "2023-01-15T19:00:00+09:00"
},
"unload": {
"location": "BBB",
"time": "2023-01-20T20:00:00+09:00"
}
},
{
"voyage_no": "0200A",
"load": {
"location": "BBB",
"time": "2023-01-22T07:30:00+09:00"
},
"unload": {
"location": "CCC",
"time": "2023-01-27T05:00:00+09:00"
}
}
]
}
},
"created_at": "2023-01-10T05:03:28.557040Z"
}
]
}
Delivery サービス
貨物の配送実績(状況)に関する状態遷移にのみ注目し、Delivery サービスを実装します。
Eclipse Cargo Tracker の TransportStatus
や HandlingEvent.Type
等を参考に、次のようなステートマシンを考えました。
配送実績のステートマシン
これらの状態を遷移させるアクション(コマンド)は次のようになります。
- 作成(Create)
- 貨物の受け取り(Receive)
- 荷積み(Load)
- 荷下ろし(Unload)
- 請求(Claim)
これを Rust で実装したのがこちらです。
delivery/src/delivery.rs (一部抜粋)
pub type TrackingId = String;
pub type UnLocode = String;
pub type VoyageNo = String;
pub type Date = DateTime<Local>;
// 状態
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum Delivery {
Nothing,
NotReceived { tracking_id: TrackingId },
InPort { tracking_id: TrackingId, location: UnLocode },
OnBoardCarrier { tracking_id: TrackingId, voyage_no: VoyageNo, location: UnLocode },
Claimed { tracking_id: TrackingId, location: UnLocode, claimed_time: Date },
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub enum Event {
Created { tracking_id: TrackingId },
Received { tracking_id: TrackingId, location: UnLocode, completion_time: Date },
Loaded { tracking_id: TrackingId, voyage_no: VoyageNo, location: UnLocode, completion_time: Date },
Unloaded { tracking_id: TrackingId, voyage_no: VoyageNo, location: UnLocode, completion_time: Date },
Claimed { tracking_id: TrackingId, completion_time: Date },
}
#[derive(Debug, Clone)]
pub enum Command {
Create(TrackingId),
Receive(UnLocode, Date),
Load(VoyageNo, Date),
Unload(UnLocode, Date),
Claim(Date),
}
(省略)
pub type DeliveryResult<T> = std::result::Result<T, CommandError>;
impl Delivery {
pub fn action(&self, cmd: &Command) -> DeliveryResult<(Self, Event)> {
match self {
Self::Nothing => match cmd {
Command::Create(t) => {
Ok((
Self::NotReceived { tracking_id: t.clone() },
Event::Created { tracking_id: t.clone() }
))
}
_ => invalid_state()
}
Self::NotReceived { tracking_id } => match cmd {
Command::Receive(l, d) => {
Ok((
Self::InPort { tracking_id: tracking_id.clone(), location: l.clone() },
Event::Received { tracking_id: tracking_id.clone(), location: l.clone(), completion_time: d.clone() }
))
}
_ => invalid_state()
}
Self::InPort { tracking_id, location } => match cmd {
Command::Load(v, d) => {
Ok((
Self::OnBoardCarrier { tracking_id: tracking_id.clone(), voyage_no: v.clone(), location: location.clone() },
Event::Loaded { tracking_id: tracking_id.clone(), voyage_no: v.clone(), location: location.clone(), completion_time: d.clone() }
))
}
Command::Claim(d) => {
Ok((
Self::Claimed { tracking_id: tracking_id.clone(), location: location.clone(), claimed_time: d.clone() },
Event::Claimed { tracking_id: tracking_id.clone(), completion_time: d.clone() }
))
}
_ => invalid_state()
}
Self::OnBoardCarrier { tracking_id, voyage_no, .. } => match cmd {
Command::Unload(l, d) => {
Ok((
Self::InPort { tracking_id: tracking_id.clone(), location: l.clone() },
Event::Unloaded { tracking_id: tracking_id.clone(), voyage_no: voyage_no.clone(), location: l.clone(), completion_time: d.clone() }
))
}
_ => invalid_state()
}
Self::Claimed { .. } => invalid_state()
}
}
}
更に、次のような GraphQL スキーマとなるように実装しました。(delivery/src/main.rs 参照)
GraphQL スキーマ例
scalar Date
interface Delivery {
trackingId: ID!
}
interface Location {
location: ID!
}
interface OnBoard {
voyageNo: ID!
}
interface Claim {
claimedTime: Date!
}
type NotReceivedDelivery implements Delivery {
trackingId: ID!
}
(省略)
type OnBoardCarrierDelivery implements Delivery & Location & OnBoard {
trackingId: ID!
location: ID!
voyageNo: ID!
}
(省略)
type Query {
find(trackingId: ID!): Delivery
}
type Mutation {
create(trackingId: ID!): Delivery
receive(trackingId: ID!, location: ID!, completionTime: Date!): Delivery
load(trackingId: ID!, voyageNo: ID!, completionTime: Date!): Delivery
unload(trackingId: ID!, location: ID!, completionTime: Date!): Delivery
claim(trackingId: ID!, completionTime: Date!): Delivery
}
これで 2つのマイクロサービスが完成しました。
最後に
この 2つのマイクロサービスはそのままにして、これらを補完する機能拡張を別モジュールの追加で行っていったところ、全体的にこのような構成になりました。(ソースコードは こちら)
マイクロサービスと watcher を Rust で、event_handler を Deno でそれぞれ実装し、メッセージブローカーとして NATS を使いました。
処理の流れは次のようになります。
- Cargo 等のマイクロサービスが MongoDB へ状態とイベントを保存
- MongoDB の変更ストリームによって watcher が変更を検知し、新たに登録されたイベントの内容を取り出し NATS へ publish
- NATS を subscribe している event_handler がイベントを処理し、必要に応じて Cargo サービスなどに問い合わせる(GraphQL API を呼び出す)
Cargo の Created
イベントに応じて Delivery を作成したり、Delivery の Unloaded
イベントに応じて目的地へ到着したかどうかをチェックしたりと、Cargo と Delivery サービスの間接的な連携を実現しています。
更に、NATS を subscribe するモジュールを追加し、Cargo と Delivery の最新の状態をマージした検索モデルを Elasticsearch 等へ登録して CQRS(コマンドクエリ責務分離)を実現するような事も考えられます。
一応、疎結合で柔軟性の高いコレオグラフィ3的な連携を実現できており、作りとしてはそう悪く無いかなと考えています。