5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

AWS X-Ray+Goを使ってSQSでのメッセージングを観測する

Last updated at Posted at 2020-06-20

概要

タイトルの遠り、AWS X-Rayを使って、SQSでのメッセージングを観測します。X-Rayを使えば、複数のアプリケーション間で「どこからどこにメッセージが送られているか」が図示されるため、アプリ間通信の仕様や障害発生箇所の把握がしやすくなります。今回は、Go言語を使って実装します。

結論(ソース)はページ下部にあります。

環境

  • Go言語 1.14.3
  • aws-sdk-go
  • aws-xray-sdk-go
  • Mac OS X 10.15.4

前提として、aws-sdk-go, aws-xray-sdk-goはgo getコマンドでインストール済みとします。

この記事の対象読者

  • AWSの有名どころサービスなら開発したことがある。AWS CLIも設定済み。
  • SQSは知識があるがX-Rayはあまり知らない。X-Rayの概要や実装方法を知りたい
  • (実装部分については)Go言語はそれなりに読める

X-Rayの概要

アプリから情報を収集して、以下のような__サービスグラフ__を作成したり、リクエストの実行時間を計測することができます。アプリのパフォーマンス可視化や障害発生箇所の特定など、可観測性の向上に役立てることができます。また様々なAWSサービスと統合することができます。
image.png
出典:https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/aws-xray.html

データ収集の仕組み

X-Rayは、アプリに組み込まれたSDKが情報を収集し、X-Rayデーモンを経由してコンソールに情報を表示します。以下のイメージがわかりやすいです。左下のアプリケーションからの流れになります。
※スクリプトやツールから直接X-RayのAPIへデータを送る流れはこの記事では扱いません。
image.png
出典:20200526 AWS Black Belt Online Seminar AWS X-Ray

そのため、以下2つの準備が必要になります。

  • SQSを実行するアプリにxray-sdkを組み込む
  • X-Rayデーモンの起動

X-Rayの主要な要素

今回の実装に関係する部分のみ記載します。

セグメント/サブセグメント

セグメントは、X-Ray上で処理を分割する単位です。アプリやリソース、ホスト名などがイメージしやすいです。
一方、サブセグメントはセグメントを分割した細かな処理の単位です。HTTPのリクエスト、SQSのキューなどを定義できます。
基本的には、設定した名前がX-Rayのサービスグラフ上で表示される名前となります。

トレースヘッダー

X-Rayでは、アプリケーション間の通信をトレースするために__トレースID__が発行されます。これがHTTP通信ヘッダーにX-Amzn-Trace-Idとして設定されることで、通信のトレースが可能となります。これをトレースヘッダーと呼びます。

今回のSQSとX-Rayの統合では、SQSのメッセージ送受信元のアプリをセグメントに設定し、メッセージングにトレースヘッダーを設定することでトレースを可視化します。

その他の概要やより詳細な説明は、公式のドキュメントやBlack Belt資料がわかりやすいです。
AWS X-Ray の概念
20200526 AWS Black Belt Online Seminar AWS X-Ray

SQSとX-Rayの統合

先述の「データ収集の仕組み」に沿って、2つの準備をします。

SQSを実行するアプリにxray-sdkを組み込む

aws-xray-sdk-goの基本的な使い方として、コンテキスト(context)を用います。Go言語のコンテキストは、HTTPリクエスト等に引数として渡すことでタイムアウトやキャンセルを可能とする仕組みですが、X-Rayではトレースのためにコンテキストを利用しているわけです。

contextの設定
  ctx, seg := xray.BeginSegment(context.Background(), "service-name")  // service-nameがセグメント名
  subCtx, subSeg := xray.BeginSubsegment(ctx, "subsegment-name")  // subsegment-nameがサブセグメント名
  // ...略...
  // 終了時にはcontectのクローズを行う
  subSeg.Close(nil)
  seg.Close(nil)

