8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Aucfan (オークファン) グループAdvent Calendar 2020

Day 10

【Go】 AWS Athenaから取得したデータをVegaでグラフ化してみた

Last updated at Posted at 2020-12-09

はじめに

GoからAthenaのクエリを叩き、Athenaで取得したデータをもとにVegaでグラフを作成します。今回は、下の画像のような散布図を作成します。作業は大きく分けて、Goでの作業とVegaでの作業に分かれます。

visualization.png

Vegaとは?

グラフ作成ツールで、json形式のデータを読み込ませ、簡単なグラフから複雑なインタラクティブなグラフまで、様々なグラフを作ることができます。

Goでの作業

準備

.envを用意

GoからAWSにアクセスするために、AWSクレデンシャルを記載するファイルを用意します。

  1. まずは、.envファイルを用意します。
  2. .envにAWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEYを記載します。スイッチロールが必要な場合はROLE_ARNを指定します。
AWS_ACCESS_KEY_ID=XXXXX
AWS_SECRET_ACCESS_KEY=XXXXX
ROLE_ARN=XXXXX

go mod 初期化

Goのプログラムで必要なパッケージをダウンロードするために、下記のコマンドをターミナルで実行します。この時、go mod init hogehogeの部分には、好きな文字列を指定することができ、ここで指定した文字列は、コンパイル後の実行ファイルの名前になります。

$ go mod init hoge

上記のコマンドを実行後、go.modというファイルができていればOK。

main.go

全体像

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/athena"

	"github.com/joho/godotenv"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
)

type Plot_data struct {
	Unixtime string `json:"unixtime"`
	Duration string `json:"duration"`
}

const (
	database = "aucfan_services_log_partition"
	output_location = "s3://output_bucket/2020/12/07"
	query = `
		SELECT unixtime, duration
		FROM aucfan_paapi_nginx_errorlog
		WHERE year = 2020 AND month = 6 AND day = 1  AND status_code = 429
		ORDER BY unixtime;
	`
)

func newSession() *session.Session {
	awscfg := &aws.Config{
		Region: aws.String("ap-northeast-1"),
		Credentials: credentials.NewStaticCredentialsFromCreds(credentials.Value{
			AccessKeyID:     os.Getenv("AWS_ACCESS_KEY_ID"),
			SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
		}),
	}
	sess := session.Must(session.NewSession(awscfg))

	if len(os.Getenv("ROLE_ARN")) == 0 {
		return sess
	}

	creds := stscreds.NewCredentials(sess, os.Getenv("ROLE_ARN"))
	config := aws.Config{Region: sess.Config.Region, Credentials: creds}
	sSess := session.New(&config)

	return sSess
}

//クエリ実行
func runQuery(query string) error {
	var (
		err error
		r athena.ResultConfiguration
	)
	sSess := newSession()
	svc := athena.New(sSess, aws.NewConfig().WithRegion("ap-northeast-1"))

	//クエリをセット
	var s athena.StartQueryExecutionInput
	s.SetQueryString(query)

	//データベースをセット
	var q athena.QueryExecutionContext
	q.SetDatabase(database)
	s.SetQueryExecutionContext(&q)
	
	//結果の吐き出し用のS3バケット
	fmt.Println(output_location)
	r.SetOutputLocation(output_location)
	s.SetResultConfiguration(&r)

	//クエリ実行
	result, err := svc.StartQueryExecution(&s)
	if err != nil {
		return err
	}
	fmt.Println("StartQueryExecution result:")
	fmt.Println(result.GoString())

	var qri athena.GetQueryExecutionInput
	qri.SetQueryExecutionId(*result.QueryExecutionId)

	var qrop *athena.GetQueryExecutionOutput

	fmt.Println("waiting...")
	for {
		qrop, err = svc.GetQueryExecution(&qri)
		if err != nil {
			return err
		}
		if *qrop.QueryExecution.Status.State == "SUCCEEDED" || *qrop.QueryExecution.Status.State == "FAILED" {
			break
		}
	}
	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
		err = plot(svc, result.QueryExecutionId)
		return err
	} else {
		fmt.Println(*qrop.QueryExecution.Status.State)
	}
	return err
}

// クエリ実行結果を取得
func getData(svc *athena.Athena, id *string, token *string) (*athena.GetQueryResultsOutput, *string, error) {
	var err error
	ip := &athena.GetQueryResultsInput{
		QueryExecutionId: id,
		NextToken:        token,
	}
	op, err := svc.GetQueryResults(ip)
	if err != nil {
		return nil, nil, err
	}
	return op, op.NextToken, err
}

