LoginSignup
4
1

More than 3 years have passed since last update.

Cloud Pub/Subの「メッセージの順序指定」によりFIFOキューが実現できるのか?

Posted at

お題

Cloud Pub/Sub使ってますか?
大量のメッセージを捌きたい時に使いますね。
これ、メッセージ投入時の順序、保証したい時、ないですか?
AWSの場合であればSQSのFIFOキューを使うのかなと思います。
(過去に使おうとしたことがあったものの、まだ東京リージョンなかったのと1秒に最大3000メッセージという制約に尻込みして、結局使わなかった記憶が。。。)
SQSに相当するといったらGCPではCloud Pub/Subだと思いますが、残念ながらFIFOキューは無い。
と思っていたら、
(Beta版ですが)「メッセージの順序指定」という機能が出来ていたようで。
これで、メッセージがPublishした順にSubscribeできるのかと思い、試してみました。

とはいえ、だいぶアバウトな試し方なので、とりあえず、こういう試し方をしたところ、こういう結果になったというレベルの捉え方をしてもらえれば。

想定する読者

  • GCPについては知っている。
  • Cloud Pub/Subを使ったことがある。ないし、おおよそどういうものかは知っている。
  • Golangもそれなりに書ける。

前提

  • ローカルにGoの開発環境構築済み。
  • GCP契約済み。
  • ローカルでCloud SDKのセットアップ済み。
  • ローカルの環境変数GOOGLE_APPLICATION_CREDENTIALSに(必要な権限を全て有したサービスアカウントの)鍵JSONファイルパス設定済み。

開発環境

# OS - Linux(Ubuntu)

$ cat /etc/os-release 
NAME="Ubuntu"
VERSION="18.04.5 LTS (Bionic Beaver)"

# バックエンド

# 言語 - Golang

$ go version
go version go1.15.2 linux/amd64

IDE - Goland

GoLand 2020.2.3
Build #GO-202.7319.61, built on September 16, 2020

今回の全ソース

https://github.com/sky0621/go-publisher/tree/v0.1.0
https://github.com/sky0621/go-subscriber/tree/v0.1.0

ソース抜粋解説

go-publisher

5つのエンドポイントを用意。
topic.EnableMessageOrdering = trueOrderingKey: operationSequence,の部分は、実は「メッセージの順序指定」のために必要なコード。
ただし、このソースでメッセージをPublishするTopic("my-normal-topic")は「メッセージの順序指定」用ではないSubscriptionと対応しているので、「メッセージの順序指定」は機能しない。

main.go
package main

import (
    "fmt"
    "log"
    "net/http"
    "os"
    "time"

    "cloud.google.com/go/pubsub"
    "github.com/labstack/echo/v4"
    "github.com/labstack/echo/v4/middleware"
)

func main() {
    project := os.Getenv("PUB_PROJECT")

    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())

    e.GET("/order01", handler(project, "order01"))
    e.GET("/order02", handler(project, "order02"))
    e.GET("/order03", handler(project, "order03"))
    e.GET("/order04", handler(project, "order04"))
    e.GET("/order05", handler(project, "order05"))

    e.Logger.Fatal(e.Start(":8080"))
}

func handler(project, path string) func(c echo.Context) error {
    return func(c echo.Context) error {
        ctx := c.Request().Context()
        operationSequence := createOperationSequence()

        client, err := pubsub.NewClient(ctx, project)
        if err != nil {
            log.Fatal(err)
        }

        topic := client.Topic("my-normal-topic")
        defer topic.Stop()

        topic.EnableMessageOrdering = true

        message := &pubsub.Message{
            OrderingKey: operationSequence,
            Data:        []byte(path + ":" + operationSequence),
        }
        r := topic.Publish(ctx, message)
        if r == nil {
            log.Fatal("failed to topic.Publish!")
        }
        log.Printf("%+v", r)

        return c.String(http.StatusOK, path+":"+operationSequence)
    }
}