通常、SQSでメッセージを送信するにはSendMessageを用いますが、これにはコンテキストを渡すことができません。そのため、コンテキストを渡すせるようWithContextを付加した関数が用意されています。SQSへのメッセージ送信の場合だとSendMessageWithContextとなります。この関数にcontextを追加で渡すことでトレースヘッダーが設定されます。受信側でそれを受け取ることにより、トレースが可能となるわけです。

SQSへのメッセージ送信処理
resp, err := svc.SendMessage(params)  // 通常のメッセージ送信
resp, err := svc.SendMessageWithContext(ctx, params)  // X-Rayを使う場合のメッセージ送信

コンテキストを追加で渡すだけで、それ以外は元の関数と差異はありません。

X-Rayデーモンの起動

X-Rayデーモンは、実行可能ファイルがAWS公式から提供されています。ここから環境に合わせた実行可能ファイルをダウンロードして実行します。
AWS X-Ray デーモン

なお、MacOSの場合、「開発元を確認できないため開けません」というメッセージが表示されることがありますが、以下などを参考に回避可能でした。
Macで「開発元を確認できないため、開けません」と表示された時の対処法

ソース

実際にX-RaySDKを組み込んだGoのソースです。

  • メッセージ送信処理(sqs-sender)
sqs-send-sample.go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-xray-sdk-go/xray"
)

var (
	QueURL         = "https://sqs.<リージョン>.amazonaws.com/<アカウントID>/<キューの名前>"
	AwsRegion      = "<リージョン>"
)

var svc *sqs.SQS

func SendMessage() error {
	// SQSクライアントをxrayでラップ
	xray.AWS(svc.Client)

	// キューに送るメッセージと送信時に渡す構造体を設定
	message := "hello"
	params := &sqs.SendMessageInput{
		QueueUrl:                 aws.String(QueURL),
		MessageBody:              aws.String(message), 
	}

	// セグメントの宣言、contextの生成
	ctx, seg := xray.BeginSegment(context.Background(), "sqs-sender")
	subctx, subseg := xray.BeginSubsegment(ctx, "sqs-sender-sub")

	// メッセージ送信処理
	resp, err := svc.SendMessageWithContext(subctx, params)
	if err != nil {
		return err
	}
	fmt.Println(resp)

	// セグメントのクローズ
	subseg.Close(nil)
	seg.Close(nil)

	return nil
}

func main() {
	sampleSession := session.Must(session.NewSession())
	svc = sqs.New(sampleSession, aws.NewConfig().WithRegion(AwsRegion))

	if err := SendMessage(); err != nil {
		log.Fatal(err)
    }
}
  • メッセージ受信処理(sqs-reciever)
sqs-recieve-sample.go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-xray-sdk-go/xray"
)

var (
	AwsRegion      = "<リージョン>"
	QueURL         = "https://sqs.<リージョン>.amazonaws.com/<アカウントID>/<キューの名前>"  // エンドポイント
)

var svc *sqs.SQS

func GetMessage() error {
	xray.AWS(svc.Client)

	// SQSから受け取る際に渡す構造体
	params := &sqs.ReceiveMessageInput{
		AttributeNames:      aws.StringSlice([]string{"AWSTraceHeader"}),  // トレースヘッダーを受け取る
		QueueUrl:            aws.String(QueURL),
		MaxNumberOfMessages: aws.Int64(10), // 一度に取得するメッセージの最大数
		WaitTimeSeconds:     aws.Int64(20), // ロングポーリングの時間
	}
	// セグメントの宣言、contextの生成
	ctx, seg := xray.BeginSegment(context.Background(), "sqs-reciever")
	subctx, subseg := xray.BeginSubsegment(ctx, "sqs-reciever-sub")

	// メッセージ受信の実行
	resp, err := svc.ReceiveMessageWithContext(subctx, params)
	if err != nil {
		return err
	}

	}
	for _, msg := range resp.Messages {
		fmt.Println(*msg.Body)
		
		// トレースヘッダーも取得&表示してみる
		msgAtr := msg.Attributes
		traceHeaderStr := msgAtr["AWSTraceHeader"]
		fmt.Println("AWSTraceHeader: ", traceHeaderStr)

		// メッセージ削除関数にもcontextを渡す
		if err := DeleteMessage(subctx, msg); err != nil {
			fmt.Println(err)
		}
	}
	// セグメントのクローズ
	subseg.Close(nil)
	seg.Close(nil)

	return nil
}

