CloudWatch logsをgolangで操作するには?
CloudWatchはかなりAWSの中でもかなり便利なサービスで大好きなんですが、いかんせんSDKにクセがあるので、
自分用のメモがてらラッパーライブラリを実装してみました。
作った機能
業務で使っている限り、大部分の時間はLogs Insight
で検索したりしているのですが、
これが結構もっさりしてたり、そもそもコンソール開くのが煩わしいので、CLI化したい、というのがモチベーションでした。
ただ、APを新規で作っているときはメッセージ保存とかも当然やるので、必要そうな機能は実装しました。
- LogGroup
- LogGroup一覧を返す
- メッセージ検索
- LogStream
- 新規作成
- LogStream一覧を返す
- LogEvent
- 新規作成
- 保存
リポジトリ
install
go get gitlab.com/kuritayu/logs
Getting Started
初期化
セッション情報を設定し、logs.New
を呼び出します。
func main() {
region := "ap-northeast-1"
sess := session.Must(
session.NewSession(&aws.Config{
Region: aws.String(region),
}))
cloudwatch := logs.New(sess) // ココ
LogGroup
一覧
FindAll
でLogGroupの一覧を取得します。Lambdaとか大量に使うシステムだと画面では1ページにおさまらないので、
一覧はほしくなりました。
// read log group
log.Println("ALL Log Group")
log.Println(cloudwatch.LogGroup().FindAll())
検索
メインにやりたかったところです。LogGroup
、キーワード、レンジ(From/To)を指定し、全ストリームを検索しています。
GrepByMessage
はLogs Insights
で使うクエリを内部的に使っています。
ポイントは、AWS SDKのStartQuery
が投げるとクエリIDを返してバックグラウンドで動くところでしょうか。
何らかの待ちを加えないと、Queryが終わる前に結果を取りに行く→結果なしになってしまいます。
この例では、time.sleep
でやりましたが、ループ処理をする等、結果の拾い方は検討の余地がありそうです。
// query
log.Println("query")
query, err := cloudwatch.LogGroup().GrepByMessage(
"/aws/lambda/get-sqs",
"test",
"2021-09-22T00:00:00+09:00",
"2021-09-22T23:59:59+09:00",
)
if err != nil {
log.Println(err)
}
log.Println(query)
func (lg LogGroup) GrepByMessage(logGroup string, keyword string, from string, to string) (string, error) {
// クエリの構築
query := fmt.Sprintf("fields @timestamp, @message, @logStream | filter @message like /%v/", keyword)
// クエリ投入定義構築
input := &cloudwatchlogs.StartQueryInput{
EndTime: aws.Int64(UnixMillisecond(ParsedTo)),
LogGroupName: aws.String(logGroup),
QueryString: aws.String(query),
StartTime: aws.Int64(UnixMillisecond(ParsedFrom)),
}
// クエリ実行
out, err := lg.Logs.client.StartQuery(input)
if err != nil {
return "", err
}
return aws.StringValue(out.QueryId), nil
}
結果の受け取りの部分です。クエリIDを指定すると、結果を取得できるようにしています。
ここでのポイントは、@timestamp
フィールドがUTCで返ってくることでしょうか。見づらいので、JST変換をしています。
また、過去の体験上、Logs Insigihts
でメッセージ全検索→Stream名取得→該当Streamの付近を見る、のケースがほとんどだったので、
今回のクエリはquery := fmt.Sprintf("fields @timestamp, @message, @logStream | filter @message like /%v/", keyword)
で十分と思い、ハードコーディングしました。
time.Sleep(5 * time.Second)
result, err := cloudwatch.LogGroup().Result(query)
if err != nil {
log.Println(err)
}
for _, line := range result {
fmt.Printf("%v %v %v\n", logs.Jst(line.Timestamp), line.LogStream, line.Message)
}
LogStream
LogGroupとほぼ実装は同じでした。
新規作成
// create log stream
log.Println("create log stream")
err := s.Save("testStream")
if err != nil {
log.Println(err)
}
一覧
// read log stream
log.Println("ALL Log Stream")
s := cloudwatch.LogStream("/aws/lambda/get-sqs")
log.Println(s.FindAll())
LogEvent
全件取得
LogGroupとLogStreamを指定することで全件取得できるようにしました。
// read log event
log.Println("FindAll Log Event")
e := cloudwatch.LogEvent(
"/aws/lambda/get-sqs",
"2021/09/12/[$LATEST]7d7fe71b3aac4a649003f10d9085752b")
events, _ := e.FindAll()
for _, event := range events {
fmt.Print(event.Message)
}
ライブラリ側の実装としては、Tokenを使ってTokenがある限りループする実装を組む必要があるところがポイントでした。
func (le LogEvent) FindAll() ([]LogContent, error) {
var events []LogContent
for {
items, err := le.Logs.client.GetLogEvents(&le.Input)
if err != nil {
return nil, err
}
for _, item := range items.Events {
event := &LogContent{
TimeStamp: aws.Int64Value(item.Timestamp),
Message: aws.StringValue(item.Message),
}
events = append(events, *event)
}
if aws.StringValue(items.NextBackwardToken) == aws.StringValue(le.Input.NextToken) {
break
}
le.Input.NextToken = items.NextBackwardToken
}
return events, nil
}
保存
// create log event
log.Println("create log event")
e.LogStreamName = "testStream"
err = e.Save("test message")
if err != nil {
log.Println(err)
}
こちらも、Tokenに気をつけないとうまく保存ができないです。
特に、Streamに1件でもデータがある場合はTokenがないとエラーになります。
func (le LogEvent) Save(message string) error {
// トークンを取得する
token, err := le.fetchToken()
if err != nil {
return err
}
// 時刻をミリ秒に変換する
eventTime := now()
var inputLogEvents []*cloudwatchlogs.InputLogEvent
inputLogEvent := &cloudwatchlogs.InputLogEvent{
Message: aws.String(message),
Timestamp: aws.Int64(eventTime),
}
inputLogEvents = append(inputLogEvents, inputLogEvent)
return le.doSave(inputLogEvents, token)
}
func (le LogEvent) doSave(inputLogEvent []*cloudwatchlogs.InputLogEvent, token *string) error {
input := &cloudwatchlogs.PutLogEventsInput{
LogEvents: inputLogEvent,
LogGroupName: aws.String(le.LogGroupName),
LogStreamName: aws.String(le.LogStreamName),
SequenceToken: token,
}
_, err := le.Logs.client.PutLogEvents(input)
if err != nil {
return err
}
return nil
}