// 読み込んだ結果をjsonに書き込む
func plot(svc *athena.Athena, id *string) error {
	var (
		token *string
		op    *athena.GetQueryResultsOutput
	)
	token = nil

	ioutil.WriteFile("data.json", []byte("["), os.ModePerm)
	f, err := os.OpenFile("data.json", os.O_APPEND|os.O_WRONLY, 0600)
	if err != nil {
		log.Print(err)
		return err
	}
	defer f.Close()
	
	for { //tokenを使い、全てのデータを取得する
		op, token, err = getData(svc, id, token)
		if err != nil {
			return err
		}
		var (
			data Plot_data
			eol  string //行末
		)

		//取得したデータをjsonファイルへ書き込む
		for i, s := range op.ResultSet.Rows {
			for j, t := range s.Data {
				if j == 0 { //Asia/Tokyoの部分を削除
					data.Unixtime = strings.Replace(*t.VarCharValue, " Asia/Tokyo", "", 1)
				} else {
					data.Duration = *t.VarCharValue
				}
			}
			if i != 0 {
				if i == (len(op.ResultSet.Rows)-1) && token == nil{
					eol = "]"
				} else {
					eol = ","
				}
				jsonBytes, err := json.Marshal(data)
				if err != nil {
					return err
				}
				out := new(bytes.Buffer)
				json.Indent(out, jsonBytes, "", "    ")
				plot := out.String() + eol
				fmt.Fprint(f, "\n" + plot)
			}
		}

		if token == nil {
			break
		}
	}
	return err
}

func handler(c echo.Context) error {
	fmt.Println(query)
	err := runQuery(query) //クエリ実行
	if err != nil {
		return err
	}

	return c.File("data.json")
}

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())
	e.Use(middleware.CORS())
	if err := godotenv.Load(".env"); err != nil {
		log.Fatal(err)
	}
	e.GET("/", handler)
	e.Start(":1323")
}

解説

定数

  • database
    • データを取得したいAthenaのデータベースを指定します。
  • output_location
    • クエリの結果の吐き出し先S3バケットを指定します。
  • query
    • 実行したいSQLをここに記述します 。今回のような散布図を作成する場合、SELECT x座標, y座標になるようにSQLを作ると良いでしょう。
const (
	database = "aucfan_services_log_partition"
	output_location = "s3://output_bucket/2020/12/07"
	query = `
		SELECT unixtime, duration
		FROM aucfan_paapi_nginx_errorlog
		WHERE year = 2020 AND month = 6 AND day = 1  AND status_code = 429
		ORDER BY unixtime;
	`
)

main

Go echoを使って、取得したAthenaのデータをJSON形式で返すAPIサーバーを立てます。
また、.envの読み込みもここでしています。godotenv.Loadの部分には作成した.envファイルまでのファイルパスを記述します。

func main() {
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())
	e.Use(middleware.CORS())
	if err := godotenv.Load(".env"); err != nil {
		log.Fatal(err)
	}
	e.GET("/", handler)
	e.Start(":1323")
}

handler

http://localhost:1323/ にアクセスした時にこの関数が実行されます。ここでは、Athenaでクエリを実行後、取得したデータをまとめたjsonファイルを返しています。

func handler(c echo.Context) error {
	fmt.Println(query)
	err := runQuery(query) //クエリ実行
	if err != nil {
		return err
	}

	return c.File("data.json")
}

runQuery

クエリ、データベース、S3バケットを設定後、Athenaでクエリを実行します。クエリ実行後、クエリの実行が終わるまで待ちます。クエリの実行が成功(SUCCEEDED)したら、データを取得する作業に移ります。クエリの実行が失敗(FAILED)したら、処理を終了します。

func runQuery(query string) error {
	var (
		err error
		r athena.ResultConfiguration
	)
	sSess := newSession()
	svc := athena.New(sSess, aws.NewConfig().WithRegion("ap-northeast-1"))

	//クエリをセット
	var s athena.StartQueryExecutionInput
	s.SetQueryString(query)

	//データベースをセット
	var q athena.QueryExecutionContext
	q.SetDatabase(database)
	s.SetQueryExecutionContext(&q)
	
	//結果の吐き出し用のS3バケット
	fmt.Println(output_location)
	r.SetOutputLocation(output_location)
	s.SetResultConfiguration(&r)

	//クエリ実行
	result, err := svc.StartQueryExecution(&s)
	if err != nil {
		return err
	}
	fmt.Println("StartQueryExecution result:")
	fmt.Println(result.GoString())

	var qri athena.GetQueryExecutionInput
	qri.SetQueryExecutionId(*result.QueryExecutionId)

	var qrop *athena.GetQueryExecutionOutput

	fmt.Println("waiting...")
	for {
		qrop, err = svc.GetQueryExecution(&qri)
		if err != nil {
			return err
		}
		if *qrop.QueryExecution.Status.State == "SUCCEEDED" || *qrop.QueryExecution.Status.State == "FAILED" {
			break
		}
	}
	if *qrop.QueryExecution.Status.State == "SUCCEEDED" {
		err = plot(svc, result.QueryExecutionId)
		return err
	} else {
		fmt.Println(*qrop.QueryExecution.Status.State)
	}
	return err
}

newSession

AWSに接続するためのSessionを作成します。ここで、.envに記載してクレデンシャル情報を利用しています。

