概要
タイトルの遠り、AWS X-Rayを使って、SQSでのメッセージングを観測します。X-Rayを使えば、複数のアプリケーション間で「どこからどこにメッセージが送られているか」が図示されるため、アプリ間通信の仕様や障害発生箇所の把握がしやすくなります。今回は、Go言語を使って実装します。
結論(ソース)はページ下部にあります。
環境
- Go言語 1.14.3
- aws-sdk-go
- aws-xray-sdk-go
- Mac OS X 10.15.4
前提として、aws-sdk-go, aws-xray-sdk-goはgo get
コマンドでインストール済みとします。
この記事の対象読者
- AWSの有名どころサービスなら開発したことがある。AWS CLIも設定済み。
- SQSは知識があるがX-Rayはあまり知らない。X-Rayの概要や実装方法を知りたい
- (実装部分については)Go言語はそれなりに読める
X-Rayの概要
アプリから情報を収集して、以下のような__サービスグラフ__を作成したり、リクエストの実行時間を計測することができます。アプリのパフォーマンス可視化や障害発生箇所の特定など、可観測性の向上に役立てることができます。また様々なAWSサービスと統合することができます。
出典:https://docs.aws.amazon.com/ja_jp/xray/latest/devguide/aws-xray.html
データ収集の仕組み
X-Rayは、アプリに組み込まれたSDKが情報を収集し、X-Rayデーモンを経由してコンソールに情報を表示します。以下のイメージがわかりやすいです。左下のアプリケーションからの流れになります。
※スクリプトやツールから直接X-RayのAPIへデータを送る流れはこの記事では扱いません。
出典:20200526 AWS Black Belt Online Seminar AWS X-Ray
そのため、以下2つの準備が必要になります。
- SQSを実行するアプリにxray-sdkを組み込む
- X-Rayデーモンの起動
X-Rayの主要な要素
今回の実装に関係する部分のみ記載します。
セグメント/サブセグメント
セグメントは、X-Ray上で処理を分割する単位です。アプリやリソース、ホスト名などがイメージしやすいです。
一方、サブセグメントはセグメントを分割した細かな処理の単位です。HTTPのリクエスト、SQSのキューなどを定義できます。
基本的には、設定した名前がX-Rayのサービスグラフ上で表示される名前となります。
トレースヘッダー
X-Rayでは、アプリケーション間の通信をトレースするために__トレースID__が発行されます。これがHTTP通信ヘッダーにX-Amzn-Trace-Id
として設定されることで、通信のトレースが可能となります。これをトレースヘッダーと呼びます。
今回のSQSとX-Rayの統合では、SQSのメッセージ送受信元のアプリをセグメントに設定し、メッセージングにトレースヘッダーを設定することでトレースを可視化します。
その他の概要やより詳細な説明は、公式のドキュメントやBlack Belt資料がわかりやすいです。
AWS X-Ray の概念
20200526 AWS Black Belt Online Seminar AWS X-Ray
SQSとX-Rayの統合
先述の「データ収集の仕組み」に沿って、2つの準備をします。
SQSを実行するアプリにxray-sdkを組み込む
aws-xray-sdk-goの基本的な使い方として、コンテキスト(context)を用います。Go言語のコンテキストは、HTTPリクエスト等に引数として渡すことでタイムアウトやキャンセルを可能とする仕組みですが、X-Rayではトレースのためにコンテキストを利用しているわけです。
ctx, seg := xray.BeginSegment(context.Background(), "service-name") // service-nameがセグメント名
subCtx, subSeg := xray.BeginSubsegment(ctx, "subsegment-name") // subsegment-nameがサブセグメント名
// ...略...
// 終了時にはcontectのクローズを行う
subSeg.Close(nil)
seg.Close(nil)
通常、SQSでメッセージを送信するにはSendMessage
を用いますが、これにはコンテキストを渡すことができません。そのため、コンテキストを渡すせるようWithContext
を付加した関数が用意されています。SQSへのメッセージ送信の場合だとSendMessageWithContext
となります。この関数にcontext
を追加で渡すことでトレースヘッダーが設定されます。受信側でそれを受け取ることにより、トレースが可能となるわけです。
resp, err := svc.SendMessage(params) // 通常のメッセージ送信
resp, err := svc.SendMessageWithContext(ctx, params) // X-Rayを使う場合のメッセージ送信
コンテキストを追加で渡すだけで、それ以外は元の関数と差異はありません。
X-Rayデーモンの起動
X-Rayデーモンは、実行可能ファイルがAWS公式から提供されています。ここから環境に合わせた実行可能ファイルをダウンロードして実行します。
AWS X-Ray デーモン
なお、MacOSの場合、「開発元を確認できないため開けません」というメッセージが表示されることがありますが、以下などを参考に回避可能でした。
Macで「開発元を確認できないため、開けません」と表示された時の対処法
ソース
実際にX-RaySDKを組み込んだGoのソースです。
- メッセージ送信処理(sqs-sender)
package main
import (
"context"
"fmt"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-xray-sdk-go/xray"
)
var (
QueURL = "https://sqs.<リージョン>.amazonaws.com/<アカウントID>/<キューの名前>"
AwsRegion = "<リージョン>"
)
var svc *sqs.SQS
func SendMessage() error {
// SQSクライアントをxrayでラップ
xray.AWS(svc.Client)
// キューに送るメッセージと送信時に渡す構造体を設定
message := "hello"
params := &sqs.SendMessageInput{
QueueUrl: aws.String(QueURL),
MessageBody: aws.String(message),
}
// セグメントの宣言、contextの生成
ctx, seg := xray.BeginSegment(context.Background(), "sqs-sender")
subctx, subseg := xray.BeginSubsegment(ctx, "sqs-sender-sub")
// メッセージ送信処理
resp, err := svc.SendMessageWithContext(subctx, params)
if err != nil {
return err
}
fmt.Println(resp)
// セグメントのクローズ
subseg.Close(nil)
seg.Close(nil)
return nil
}
func main() {
sampleSession := session.Must(session.NewSession())
svc = sqs.New(sampleSession, aws.NewConfig().WithRegion(AwsRegion))
if err := SendMessage(); err != nil {
log.Fatal(err)
}
}
- メッセージ受信処理(sqs-reciever)
package main
import (
"context"
"fmt"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-xray-sdk-go/xray"
)
var (
AwsRegion = "<リージョン>"
QueURL = "https://sqs.<リージョン>.amazonaws.com/<アカウントID>/<キューの名前>" // エンドポイント
)
var svc *sqs.SQS
func GetMessage() error {
xray.AWS(svc.Client)
// SQSから受け取る際に渡す構造体
params := &sqs.ReceiveMessageInput{
AttributeNames: aws.StringSlice([]string{"AWSTraceHeader"}), // トレースヘッダーを受け取る
QueueUrl: aws.String(QueURL),
MaxNumberOfMessages: aws.Int64(10), // 一度に取得するメッセージの最大数
WaitTimeSeconds: aws.Int64(20), // ロングポーリングの時間
}
// セグメントの宣言、contextの生成
ctx, seg := xray.BeginSegment(context.Background(), "sqs-reciever")
subctx, subseg := xray.BeginSubsegment(ctx, "sqs-reciever-sub")
// メッセージ受信の実行
resp, err := svc.ReceiveMessageWithContext(subctx, params)
if err != nil {
return err
}
}
for _, msg := range resp.Messages {
fmt.Println(*msg.Body)
// トレースヘッダーも取得&表示してみる
msgAtr := msg.Attributes
traceHeaderStr := msgAtr["AWSTraceHeader"]
fmt.Println("AWSTraceHeader: ", traceHeaderStr)
// メッセージ削除関数にもcontextを渡す
if err := DeleteMessage(subctx, msg); err != nil {
fmt.Println(err)
}
}
// セグメントのクローズ
subseg.Close(nil)
seg.Close(nil)
return nil
}
func DeleteMessage(ctx context.Context, msg *sqs.Message) error {
params := &sqs.DeleteMessageInput{
QueueUrl: aws.String(QueURL),
ReceiptHandle: aws.String(*msg.ReceiptHandle),
}
// メッセージの削除を実行。このときもcontextを渡す
_, err := svc.DeleteMessageWithContext(ctx, params)
if err != nil {
return err
}
return nil
}
func main() {
sampleSession := session.Must(session.NewSession())
svc = sqs.New(sampleSession, aws.NewConfig().WithRegion(AwsRegion))
// ポーリング
for {
if err := GetMessage(); err != nil {
log.Fatal(err)
}
}
}
実行結果
事前にX-Rayデーモンを動かしておきます。
$ ./xray_mac -o -n ap-northeast-1
2020-06-19T22:29:57+09:00 [Info] Initializing AWS X-Ray daemon 3.2.0
2020-06-19T22:29:57+09:00 [Debug] Listening on UDP 127.0.0.1:2000
2020-06-19T22:29:57+09:00 [Info] Using buffer memory limit of 163 MB
2020-06-19T22:29:57+09:00 [Info] 2608 segment buffers allocated
2020-06-19T22:29:57+09:00 [Debug] Using Endpoint read from Config file: xray.ap-northeast-1.amazonaws.com
2020-06-19T22:29:57+09:00 [Debug] Using proxy address:
... # 以下省略
その後、メッセージの送受信処理を動かします。一応マスクしていますが、こんな感じの実行結果が出力されます。
$ go run sqs-send-sample.go
2020-06-20T20:33:00+09:00 [INFO] X-Ray proxy using address : 127.0.0.1:2000
{
MD5OfMessageBody: "XXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
MessageId: "XXXXXX-XXXXX-XXXX-XXXX-XXXXXXXXXXXXXXX"
}
2020-06-20T20:33:00+09:00 [INFO] Emitter using address: 127.0.0.1:2000
$ go run sqs-recieve-sample.go
2020-06-20T20:32:54+09:00 [INFO] X-Ray proxy using address : 127.0.0.1:2000
hello # メッセージ本文
AWSTraceHeader: XXXXXXXXXXX # 16進数の値が出力される
2020-06-20T20:33:00+09:00 [INFO] Emitter using address: 127.0.0.1:2000
... # ポーリングのため中断されるまで続きます
X-Rayのコンソール
以下の通り、サービスマップが表示されました。下のsqs-senderからキューにメッセージが送られ、sqs-recieverがそれをポーリングしている状態です。
終わりに
X-RayをGo言語で使っているサンプルが少なく色々苦労しましたが、なんとか良い感じのグラフを描くことができました。X-Rayは使いこなせれば面白いサービスだと思うので、他のAWSサービスとも統合させてみたいと思います。
余談
SQSは、SNSと統合して用いることが多々あると思います。X-RayはSNSとの統合もサポートしているのですが、SNSのサブスクライバーとしてトレースがサポートされているのは、2020/6時点でHTTP/HTTPSとAWS Lambdaの2つです。つまり、残念ながらSNS+SQSの統合は、SQSがサブスクライバーとして統合されていないため、まとめてサービスグラフを描くことができません。今後のアップデートに期待したいと思います。