4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

この記事は GCP(Google Cloud Platform) Advent Calendar 2023 8日目の記事です。

メッセージキューを用いた非同期処理において、パブリッシャーが誤ったメッセージスキーマを用いている場合でも、サブスクライバからパブリッシャーへのエラー通知が難しいという問題があります。

スキーマを事前定義可能なApache Avroを利用しCloud Pub/Subトピックを構成することで、スキーマと異なったメッセージがパブリッシュされた際パブリッシャーへエラーを通知することが出来ます。

本記事では実際にGolangのコードでPub/Subへメッセージを送受信しスキーマ検証の効果について検証します。
また、Avroにはメッセージをシリアライズすることでサイズを圧縮可能という魅力があります。圧縮効果に関しても効果を検証します。

Cloud Pub/Subとは

Google Cloud マネージドな非同期メッセージングサービス。Cloud Pub/Sub等の非同期メッセージングを使用することでシステム間のつながりを非同期にすることが出来ます。

Apache Avroとは

Avroとはシリアライズ可能なデータフォーマットです。データスキーマを定義し、またその定義に沿ったデータをバイナリへシリアライズすることが出来ます。今回使用するGolangの他にも、Java, Python, C/C++/C#, PHP, Ruby, Rust, JavaScript, Perl等にも広く対応しています。

Cloud Pub/SubにAvroスキーマを利用するメリット

メリット1: メッセージスキーマの保証

メッセージングサービスを使用した非同期処理アーキテクチャのデメリットとして、サブスクライバとパブリッシャー間のメッセージ形式が異なっていた場合でも、パブリッシャーにそれを通知することが難しいことが挙げられます。

同期通信を使用において、クライアントが誤ったスキーマのデータをサーバーへ送信した場合、サーバーはクライアントへエラー応答を返すことが出来ます。

image.png

非同期通信の場合、一般的にはパブリッシャーが誤ったメッセージスキーマを送信した場合にもサブスクライバはパブリッシャーへエラー応答を返すことが出来ません。別の方法でエラーを通知することが必要です。

image.png

Avroスキーマを使用して較正したPub/Subトピックを使う場合、パブリッシャーのメッセージスキーマが誤っていた場合にはパブリッシュに成功しません。これによりパブリッシャーが正しいスキーマを使用していることを保証することが出来ます。

image.png

メリット2: メッセージサイズの削減

また、Avroはメッセージをバイナリへとシリアライズすることでメッセージのサイズを削減することが可能です。これ単体でAvroを選択する理由としては弱いかと思いますが、大量のメッセージを送受信するユースケースではコストメリットとなるでしょう。

Avroスキーマを使用したPub/Subの実装手順

このトピックでは実際にAvroスキーマを使用してPub/Subを構成し、メッセージを送受信します。

本記事ではGoogle Cloudプロジェクトの作成、gcloud CLIのインストールおよび設定アプリケーションへのデフォルト認証情報設定が完了している前提で話を進めます

Avroスキーマの作成・Google Cloudへの登録

まずはAvroスキーマを作成し、Pub/Subのスキーマとして登録します。
一連の実装手順において、下記のドキュメント・ブログを参考にしました。

まずはavroスキーマを作成します。Google Cloudドキュメントと同じスキーマを sample.avsc という名前で保存しました。

sample.avsc
{
 "type" : "record",
 "name" : "Avro",
 "fields" : [
   {
     "name" : "StringField",
     "type" : "string"
   },
   {
     "name" : "FloatField",
     "type" : "float"
   },
   {
     "name" : "BooleanField",
     "type" : "boolean"
   }
 ]
}

このスキーマをPub/Sub用のスキーマとして登録します。

gcloud pubsub schemas create avro-sample-schema --type=AVRO --definition-file=sample.avsc

Pub/Subトピックとサブスクリプションの作成

次に、作成したAvroスキーマを使用するようにPub/Subトピックとサブスクリプションを作成します。

