2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

EventSourcing With MongoDB

Last updated at Posted at 2023-12-15

はじめ

この記事はエアークローゼット Advent Calendar 2023 16日目の記事になります。

第6日のアドベントカレンダー記事で、EventSourcingの基本について触れました。
この記事では、EventSourcingにMongoDBをどのように使用しているかを紹介したいと思います。

なぜMongoDBを使用しているのか

MongoDBは、EventSourcingによく使用される理由がいくつかあります。

  • 柔軟性: MongoDBのスキーマレスの性質は、開発者が固定されたスキーマを事前に定義する必要なく、複雑なイベント構造を格納および取得できるようにします。この柔軟性は、さまざまなタイプや構造のイベントを格納するのに適しています。

  • 拡張性: MongoDBは水平方向にスケーリングするよう設計されており、大量のイベントデータを処理するのに適しています。イベントの数が増えると、MongoDBはデータを複数のノードに分散させて負荷を処理できます。

  • クエリ: MongoDBは強力なクエリ機能を提供し、格納されたイベントを効率的に分析およびクエリできます。

  • JSONサポート: MongoDBはJSONライクなドキュメント形式でデータを保存するため、イベントデータの構造と整合しています。このことにより、多くの開発者にとって馴染みのある形式でイベントデータを扱いやすくなります。

  • コミュニティサポート: MongoDBには大規模かつ活発なコミュニティが存在し、MongoDBを使用したEventSourcingの実装に関するリソース、ライブラリ、ベストプラクティスが提供されています。

MongoDBがこれらの利点を提供していますが、EventSourcingは他のデータベースでも実装できることに注意することが重要です。EventSourcingのためのデータベースの選択は、アプリケーションの特定の要件や制約に依存します。

EventSourcingにMongoDBをどのように使用するのか

最初に、以下のグラフを見てください。

image.png

アイデアは2つのMongoDBデータベースを使用します。1つはCommandDBと呼ばれ、変更されたイベントデータを保存する責任があります。もう1つはQueryDBと呼ばれ、データの取得に使用されます。

CommandDBからQueryDBにデータを同期化する方法は、MongoDBのaggregate機能を使用しています。MongoDBでは、aggregateメソッドを使用してコレクション内のデータに対する集計操作を実行します。これにより、データを処理および分析し、しばしばグルーピング、マッチング、ソート、およびドキュメントの変換などのさまざまな段階を含む処理が可能です。

セットアップ

以下のようにCommandDBに挿入されたデータセットを持っています。

[
  {
    "user_id": 2,
    "type": "create",
    "data": {
      email: "stein@gmail.com",
      user_name: "stein123"
    },
    timestamp: ISODate("2023-05-01T12:34:00Z")
  },
  {
    "user_id": 2,
    "type": "update",
    "data": {
      email: "new_stein@gmail.com",
      user_name: "stein456"
    },
    timestamp: ISODate("2023-06-01T12:34:00Z")
  },
  {
    "user_id": 1,
    "type": "create",
    "data": {
      email: "old_niets@gmail.com",
      user_name: "niets123"
    },
    timestamp: ISODate("2023-07-01T12:34:00Z")
  },
  {
    "user_id": 1,
    "type": "update",
    "data": {
      email: "niets@gmail.com",
      user_name: "new niets"
    },
    timestamp: ISODate("2023-08-01T12:34:00Z")
  }
]

最新のユーザーデータをQueryDBに同期するために、次のようなaggregateを行いました。

db.collection.aggregate([
  {
    $match: {
      timestamp: {
        $gte: ISODate("2023-01-01")
      }
    }
  },
  {
    $sort: {
      timestamp: 1
    }
  },
  {
    $group: {
      _id: "$user_id",
      userDetails: {
        $mergeObjects: "$data"
      }
    }
  },
  {
    $replaceRoot: {
      newRoot: {
        $mergeObjects: [
          {
            user_id: "$_id"
          },
          "$userDetails",
          {
            timestamp: new Date()
          }
        ]
      }
    }
  },
  {
    $merge: {
      into: "users",
      on: "user_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
    }
  }
])

このMongoDBのaggregateコードは、コレクションデータに一連の操作を行います。各ステージの役割を以下に示します。

  • $match: タイムスタンプフィールドを基準にドキュメントをフィルタリングし、2023年1月1日以降のタイムスタンプを持つもののみを選択します。

  • $sort: フィルタされたドキュメントをタイムスタンプフィールドで昇順に並べ替えます。

  • $group: user_idフィールドでソートされたドキュメントをグループ化します。

  • $mergeObjectsを使用してグループ化されたドキュメントの詳細を1つのオブジェクトにまとめます。

  • $replaceRoot: マージされたユーザーの詳細とuser_id、およびnew Date()によって生成される新しいタイムスタンプを含む新しいドキュメントで元のドキュメントを置き換えます。

  • $merge: 結果のドキュメントを "users" コレクションにマージします。同じuser_idを持つドキュメントがすでに存在する場合は置換されますが、存在しない場合は新しいドキュメントが挿入されます。

要するに、この集計パイプラインは特定の基準と変換に基づいて、入力コレクションから「users」コレクションにドキュメントをフィルタリング、ソート、グループ化、操作、マージします。

結果

image.png

異なるデータセットを試したい場合は、以下のリンクにアクセスしてください。

以上

最後までご覧いただきありがとうございました!

エアークローゼット Advent Calendar 2023はまだまだ続きますので、ぜひ他のエンジニア, デザイナー, PMの記事もご覧いただければと思います。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?