func createOperationSequence() string {
    return fmt.Sprintf("%d", time.Now().UnixNano())
}

go-subscriber

main.go
package main

import (
    "encoding/json"
    "io"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/labstack/echo/v4"
    "github.com/labstack/echo/v4/middleware"
)

func main() {
    e := echo.New()
    e.Use(middleware.Logger())
    e.Use(middleware.Recover())

    e.POST("/", func(c echo.Context) error {
        m, err := unmarshal(c.Request().Body)
        if err != nil {
            return c.String(http.StatusBadRequest, err.Error())
        }
        data := string(m.Message.Data)
        log.Printf("fs_Received__Data:%s", data)
        return c.String(http.StatusOK, "OK")
    })

    e.Logger.Fatal(e.Start(":8080"))
}

type PubSubMessage struct {
    Message struct {
        Data []byte `json:"data,omitempty"`
        ID   string `json:"id"`
    } `json:"message"`
    Subscription string `json:"subscription"`
}

func unmarshal(r io.ReadCloser) (*PubSubMessage, error) {
    var m PubSubMessage
    body, err := ioutil.ReadAll(r)
    if err != nil {
        log.Printf("ioutil.ReadAll: %v", err)
        return nil, err
    }
    if err := json.Unmarshal(body, &m); err != nil {
        log.Printf("json.Unmarshal: %v", err)
        return nil, err
    }
    return &m, nil
}

5つのエンドポイントを順繰り叩くシェル

かなり雑なつくり。

#!/usr/bin/env bash
set -euox pipefail

for i in {0..15}
do
    for j in {1..5}
    do
        curl "https://go-publisher-xxxxxxxxxxxxxx/order0${j}"
        sleepenh 0.005
    done
    sleepenh 0.005
done

実践

イメージ

RESTの口を持つサービスをCloud Runに載せておいて、アクセスが来たらCloud Pub/SubにPublishし、Pushタイプで用意しておいたSubscriptionが、これまたCloud Runに載せておいたRESTの口を持つサービスにリクエストを飛ばす。

screenshot-app.cloudskew.com-2020.10.13-01_50_36.png

通常のTopicとSubscriptionの場合

TopicとSubscriptionの設定

screenshot-console.cloud.google.com-2020.10.12-23_04_10.png

screenshot-console.cloud.google.com-2020.10.12-23_06_00.png

メッセージ投入結果

メッセージは、以下の順に繰り返しエンドポイントを叩いていて、Publisherのログ上もその順番になっているものの、Subscriberのログでは順不同になっている。
/order01
/order02
/order03
/order04
/order05

screenshot-console.cloud.google.com-2020.10.13-00_48_38.png

エンドポイントを叩くシェルが出したログ上はもちろん順序通り。

Screenshot at 2020-10-13 00-52-17.png

「メッセージの順序指定」に対応したTopicとSubscriptionの場合

TopicとSubscriptionの設定

同じリージョンに存在しているという条件があるのでTopicのリージョンを指定。

screenshot-console.cloud.google.com-2020.10.12-23_14_13.png
screenshot-console.cloud.google.com-2020.10.12-23_24_06.png

「メッセージの順序指定」は当然、「有効」にする。

screenshot-console.cloud.google.com-2020.10.12-23_22_08.png

メッセージ投入結果

何度か試してみた感じでは、立ち上がりに一部だけSubscribeする順番が入れ替わるものが発生するケースがあったが、それ以降は順序通りになっていた。
わりと順不同になっていた通常のTopic/Subscriptionの組み合わせの時と比べると安定した順番にはなっている。

Screenshot at 2020-10-13 01-30-45.png

まとめ

「メッセージの順序指定」モードにしても、順番が入れ替わる事象が起きたので、お題に対して「実現できる」と言い切れず。。。
これが、試し方の問題なのか、まあBeta版だからなのか、気が向いたら、、、突き詰めてみようか。。。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1