はじめに
GoからAthenaのクエリを叩き、Athenaで取得したデータをもとにVegaでグラフを作成します。今回は、下の画像のような散布図を作成します。作業は大きく分けて、Goでの作業とVegaでの作業に分かれます。
Vegaとは?
グラフ作成ツールで、json形式のデータを読み込ませ、簡単なグラフから複雑なインタラクティブなグラフまで、様々なグラフを作ることができます。
Goでの作業
準備
.envを用意
GoからAWSにアクセスするために、AWSクレデンシャルを記載するファイルを用意します。
- まずは、.envファイルを用意します。
- .envに
AWS_ACCESS_KEY_ID
,AWS_SECRET_ACCESS_KEY
を記載します。スイッチロールが必要な場合はROLE_ARN
を指定します。
AWS_ACCESS_KEY_ID=XXXXX
AWS_SECRET_ACCESS_KEY=XXXXX
ROLE_ARN=XXXXX
go mod 初期化
Goのプログラムで必要なパッケージをダウンロードするために、下記のコマンドをターミナルで実行します。この時、go mod init hoge
のhoge
の部分には、好きな文字列を指定することができ、ここで指定した文字列は、コンパイル後の実行ファイルの名前になります。
$ go mod init hoge
上記のコマンドを実行後、go.mod
というファイルができていればOK。
main.go
全体像
package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"strings"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
"github.com/joho/godotenv"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
)
type Plot_data struct {
Unixtime string `json:"unixtime"`
Duration string `json:"duration"`
}
const (
database = "aucfan_services_log_partition"
output_location = "s3://output_bucket/2020/12/07"
query = `
SELECT unixtime, duration
FROM aucfan_paapi_nginx_errorlog
WHERE year = 2020 AND month = 6 AND day = 1 AND status_code = 429
ORDER BY unixtime;
`
)
func newSession() *session.Session {
awscfg := &aws.Config{
Region: aws.String("ap-northeast-1"),
Credentials: credentials.NewStaticCredentialsFromCreds(credentials.Value{
AccessKeyID: os.Getenv("AWS_ACCESS_KEY_ID"),
SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
}),
}
sess := session.Must(session.NewSession(awscfg))
if len(os.Getenv("ROLE_ARN")) == 0 {
return sess
}
creds := stscreds.NewCredentials(sess, os.Getenv("ROLE_ARN"))
config := aws.Config{Region: sess.Config.Region, Credentials: creds}
sSess := session.New(&config)
return sSess
}
//クエリ実行
func runQuery(query string) error {
var (
err error
r athena.ResultConfiguration
)
sSess := newSession()
svc := athena.New(sSess, aws.NewConfig().WithRegion("ap-northeast-1"))
//クエリをセット
var s athena.StartQueryExecutionInput
s.SetQueryString(query)
//データベースをセット
var q athena.QueryExecutionContext
q.SetDatabase(database)
s.SetQueryExecutionContext(&q)
//結果の吐き出し用のS3バケット
fmt.Println(output_location)
r.SetOutputLocation(output_location)
s.SetResultConfiguration(&r)
//クエリ実行
result, err := svc.StartQueryExecution(&s)
if err != nil {
return err
}
fmt.Println("StartQueryExecution result:")
fmt.Println(result.GoString())
var qri athena.GetQueryExecutionInput
qri.SetQueryExecutionId(*result.QueryExecutionId)
var qrop *athena.GetQueryExecutionOutput
fmt.Println("waiting...")
for {
qrop, err = svc.GetQueryExecution(&qri)
if err != nil {
return err
}
if *qrop.QueryExecution.Status.State == "SUCCEEDED" || *qrop.QueryExecution.Status.State == "FAILED" {
break
}
}
if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
err = plot(svc, result.QueryExecutionId)
return err
} else {
fmt.Println(*qrop.QueryExecution.Status.State)
}
return err
}
// クエリ実行結果を取得
func getData(svc *athena.Athena, id *string, token *string) (*athena.GetQueryResultsOutput, *string, error) {
var err error
ip := &athena.GetQueryResultsInput{
QueryExecutionId: id,
NextToken: token,
}
op, err := svc.GetQueryResults(ip)
if err != nil {
return nil, nil, err
}
return op, op.NextToken, err
}
// 読み込んだ結果をjsonに書き込む
func plot(svc *athena.Athena, id *string) error {
var (
token *string
op *athena.GetQueryResultsOutput
)
token = nil
ioutil.WriteFile("data.json", []byte("["), os.ModePerm)
f, err := os.OpenFile("data.json", os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Print(err)
return err
}
defer f.Close()
for { //tokenを使い、全てのデータを取得する
op, token, err = getData(svc, id, token)
if err != nil {
return err
}
var (
data Plot_data
eol string //行末
)
//取得したデータをjsonファイルへ書き込む
for i, s := range op.ResultSet.Rows {
for j, t := range s.Data {
if j == 0 { //Asia/Tokyoの部分を削除
data.Unixtime = strings.Replace(*t.VarCharValue, " Asia/Tokyo", "", 1)
} else {
data.Duration = *t.VarCharValue
}
}
if i != 0 {
if i == (len(op.ResultSet.Rows)-1) && token == nil{
eol = "]"
} else {
eol = ","
}
jsonBytes, err := json.Marshal(data)
if err != nil {
return err
}
out := new(bytes.Buffer)
json.Indent(out, jsonBytes, "", " ")
plot := out.String() + eol
fmt.Fprint(f, "\n" + plot)
}
}
if token == nil {
break
}
}
return err
}
func handler(c echo.Context) error {
fmt.Println(query)
err := runQuery(query) //クエリ実行
if err != nil {
return err
}
return c.File("data.json")
}
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.Use(middleware.CORS())
if err := godotenv.Load(".env"); err != nil {
log.Fatal(err)
}
e.GET("/", handler)
e.Start(":1323")
}
解説
定数
- database
- データを取得したいAthenaのデータベースを指定します。
- output_location
- クエリの結果の吐き出し先S3バケットを指定します。
- query
- 実行したいSQLをここに記述します 。今回のような散布図を作成する場合、
SELECT x座標, y座標
になるようにSQLを作ると良いでしょう。
- 実行したいSQLをここに記述します 。今回のような散布図を作成する場合、
const (
database = "aucfan_services_log_partition"
output_location = "s3://output_bucket/2020/12/07"
query = `
SELECT unixtime, duration
FROM aucfan_paapi_nginx_errorlog
WHERE year = 2020 AND month = 6 AND day = 1 AND status_code = 429
ORDER BY unixtime;
`
)
main
Go echoを使って、取得したAthenaのデータをJSON形式で返すAPIサーバーを立てます。
また、.envの読み込みもここでしています。godotenv.Load
の部分には作成した.envファイルまでのファイルパスを記述します。
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
e.Use(middleware.CORS())
if err := godotenv.Load(".env"); err != nil {
log.Fatal(err)
}
e.GET("/", handler)
e.Start(":1323")
}
handler
http://localhost:1323/ にアクセスした時にこの関数が実行されます。ここでは、Athenaでクエリを実行後、取得したデータをまとめたjsonファイルを返しています。
func handler(c echo.Context) error {
fmt.Println(query)
err := runQuery(query) //クエリ実行
if err != nil {
return err
}
return c.File("data.json")
}
runQuery
クエリ、データベース、S3バケットを設定後、Athenaでクエリを実行します。クエリ実行後、クエリの実行が終わるまで待ちます。クエリの実行が成功(SUCCEEDED)したら、データを取得する作業に移ります。クエリの実行が失敗(FAILED)したら、処理を終了します。
func runQuery(query string) error {
var (
err error
r athena.ResultConfiguration
)
sSess := newSession()
svc := athena.New(sSess, aws.NewConfig().WithRegion("ap-northeast-1"))
//クエリをセット
var s athena.StartQueryExecutionInput
s.SetQueryString(query)
//データベースをセット
var q athena.QueryExecutionContext
q.SetDatabase(database)
s.SetQueryExecutionContext(&q)
//結果の吐き出し用のS3バケット
fmt.Println(output_location)
r.SetOutputLocation(output_location)
s.SetResultConfiguration(&r)
//クエリ実行
result, err := svc.StartQueryExecution(&s)
if err != nil {
return err
}
fmt.Println("StartQueryExecution result:")
fmt.Println(result.GoString())
var qri athena.GetQueryExecutionInput
qri.SetQueryExecutionId(*result.QueryExecutionId)
var qrop *athena.GetQueryExecutionOutput
fmt.Println("waiting...")
for {
qrop, err = svc.GetQueryExecution(&qri)
if err != nil {
return err
}
if *qrop.QueryExecution.Status.State == "SUCCEEDED" || *qrop.QueryExecution.Status.State == "FAILED" {
break
}
}
if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
err = plot(svc, result.QueryExecutionId)
return err
} else {
fmt.Println(*qrop.QueryExecution.Status.State)
}
return err
}
newSession
AWSに接続するためのSessionを作成します。ここで、.envに記載してクレデンシャル情報を利用しています。
func newSession() *session.Session {
awscfg := &aws.Config{
Region: aws.String("ap-northeast-1"),
Credentials: credentials.NewStaticCredentialsFromCreds(credentials.Value{
AccessKeyID: os.Getenv("AWS_ACCESS_KEY_ID"),
SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
}),
}
sess := session.Must(session.NewSession(awscfg))
if len(os.Getenv("ROLE_ARN")) == 0 {
return sess
}
creds := stscreds.NewCredentials(sess, os.Getenv("ROLE_ARN"))
config := aws.Config{Region: sess.Config.Region, Credentials: creds}
sSess := session.New(&config)
return sSess
}
plot
クエリの実行結果を読み込み、jsonファイルに書き込みます。forループの中では、1行ずつ結果を読み込み、jsonファイルに書き込んでいます。
func plot(svc *athena.Athena, id *string) error {
var (
token *string
op *athena.GetQueryResultsOutput
)
token = nil
ioutil.WriteFile("data.json", []byte("["), os.ModePerm)
f, err := os.OpenFile("data.json", os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Print(err)
return err
}
defer f.Close()
for { //tokenを使い、全てのデータを取得する
op, token, err = getData(svc, id, token)
if err != nil {
return err
}
var (
data Plot_data
eol string //行末
)
//取得したデータをjsonファイルへ書き込む
for i, s := range op.ResultSet.Rows {
for j, t := range s.Data {
if j == 0 { //Asia/Tokyoの部分を削除
data.Unixtime = strings.Replace(*t.VarCharValue, " Asia/Tokyo", "", 1)
} else {
data.Duration = *t.VarCharValue
}
}
if i != 0 {
if i == (len(op.ResultSet.Rows)-1) && token == nil{
eol = "]"
} else {
eol = ","
}
jsonBytes, err := json.Marshal(data)
if err != nil {
return err
}
out := new(bytes.Buffer)
json.Indent(out, jsonBytes, "", " ")
plot := out.String() + eol
fmt.Fprint(f, "\n" + plot)
}
}
if token == nil {
break
}
}
return err
}
getData
Athenaのクエリ実行結果を取得している部分です。実行結果は一度に最大1000件までしか取得できないため、1000件を超える場合はtokenを利用して、ページングにより次のデータを取得することができます。ループとtokenを使えば、全てのデータを取得することができます。
func getData(svc *athena.Athena, id *string, token *string) (*athena.GetQueryResultsOutput, *string, error) {
var err error
ip := &athena.GetQueryResultsInput{
QueryExecutionId: id,
NextToken: token,
}
op, err := svc.GetQueryResults(ip)
if err != nil {
return nil, nil, err
}
return op, op.NextToken, err
}
確認
1.コンパイルします
$ go build
2.プログラムを実行します
$ ./hoge
3.ブラウザで http://localhost:1323/ にアクセスし、jsonデータが表示されればOK。
Vegaでの作業
- https://vega.github.io/editor/ にアクセスします。
- エディタに下記のような内容を記述すれば、右側に空っぽのグラフが作成されます。
{
"$schema": "https://vega.github.io/schema/vega/v5.json",
"width": 400,
"height": 400,
"padding": {"left": 25, "top": 20, "right": 5, "bottom": 80},
"title": {"text": "Graph"},
"data": [
{
"name": "table",
"format": {
"type": "json",
"parse": {"unixtime": "date", "duration": "number"}
},
"async": true,
"url": "http://localhost:1323/"
}
],
"scales": [
{
"name": "xscale",
"type": "time",
"domain": {"data": "table", "field": "unixtime"},
"range": "width",
"padding": 0.05,
"round": true
},
{
"name": "yscale",
"domain": {"data": "table", "field": "duration"},
"nice": true,
"range": "height"
}
],
"axes": [
{
"orient": "bottom",
"scale": "xscale",
"labelAngle": 90,
"labelAlign": "left",
"format": "%b %d %H:%M"
},
{"orient": "left", "scale": "yscale", "title": "Response Time [sec]"}
],
"marks": [
{
"type": "symbol",
"from": {"data": "table"},
"encode": {
"enter": {
"size": {"value": 5},
"x": {"scale": "xscale", "field": "unixtime"},
"y": {"scale": "yscale", "field": "duration"}
}
}
}
]
}
3.先ほど作成したgoのプログラムを実行します
$ ./hoge
4.ブラウザのページを更新し、数秒後にグラフが表示されればOK。