gcloud pubsub topics create my-topic --message-encoding=JSON --schema=avro-sample-schema
gcloud pubsub subscriptions create my-sub --topic my-topic

スキーマはトピックに設定します。サブスクリプションの作成コマンドがスキーマに言及していないことからもこれは読み取ることが出来ます。

--message-encoding オプションはAvroスキーマをシリアライズした状態でPub/Subに送信するかどうかのオプションで、これによってはスキーマの検証方法が変わります。下記の2パターンから選択可能です。

  • JSON: データをJSON文字列の形式でPub/Subへ送信
  • BINARY: データをシリアライズしバイナリ形式でPub/Subへ送信

ここでは JSON としておきます。

--message-encoding=BINARY にした場合どうなるかも後程検証します

Go言語を用いたアプリケーションの構築

今回はGo言語を使用してアプリケーションを構築します。
使用するバージョンはこちらです。

$ go version
go version go1.21.5 windows/amd64

モジュールを作成し、使用するpubsubパッケージget しておきます。

go mod init example.com/go-pubsub-avro-example
go get cloud.google.com/go/pubsub

次に、アプリケーションコードを作成します。
メッセージを1件publishした後subscribeするだけの簡単な処理になっています。

コードはクイックスタートで紹介されているものを一部改変(解説用にコメント追加、変数定義の位置変更、ログフォーマット変更)しただけのためコード自体の詳細な説明は省略します。

main.go
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
)

var (
	projectID = os.Getenv("PROJECT_ID")
	topicID   = "my-topic"
	subID     = "my-sub"
)

func main() {
	// まずはトピックにメッセージをパブリッシュする
	err := publish()
	if err != nil {
		fmt.Printf("failed to publish: %v\n", err)
		os.Exit(1)
	}

	// その後、先ほどパブリッシュしたメッセージをサブスクライブする
	err = subscribe()
	if err != nil {
		fmt.Printf("failed to subscribe: %v\n", err)
		os.Exit(1)
	}
}

// publish - トピックにメッセージをパブリッシュする
func publish() error {
	ctx := context.Background()

	// Pub/Subクライアントを作成
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	// パブリッシュするメッセージを定義
	msg := []byte(`{"StringField":"hello","FloatField":3.14,"BooleanField":true}`)

	// メッセージをパブリッシュする
	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})

	// Get()はパブリッシュが完了するまで待機し、Pub/Subにより発行されるメッセージIDを返す
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Printf("message published.  ID:%s size:%d\n  message:%s\n", id, len(msg), msg)
	return nil
}

// subscribe - トピックにパブリッシュされたメッセージを10秒間サブスクライブする
func subscribe() error {
	ctx := context.Background()

	// Pub/Subクライアントを作成し、サブスクリプションを取得
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()
	sub := client.Subscription(subID)

	// 10秒間メッセージを受信する
	// テスト用に10秒間のタイムアウトを設定しているため、
	// 本番環境ではこの部分は削除すること
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// 受信したメッセージを表示
		fmt.Printf("message published.  ID:%s size:%d\n  message:%s\n", msg.ID, len(msg.Data), msg.Data)

		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}

	return nil
}

実行に必要な環境変数を設定して

export PROJECT_ID=<your-project-id>

アプリケーションを実行します。

$ go run main.go
message published.  ID:10085227112253211 size:61
  message:{"StringField":"hello","FloatField":3.14,"BooleanField":true}
message published.  ID:10085227112253211 size:61
  message:{"StringField":"hello","FloatField":3.14,"BooleanField":true}

ID・メッセージが同じことから、publish()にて送信したメッセージがsubscribe()にて受信できていることが分かります。
また、メッセージはJSON形式の文字列で送受信されていることが分かります。

Pub/SubとAvroによるメッセージの検証を確認する

メッセージの型が誤っているときにPub/Subへのパブリッシュが失敗することを確認します。

