お題
Cloud Pub/Sub使ってますか?
大量のメッセージを捌きたい時に使いますね。
これ、メッセージ投入時の順序、保証したい時、ないですか?
AWSの場合であればSQSのFIFOキューを使うのかなと思います。
(過去に使おうとしたことがあったものの、まだ東京リージョンなかったのと1秒に最大3000メッセージという制約に尻込みして、結局使わなかった記憶が。。。)
SQSに相当するといったらGCPではCloud Pub/Subだと思いますが、残念ながらFIFOキューは無い。
と思っていたら、
(Beta版ですが)「メッセージの順序指定」という機能が出来ていたようで。
これで、メッセージがPublishした順にSubscribeできるのかと思い、試してみました。
とはいえ、だいぶアバウトな試し方なので、とりあえず、こういう試し方をしたところ、こういう結果になったというレベルの捉え方をしてもらえれば。
想定する読者
- GCPについては知っている。
- Cloud Pub/Subを使ったことがある。ないし、おおよそどういうものかは知っている。
- Golangもそれなりに書ける。
前提
- ローカルにGoの開発環境構築済み。
- GCP契約済み。
- ローカルでCloud SDKのセットアップ済み。
- ローカルの環境変数
GOOGLE_APPLICATION_CREDENTIALS
に(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。
開発環境
# OS - Linux(Ubuntu)
$ cat /etc/os-release
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"
# バックエンド
# 言語 - Golang
$ go version
go version go1.15.2 linux/amd64
IDE - Goland
GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020
今回の全ソース
https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0
ソース抜粋解説
go-publisher
5つのエンドポイントを用意。
topic.EnableMessageOrdering = true
とOrderingKey: operationSequence,
の部分は、実は「メッセージの順序指定」のために必要なコード。
ただし、このソースでメッセージをPublishするTopic("my-normal-topic"
)は「メッセージの順序指定」用ではないSubscriptionと対応しているので、「メッセージの順序指定」は機能しない。
package main
import (
"fmt"
"log"
"net/http"
"os"
"time"
"cloud.google.com/go/pubsub"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
project := os.Getenv("PUB_PROJECT")
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.GET("/order01", handler(project, "order01"))
e.GET("/order02", handler(project, "order02"))
e.GET("/order03", handler(project, "order03"))
e.GET("/order04", handler(project, "order04"))
e.GET("/order05", handler(project, "order05"))
e.Logger.Fatal(e.Start(":8080"))
}
func handler(project, path string) func(c echo.Context) error {
return func(c echo.Context) error {
ctx := c.Request().Context()
operationSequence := createOperationSequence()
client, err := pubsub.NewClient(ctx, project)
if err != nil {
log.Fatal(err)
}
topic := client.Topic("my-normal-topic")
defer topic.Stop()
topic.EnableMessageOrdering = true
message := &pubsub.Message{
OrderingKey: operationSequence,
Data: []byte(path + ":" + operationSequence),
}
r := topic.Publish(ctx, message)
if r == nil {
log.Fatal("failed to topic.Publish!")
}
log.Printf("%+v", r)
return c.String(http.StatusOK, path+":"+operationSequence)
}
}
func createOperationSequence() string {
return fmt.Sprintf("%d", time.Now().UnixNano())
}
go-subscriber
package main
import (
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.POST("/", func(c echo.Context) error {
m, err := unmarshal(c.Request().Body)
if err != nil {
return c.String(http.StatusBadRequest, err.Error())
}
data := string(m.Message.Data)
log.Printf("fs_Received__Data:%s", data)
return c.String(http.StatusOK, "OK")
})
e.Logger.Fatal(e.Start(":8080"))
}
type PubSubMessage struct {
Message struct {
Data []byte `json:"data,omitempty"`
ID string `json:"id"`
} `json:"message"`
Subscription string `json:"subscription"`
}
func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
var m PubSubMessage
body, err := ioutil.ReadAll(r)
if err != nil {
log.Printf("ioutil.ReadAll: %v", err)
return nil, err
}
if err := json.Unmarshal(body, &m); err != nil {
log.Printf("json.Unmarshal: %v", err)
return nil, err
}
return &m, nil
}
5つのエンドポイントを順繰り叩くシェル
かなり雑なつくり。
#!/usr/bin/env bash
set -euox pipefail
for i in {0..15}
do
for j in {1..5}
do
curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
sleepenh 0.005
done
sleepenh 0.005
done
実践
イメージ
RESTの口を持つサービスをCloud Runに載せておいて、アクセスが来たらCloud Pub/SubにPublishし、Pushタイプで用意しておいたSubscriptionが、これまたCloud Runに載せておいたRESTの口を持つサービスにリクエストを飛ばす。
通常のTopicとSubscriptionの場合
TopicとSubscriptionの設定
メッセージ投入結果
メッセージは、以下の順に繰り返しエンドポイントを叩いていて、Publisherのログ上もその順番になっているものの、Subscriberのログでは順不同になっている。
/order01
/order02
/order03
/order04
/order05
エンドポイントを叩くシェルが出したログ上はもちろん順序通り。
「メッセージの順序指定」に対応したTopicとSubscriptionの場合
TopicとSubscriptionの設定
同じリージョンに存在している
という条件があるのでTopicのリージョンを指定。
「メッセージの順序指定」は当然、「有効」にする。
メッセージ投入結果
何度か試してみた感じでは、立ち上がりに一部だけSubscribeする順番が入れ替わるものが発生するケースがあったが、それ以降は順序通りになっていた。
わりと順不同になっていた通常のTopic/Subscriptionの組み合わせの時と比べると安定した順番にはなっている。
まとめ
「メッセージの順序指定」モードにしても、順番が入れ替わる事象が起きたので、お題に対して「実現できる」と言い切れず。。。
これが、試し方の問題なのか、まあBeta版だからなのか、気が向いたら、、、突き詰めてみようか。。。