想定する読者
お題
何かしらの情報に関して登録・更新・参照する、ごくごく普通のWebアプリケーションがあったとして、画面で操作した内容を自システムのDBに保存すると同時に、操作した順序を保って
別システムにも反映したいという要件があったとする。
別システムでは機能ごとにデータ反映用のAPIを提供しているので、それを叩けばよいのだけど、問題は1つ1つのAPIが重い(=要するにリクエスト受けてからレスポンスまでの時間が5秒以上かかる)ということ。
したがって、単純に考えて、画面で操作した流れでそのままAPIを同期的に叩くというわけにはいかない。
(そんなことをしたら、ユーザが何かの情報を登録・更新するたびに7〜8秒くらい待たされるシステムになってしまう。)
というわけで、APIを叩く部分は非同期にする。
すると、とたんに「操作した順序を保って
」の部分が怪しくなってくる。
さあ、どうしよう。
とりあえず非同期にするので、反映したい内容をメッセージキューにでも積むか。
※ちなみに、システムはGoogle Cloudを使うことが前提になっていることとする。
いろいろ選択肢はあるけど、一応、マネージドサービスを使うことを重視するという前提だと仮定して、以下を考える。
ただ、上記を使おうとすると以下の点で今回の要件と抵触する。
- 実行順序を保証しない。※
- 重複実行の可能性がある。
※Cloud Pub/Subの方は、Beta版ではあるものの「メッセージの順序指定」という機能が加わった。
参考までに。
https://qiita.com/sky0621/items/3df3ae65b859b8196e39
ただし、これも、Subscriptionからメッセージが送出される時の順序指定である。
少なくともPushタイプのSubscriptionにしている場合はメッセージ1をPush(=要するに指定のエンドポイントにリクエストすること)したあと、そのレスポンスを待ってメッセージ2をPushということはしてくれない。
(問答無用に「メッセージ1をPush」→「メッセージ2をPush」→・・・)
また、「重複実行の可能性」についても、ある種のデータの更新であれば2回、別システムに同じ更新をしても問題ないかもしれないが、新規データの作成の場合は、そうはいかない。
同じデータが2個作られてしまう。つまり、メッセージが重複しても、それを排除する仕組みを別途用意しないといけない。
たぶん一般的には、メッセージの生成元でユニークなIDを振って、それを「処理済みかどうか」の確認のためにMemorystore等に登録する。(既に登録済みだったら、このメッセージは処理済み(つまり重複配信された)とみなして削除する実装を入れる。)
実行順序の保証が難しいは重複実行への対処もしないといけないはで、やりたいことに対して考慮、実装しないといけない要素が多すぎる。
ということで、もっとシンプルに「画面操作の履歴をデータベースに持っておいて、格納順に処理(別システムのAPIを叩く)すればいいだけでは?」と考える。
とはいえ、スケーラビリティも考慮したい。
ということで、Datastoreの採用を考えたのだけど、「Firestore は次世代の Datastore です。
」なんて書いてあるので、Firestoreを使ってみることにする。
システムの全体像
- 「Webアプリケーション」(今回は実質WebAPIサーバ)は、Cloud Run
- 「RDB」(今回のソースでは実際の書き込みロジックは省略)は、Cloud SQL
- 「NoSQLストア」は、Firestore
- 「同期サービス」(今回はAPIを擬似的に叩いた体裁で数秒スリープさせるだけの実装)は、GKE
Webアプリケーション概要
いつもこの手のお題としていい塩梅のものが見つからない。
今回は、「学校」や「学年」、「クラス」、「先生」、「生徒」といった情報の登録ができるものとする。
(あくまで擬似的なものだけど。)
※ちなみに、私は学生ではありません。
エンドポイント
-
/add-school
・・・「学校」の登録 -
/add-grade
・・・「学年」の登録 -
/add-class
・・・「クラス」の登録 -
/add-teacher
・・・「先生」の登録 -
/add-student
・・・「生徒」の登録
前提
- ローカルにGoの開発環境構築済み。
- GCP契約済み。
- ローカルでCloud SDKのセットアップ済み。
- ローカルの環境変数
GOOGLE_APPLICATION_CREDENTIALS
に(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。
開発環境
# OS - Linux(Ubuntu)
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"
# バックエンド
# 言語 - Golang
$ go version
go version go1.15.2 linux/amd64
IDE - Goland
GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020
今回の全ソース
Webアプリケーション
同期サービス
ソース抜粋解説
Webアプリケーション
package main
import (
"fmt"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/firestore"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
project := os.Getenv("PUB_PROJECT")
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("/add-school", handler(project, "add-school"))
e.GET("/add-grade", handler(project, "add-grade"))
e.GET("/add-class", handler(project, "add-class"))
e.GET("/add-teacher", handler(project, "add-teacher"))
e.GET("/add-student", handler(project, "add-student"))
e.Logger.Fatal(e.Start(":8080"))
}
func handler(project, path string) func(c echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context()
client, err := firestore.NewClient(ctx, project)
if err != nil {
log.Fatal(err)
}
defer client.Close()
order := fmt.Sprintf("%s:%s", path, createUUID())
_, err = client.Collection("operation").Doc(order).
Set(ctx, map[string]interface{}{
"order": order,
"sequence": time.Now().UnixNano(),
}, firestore.MergeAll)
if err != nil {
log.Fatal(err)
}
return c.String(http.StatusOK, order)
}
}
func createUUID() string {
u, err := uuid.NewRandom()
if err != nil {
log.Fatal(err)
}
return u.String()
}
同期サービス
package main
import (
"context"
"log"
"os"
"strings"
"time"
"cloud.google.com/go/firestore"
)
func main() {
ctx := context.Background()
client, err := firestore.NewClient(ctx, os.Getenv("PUB_PROJECT"))
if err != nil {
log.Fatal(err)
}
defer client.Close()
operationIter := client.Collection("operation").
Where("sequence", ">", 0).OrderBy("sequence", firestore.Asc).Snapshots(ctx)
defer operationIter.Stop()
for {
operation, err := operationIter.Next()
if err != nil {
log.Fatalln(err)
}
for _, change := range operation.Changes {
ope, err := change.Doc.Ref.Get(ctx)
if err != nil {
log.Fatalln(err)
}
d := ope.Data()
order, ok := d["order"]
if ok {
ods := strings.Split(order.(string), ":")
if len(ods) > 0 {
od := ods[0]
switch od {
case "add-school":
time.Sleep(5 * time.Second)
case "add-grade":
time.Sleep(4 * time.Second)
case "add-class":
time.Sleep(3 * time.Second)
case "add-teacher":
time.Sleep(2 * time.Second)
case "add-student":
time.Sleep(1 * time.Second)
}
}
}
log.Printf("[operation-Data] %#+v", d)
}
}
}
実践
Firestoreへの書き込み側
以下5つのエンドポイントを上から順に叩いてみる、そして、それを2回繰り返すと、
-
/add-school
・・・「学校」の登録 -
/add-grade
・・・「学年」の登録 -
/add-class
・・・「クラス」の登録 -
/add-teacher
・・・「先生」の登録 -
/add-student
・・・「生徒」の登録
※FirestoreはデフォルトではドキュメントIDの昇順で並ぶようなので、必ずしもエンドポイントを叩いた順に並ぶわけではない。
Firestoreから情報をsequence
の昇順で取得する側
ちゃんと時系列に処理されているかどうか、コンテナログを見てみる。
以下の順番で2巡させたので想定通り。
-
/add-school
・・・「学校」の登録 -
/add-grade
・・・「学年」の登録 -
/add-class
・・・「クラス」の登録 -
/add-teacher
・・・「先生」の登録 -
/add-student
・・・「生徒」の登録
同期サービス内では、/add-school
をFirestoreから読み取った場合は5秒のスリープを入れているが、当然、/add-grade
の読み取りに追い越されるなんてことはない。
まとめ
こんな感じに、操作をFirestoreに時系列(が維持できるよう、OrderByがかけられるNanoレベルのUnixTimestampをフィールドに持つのが必要なわけだけど)に投入していくと、順番を保ったまま別システムに連携するシステムが作れる。
とはいえ、もちろんこれだけではプロダクションに上げるレベルのものにはならない。
例えば、Firestore内にドキュメントが溜まっていく一方(1回、別システムに連携したら不要なドキュメントなのに)である点や、それゆえに、実は、同期サービスを再デプロイすると、また、(処理済みなのに)1からドキュメントを拾って処理し始めてしまうという課題がある。
これについては、
「処理が終わるたびにドキュメント消せばいいのでは?」というのが単純な解になるかと思うけど、そうすると、「ドキュメントの削除」というのがトリガーとなって、1つのドキュメントを処理し終わったのに再度、同じドキュメントを処理し始める(削除されているので途中でエラーになる、ないし、ロジックによってはそのまま2回目の処理が走る)ことが起きる。
というわけで、
まだまだいろいろ考えるべきポイントはあるのだけど、とりあえず、いったんこのへんで。