func newSession() *session.Session {
	awscfg := &aws.Config{
		Region: aws.String("ap-northeast-1"),
		Credentials: credentials.NewStaticCredentialsFromCreds(credentials.Value{
			AccessKeyID:     os.Getenv("AWS_ACCESS_KEY_ID"),
			SecretAccessKey: os.Getenv("AWS_SECRET_ACCESS_KEY"),
		}),
	}
	sess := session.Must(session.NewSession(awscfg))

	if len(os.Getenv("ROLE_ARN")) == 0 {
		return sess
	}

	creds := stscreds.NewCredentials(sess, os.Getenv("ROLE_ARN"))
	config := aws.Config{Region: sess.Config.Region, Credentials: creds}
	sSess := session.New(&config)

	return sSess
}

plot

クエリの実行結果を読み込み、jsonファイルに書き込みます。forループの中では、1行ずつ結果を読み込み、jsonファイルに書き込んでいます。

func plot(svc *athena.Athena, id *string) error {
	var (
		token *string
		op    *athena.GetQueryResultsOutput
	)
	token = nil

	ioutil.WriteFile("data.json", []byte("["), os.ModePerm)
	f, err := os.OpenFile("data.json", os.O_APPEND|os.O_WRONLY, 0600)
	if err != nil {
		log.Print(err)
		return err
	}
	defer f.Close()
	
	for { //tokenを使い、全てのデータを取得する
		op, token, err = getData(svc, id, token)
		if err != nil {
			return err
		}
		var (
			data Plot_data
			eol  string //行末
		)

		//取得したデータをjsonファイルへ書き込む
		for i, s := range op.ResultSet.Rows {
			for j, t := range s.Data {
				if j == 0 { //Asia/Tokyoの部分を削除
					data.Unixtime = strings.Replace(*t.VarCharValue, " Asia/Tokyo", "", 1)
				} else {
					data.Duration = *t.VarCharValue
				}
			}
			if i != 0 {
				if i == (len(op.ResultSet.Rows)-1) && token == nil{
					eol = "]"
				} else {
					eol = ","
				}
				jsonBytes, err := json.Marshal(data)
				if err != nil {
					return err
				}
				out := new(bytes.Buffer)
				json.Indent(out, jsonBytes, "", "    ")
				plot := out.String() + eol
				fmt.Fprint(f, "\n" + plot)
			}
		}

		if token == nil {
			break
		}
	}
	return err
}

getData

Athenaのクエリ実行結果を取得している部分です。実行結果は一度に最大1000件までしか取得できないため、1000件を超える場合はtokenを利用して、ページングにより次のデータを取得することができます。ループとtokenを使えば、全てのデータを取得することができます。

func getData(svc *athena.Athena, id *string, token *string) (*athena.GetQueryResultsOutput, *string, error) {
	var err error
	ip := &athena.GetQueryResultsInput{
		QueryExecutionId: id,
		NextToken:        token,
	}
	op, err := svc.GetQueryResults(ip)
	if err != nil {
		return nil, nil, err
	}
	return op, op.NextToken, err
}

確認

1.コンパイルします

$ go build

2.プログラムを実行します

$ ./hoge

3.ブラウザで http://localhost:1323/ にアクセスし、jsonデータが表示されればOK。
スクリーンショット 2020-12-08 12.52.50.png

Vegaでの作業

  1. https://vega.github.io/editor/ にアクセスします。
  2. エディタに下記のような内容を記述すれば、右側に空っぽのグラフが作成されます。
{
  "$schema": "https://vega.github.io/schema/vega/v5.json",
  "width": 400,
  "height": 400,
  "padding": {"left": 25, "top": 20, "right": 5, "bottom": 80},
  "title": {"text": "Graph"},
  "data": [
    {
      "name": "table",
      "format": {
        "type": "json",
        "parse": {"unixtime": "date", "duration": "number"}
      },
      "async": true,
      "url": "http://localhost:1323/"
    }
  ],
  "scales": [
    {
      "name": "xscale",
      "type": "time",
      "domain": {"data": "table", "field": "unixtime"},
      "range": "width",
      "padding": 0.05,
      "round": true
    },
    {
      "name": "yscale",
      "domain": {"data": "table", "field": "duration"},
      "nice": true,
      "range": "height"
    }
  ],
  "axes": [
    {
      "orient": "bottom",
      "scale": "xscale",
      "labelAngle": 90,
      "labelAlign": "left",
      "format": "%b %d %H:%M"
    },
    {"orient": "left", "scale": "yscale", "title": "Response Time [sec]"}
  ],
  "marks": [
    {
      "type": "symbol",
      "from": {"data": "table"},
      "encode": {
        "enter": {
          "size": {"value": 5},
          "x": {"scale": "xscale", "field": "unixtime"},
          "y": {"scale": "yscale", "field": "duration"}
        }
      }
    }
  ]
}

スクリーンショット 2020-12-08 13.08.55.png

3.先ほど作成したgoのプログラムを実行します

$ ./hoge

4.ブラウザのページを更新し、数秒後にグラフが表示されればOK。

スクリーンショット 2020-12-08 13.17.39.png

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?