AvroスキーマにおけるBooleanFieldに文字列が入るようにアプリケーションを変更します。

main.go
-		msg := []byte(`{"StringField":"hello","FloatField":3.14,"BooleanField":true}`)
+		msg := []byte(`{"StringField":"hello","FloatField":3.14,"BooleanField":"this is not boolean value"}`)

この状態でアプリケーションを実行すると、エラーが発生します。

$ go run main.go
failed to publish: pubsub: result.Get: rpc error: code = InvalidArgument desc = Invalid data in message: Message failed schema validation.
error details: name = ErrorInfo reason = INVALID_JSON_AVRO_MESSAGE domain = pubsub.googleapis.com metadata = 
map[message:Message failed schema validation revisionInfo:Could not validate message with any schema revision for schema: projects/507527431412/schemas/avro-sample-schema, last checked revision: revision_id=1107d938 failed with status: JSON object with type string does not match schema which expected boolean]
exit status 1

failed to publish: pubsub: result.Get:というところから、パブリッシャー側がエラーで落ちていることが分かります。パブリッシュのためにPub/Sub APIを呼び出したものの、エラー応答がPub/Subから返ってきたようです。

エラーメッセージ JSON object with type string does not match schema which expected boolean からBooleanが想定されたフィールドに文字列が入っていたことが読み取れ、想定通りのエラーが発生していることが分かります。

このように、Avroスキーマを使うことで非同期処理設計において難しいメッセージの形式の保証を実現することができます。

シリアライズしたメッセージをPub/Subにて送受信する

先ほどはメッセージをJSON形式で送受信しましたが、Avroの魅力の1つであるメッセージのシリアライズを用いて、バイナリ形式のメッセージを送受信します。

トピックの再作成

トピックを削除し、--message-encoding=BINARYオプションを付与し再度作成します。これによりPub/Subに送信されるメッセージはシリアライズした状態であることが期待されます。

# トピックを削除
gcloud pubsub topics delete my-topic
gcloud pubsub subscriptions delete my-sub
# トピックを再作成
gcloud pubsub topics create my-topic --message-encoding=BINARY --schema=avro-sample-schema
gcloud pubsub subscriptions create my-sub --topic my-topic

アプリケーションの修正

まずはAvroのシリアライズ/デシリアライズに必要なgoavroモジュールを get しておきます。

$ go get github.com/linkedin/goavro/v2

トピックにあわせ、アプリケーションコードもメッセージをシリアライズして送信するように変更します。
シリアライズ/デシリアライズにはgoavroモジュールを使用することで簡単に実装できています。

main.go
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"cloud.google.com/go/pubsub"
+	"github.com/linkedin/goavro/v2"
)

var (
	projectID = os.Getenv("PROJECT_ID")
	topicID   = "my-topic"
	subID     = "my-sub"
)

func main() {
	// まずはトピックにメッセージをパブリッシュする
	err := publish()
	if err != nil {
		fmt.Printf("failed to publish: %v\n", err)
		os.Exit(1)
	}

	// その後、先ほどパブリッシュしたメッセージをサブスクライブする
	err = subscribe()
	if err != nil {
		fmt.Printf("failed to subscribe: %v\n", err)
		os.Exit(1)
	}
}

// publish - トピックにメッセージをパブリッシュする
func publish() error {
	ctx := context.Background()

	// Pub/Subクライアントを作成
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

+	// Avroスキーマを読み込み
+	avroSource, err := os.ReadFile("sample.avsc")
+	if err != nil {
+		return fmt.Errorf("os.ReadFile err: %w", err)
+	}
+	codec, err := goavro.NewCodec(string(avroSource))
+	if err != nil {
+		return fmt.Errorf("goavro.NewCodec err: %w", err)
+	}
	// パブリッシュするメッセージを定義
-	msg := []byte(`{"StringField":"hello","FloatField":3.14,"BooleanField":true}`)
+	record := map[string]interface{}{
+		"StringField":  "hello",
+		"FloatField":   3.14,
+		"BooleanField": true,
+	}
+
+	// メッセージをバイナリへシリアライズ
+	msg, err := codec.BinaryFromNative(nil, record)
+	if err != nil {
+		return fmt.Errorf("codec.BinaryFromNative err: %w", err)
+	}

	// メッセージをパブリッシュする
	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: msg,
	})

	// Get()はパブリッシュが完了するまで待機し、Pub/Subにより発行されるメッセージIDを返す
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Printf("message published.  ID:%s size:%d\n  message:%s\n", id, len(msg), msg)
	return nil
}