func DeleteMessage(ctx context.Context, msg *sqs.Message) error {
	params := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(QueURL),
		ReceiptHandle: aws.String(*msg.ReceiptHandle),
	}
	// メッセージの削除を実行。このときもcontextを渡す
	_, err := svc.DeleteMessageWithContext(ctx, params)

	if err != nil {
		return err
	}
	return nil

}

func main() {
	sampleSession := session.Must(session.NewSession())
	svc = sqs.New(sampleSession, aws.NewConfig().WithRegion(AwsRegion))

	// ポーリング
	for {
		if err := GetMessage(); err != nil {
			log.Fatal(err)
		}
	}
}

実行結果

事前にX-Rayデーモンを動かしておきます。

X-Rayデーモンの実行(東京リージョンの場合)
$ ./xray_mac -o -n ap-northeast-1
2020-06-19T22:29:57+09:00 [Info] Initializing AWS X-Ray daemon 3.2.0
2020-06-19T22:29:57+09:00 [Debug] Listening on UDP 127.0.0.1:2000
2020-06-19T22:29:57+09:00 [Info] Using buffer memory limit of 163 MB
2020-06-19T22:29:57+09:00 [Info] 2608 segment buffers allocated
2020-06-19T22:29:57+09:00 [Debug] Using Endpoint read from Config file: xray.ap-northeast-1.amazonaws.com
2020-06-19T22:29:57+09:00 [Debug] Using proxy address:
...  # 以下省略

その後、メッセージの送受信処理を動かします。一応マスクしていますが、こんな感じの実行結果が出力されます。

メッセージ送信処理
$ go run sqs-send-sample.go
2020-06-20T20:33:00+09:00 [INFO] X-Ray proxy using address : 127.0.0.1:2000
{
  MD5OfMessageBody: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  MessageId: "XXXXXX-XXXXX-XXXX-XXXX-XXXXXXXXXXXXXXX"
}
2020-06-20T20:33:00+09:00 [INFO] Emitter using address: 127.0.0.1:2000
メッセージ受信処理
$ go run sqs-recieve-sample.go
2020-06-20T20:32:54+09:00 [INFO] X-Ray proxy using address : 127.0.0.1:2000
hello  # メッセージ本文
AWSTraceHeader:  XXXXXXXXXXX  # 16進数の値が出力される
2020-06-20T20:33:00+09:00 [INFO] Emitter using address: 127.0.0.1:2000
...  # ポーリングのため中断されるまで続きます 

X-Rayのコンソール

以下の通り、サービスマップが表示されました。下のsqs-senderからキューにメッセージが送られ、sqs-recieverがそれをポーリングしている状態です。
image.png

終わりに

X-RayをGo言語で使っているサンプルが少なく色々苦労しましたが、なんとか良い感じのグラフを描くことができました。X-Rayは使いこなせれば面白いサービスだと思うので、他のAWSサービスとも統合させてみたいと思います。

余談

SQSは、SNSと統合して用いることが多々あると思います。X-RayはSNSとの統合もサポートしているのですが、SNSのサブスクライバーとしてトレースがサポートされているのは、2020/6時点でHTTP/HTTPSとAWS Lambdaの2つです。つまり、残念ながらSNS+SQSの統合は、SQSがサブスクライバーとして統合されていないため、まとめてサービスグラフを描くことができません。今後のアップデートに期待したいと思います。

Amazon SNS および AWS X-Ray

その他参考

5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?