お題
Cloud Pub/Sub使ってますか?
大量のメッセージを捌きたい時に使いますね。
これ、メッセージ投入時の順序、保証したい時、ないですか?
AWSの場合であればSQSのFIFOキューを使うのかなと思います。
(過去に使おうとしたことがあったものの、まだ東京リージョンなかったのと1秒に最大3000メッセージという制約に尻込みして、結局使わなかった記憶が。。。)
SQSに相当するといったらGCPではCloud Pub/Subだと思いますが、残念ながらFIFOキューは無い。
と思っていたら、
(Beta版ですが)「メッセージの順序指定」という機能が出来ていたようで。
これで、メッセージがPublishした順にSubscribeできるのかと思い、試してみました。
とはいえ、だいぶアバウトな試し方なので、とりあえず、こういう試し方をしたところ、こういう結果になったというレベルの捉え方をしてもらえれば。
想定する読者
- GCPについては知っている。
 - Cloud Pub/Subを使ったことがある。ないし、おおよそどういうものかは知っている。
 - Golangもそれなりに書ける。
 
前提
- ローカルにGoの開発環境構築済み。
 - GCP契約済み。
 - ローカルでCloud SDKのセットアップ済み。
 - ローカルの環境変数
GOOGLE_APPLICATION_CREDENTIALSに(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。 
開発環境
# OS - Linux(Ubuntu)
$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"
# バックエンド
# 言語 - Golang
$ go version
go version go1.15.2 linux/amd64
IDE - Goland
GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020
今回の全ソース
https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0
ソース抜粋解説
go-publisher
5つのエンドポイントを用意。
topic.EnableMessageOrdering = trueとOrderingKey: operationSequence,の部分は、実は「メッセージの順序指定」のために必要なコード。
ただし、このソースでメッセージをPublishするTopic("my-normal-topic")は「メッセージの順序指定」用ではないSubscriptionと対応しているので、「メッセージの順序指定」は機能しない。
package main
import (
	"fmt"
	"log"
	"net/http"
	"os"
	"time"
	"cloud.google.com/go/pubsub"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)
func main() {
	project := os.Getenv("PUB_PROJECT")
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())
	e.GET("/order01", handler(project, "order01"))
	e.GET("/order02", handler(project, "order02"))
	e.GET("/order03", handler(project, "order03"))
	e.GET("/order04", handler(project, "order04"))
	e.GET("/order05", handler(project, "order05"))
	e.Logger.Fatal(e.Start(":8080"))
}
func handler(project, path string) func(c echo.Context) error {
	return func(c echo.Context) error {
		ctx := c.Request().Context()
		operationSequence := createOperationSequence()
		client, err := pubsub.NewClient(ctx, project)
		if err != nil {
			log.Fatal(err)
		}
		topic := client.Topic("my-normal-topic")
		defer topic.Stop()
		topic.EnableMessageOrdering = true
		message := &pubsub.Message{
			OrderingKey: operationSequence,
			Data:        []byte(path + ":" + operationSequence),
		}
		r := topic.Publish(ctx, message)
		if r == nil {
			log.Fatal("failed to topic.Publish!")
		}
		log.Printf("%+v", r)
		return c.String(http.StatusOK, path+":"+operationSequence)
	}
}
func createOperationSequence() string {
	return fmt.Sprintf("%d", time.Now().UnixNano())
}
go-subscriber
package main
import (
	"encoding/json"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)
func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())
	e.POST("/", func(c echo.Context) error {
		m, err := unmarshal(c.Request().Body)
		if err != nil {
			return c.String(http.StatusBadRequest, err.Error())
		}
		data := string(m.Message.Data)
		log.Printf("fs_Received__Data:%s", data)
		return c.String(http.StatusOK, "OK")
	})
	e.Logger.Fatal(e.Start(":8080"))
}
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}
func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		return nil, err
	}
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		return nil, err
	}
	return &m, nil
}
5つのエンドポイントを順繰り叩くシェル
かなり雑なつくり。
# !/usr/bin/env bash
set -euox pipefail
for i in {0..15}
do
	for j in {1..5}
	do
		curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
		sleepenh 0.005
	done
	sleepenh 0.005
done
実践
イメージ
RESTの口を持つサービスをCloud Runに載せておいて、アクセスが来たらCloud Pub/SubにPublishし、Pushタイプで用意しておいたSubscriptionが、これまたCloud Runに載せておいたRESTの口を持つサービスにリクエストを飛ばす。
通常のTopicとSubscriptionの場合
TopicとSubscriptionの設定
メッセージ投入結果
メッセージは、以下の順に繰り返しエンドポイントを叩いていて、Publisherのログ上もその順番になっているものの、Subscriberのログでは順不同になっている。
/order01
/order02
/order03
/order04
/order05
エンドポイントを叩くシェルが出したログ上はもちろん順序通り。
「メッセージの順序指定」に対応したTopicとSubscriptionの場合
TopicとSubscriptionの設定
同じリージョンに存在しているという条件があるのでTopicのリージョンを指定。
「メッセージの順序指定」は当然、「有効」にする。
メッセージ投入結果
何度か試してみた感じでは、立ち上がりに一部だけSubscribeする順番が入れ替わるものが発生するケースがあったが、それ以降は順序通りになっていた。
わりと順不同になっていた通常のTopic/Subscriptionの組み合わせの時と比べると安定した順番にはなっている。
まとめ
「メッセージの順序指定」モードにしても、順番が入れ替わる事象が起きたので、お題に対して「実現できる」と言い切れず。。。
これが、試し方の問題なのか、まあBeta版だからなのか、気が向いたら、、、突き詰めてみようか。。。