// subscribe - トピックにパブリッシュされたメッセージを10秒間サブスクライブする
func subscribe() error {
	ctx := context.Background()

	// Pub/Subクライアントを作成し、サブスクリプションを取得
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()
	sub := client.Subscription(subID)

	// 10秒間メッセージを受信する
	// テスト用に10秒間のタイムアウトを設定しているため、
	// 本番環境ではこの部分は削除すること
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
+
+	// Avroスキーマを読み込み
+	avroSource, err := os.ReadFile("sample.avsc")
+	if err != nil {
+		return fmt.Errorf("os.ReadFile err: %w", err)
+	}
+	codec, err := goavro.NewCodec(string(avroSource))
+	if err != nil {
+		return fmt.Errorf("goavro.NewCodec err: %w", err)
+	}

	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// 受信したメッセージを表示
		fmt.Printf("message published.  ID:%s size:%d\n  message:%s\n", msg.ID, len(msg.Data), msg.Data)

+		// メッセージをデシリアライズし、元のメッセージを表示
+		original, _, err := codec.NativeFromBinary(msg.Data)
+		if err != nil {
+			fmt.Println(err)
+		}
+		fmt.Printf("original message: %v\n", original)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}

	return nil
}

アプリケーションを実行します。

$ go run main.go
message published.  ID:10085698170393081 size:11
  message:
hello��H@
message published.  ID:10085698170393081 size:11
  message:
hello��H@
original message: map[BooleanField:true FloatField:3.14 StringField:hello]

作成したメッセージが先ほどと異なりバイナリ形式で送受信されていることが分かります。メッセージの長さも先ほどの61バイトから11バイトへと圧縮されています。

ただし、バイナリ形式となったことでログ出力が文字化けしてしまいました。送信時に内容をログで出したい場合等はシリアライズする前にログ出力を挟む等、少し工夫が必要となります。

またgoavroモジュール等の追加の依存モジュールが発生するため、少しでも依存モジュールを減らしたいユースケースにおいてはJSON形式を選択することとなりそうです。

まとめ

Apache Avroスキーマを用いてCloud Pub/Subトピックを構成することで、メッセージのスキーマをパブリッシュ時に検証することが出来ます。これによりパブリッシャー/サブスクライバ間でメッセージスキーマが異なることによってサブスクライバ側でエラーが発生することを事前に防ぐことが可能となります。

またAvroのシリアライズ機能によりメッセージサイズの削減も可能です。ただし、JSON形式で送受信することにより

  • ログ出力が見やすいものとなる
  • Avro用のライブラリを追加する必要がなくなる

といったメリットもあるためここはアプリケーションの仕様や要件によって選択可能なオプションである印象を持ちました。

余談

  1. Apache Avroのリポジトリのサンプルを見る限りはAvroの要素にはnullableなものも指定できるようですが、実装が複雑になりそうなのでnullかどうかを示すbooleanなフィールドを用意するようなスキーマにした方が簡単にできそうです。
  2. アプリケーションのインターフェースとしてAvroスキーマを使用する場合には、Avroスキーマのバージョン管理も重要になります。今回は単純にテキストファイルを用いましたがAvro用のリポジトリを作成し、それによるバージョン管理が重要となりますが紙面の都合上省略しています。
4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?