aws-sdk-goを使ってAthenaでクエリを実行する

More than 1 year has passed since last update.

こんにちは、LIFULLのchissoです。

この記事はLIFULL Advent Calendar 2017 その2の12/24分です。

世間はクリスマスイブらしいです。

私は無宗教なので関係ありません。リア充爆発しろ。

本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。


Athena

みなさん、Amazon Athena使っていますか?


Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。


S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。


howto

導入には下記あたりの記事が参考になります。

S3のデータをAmazon Athenaを使って分析する

Amazon Athenaを使ってみました #reinvent

AWS Black Belt Online Seminar 2017 Amazon Athena


tuning

クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。

また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。

Amazon Athena のパフォーマンスチューニング Tips トップ 10

現場で運用する視点から見た Amazon Athena

ぼくがAthenaで死ぬまで


aws-sdk-goからAthenaを呼び出す

AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。

同じようなことをされる方の参考になれば幸いです。


前提


  • aws-sdk-go (1.12.44)


処理の流れ

大まかに下記の流れです。


  1. Athena用のAPIクライアントを初期化する

  2. 実行するクエリの設定をする

  3. クエリを実行する

  4. クエリの状況を監視する

  5. クエリの結果を取得する

コードのサンプルと共に順に見ていきます。


1. Athena用のAPIクライアントを初期化する

aws-sdk-goを使ったことがある方ならいつものやつです。

https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New

ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。

var athenaClient *athena.Athena

func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"),
os.Getenv("AWS_SECRET_ACCESS_KEY"),
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}

注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。


2. 実行するクエリの設定をする

実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInputを作成することになります。

NamedQueryを作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。

StartQueryExecutionInputは、ドキュメントの通りQueryString

(string)
ResultConfigurationが必須項目です。

QueryStringは実行するクエリをそのまま渡すだけです。ただし、型がstringではなく*stringのため、一度変数にいれたりする必要があります。

参考)golang で string のポインタを取得する

Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfigurationは、その保存先と保存時に暗号化するかどうかを設定します。

resultConf := &athena.ResultConfiguration{}

resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}

なお、OutputLocationにはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/みたいなことをすると後で参照しやすいです。


3. クエリを実行する

ここは特にありません。StartQueryExecutionInputを引数に、StartQueryExecutionを呼び出すのみです。

ただ、StartQueryExecutionクエリ実行のリクエストにすぎないことにご注意ください。

output, err := athenaClient.StartQueryExecution(input)


4. クエリの状況を監視する

3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutputが得られます。

StartQueryExecutionOutputQueryExecutionIdを持っており、このQueryExecutionIdがこのあと重要になります。

まずはリクエストしたクエリが完了するまで監視します。

GetQueryExecutionというAPIが、クエリの実行状況を返却してくれます。引数の

GetQueryExecutionInputに先ほど実行したクエリのQueryExecutionIdを設定してリクエストします。

GetQueryExecutionOutputQueryExecution.Status.Stateを確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。

id = output.QueryExecutionId

executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}

// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput

for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
// Stateはconstで管理されている
// https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
// 待って再実行
time.Sleep(5 * time.Second)
case athena.QueryExecutionStateSucceeded:
return id, nil
default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
return nil, errors.New(executionOutput.String())
}
}


5. クエリの結果を取得する

クエリIDなどを含めたGetQueryResultsInputを引数に、GetQueryResultsを呼び出しましょう。

ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet, Row, Datumで順次ラップされてるのでむしろ扱いが面倒で。。

最初にStartQueryExecutionInput.ResultConfigurationで設定したS3のパスに、結果CSVが{QueryExecutionId}.csvという名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csvでパースしたほうが扱いやすいです。


サンプルコードまとめ

https://docs.aws.amazon.com/sdk-for-go/api/service/athena/



package athenaclient

import (
"errors"
"os"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
)

var athenaClient *athena.Athena

func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"), // Access key
os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}

var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}

output, err := athenaClient.StartQueryExecution(input)
if err != nil {
return nil, err
}

id = output.QueryExecutionId
executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}

// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput

for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
        // Stateはconstで管理されている
        // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
        case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
        // 待って再実行
        time.Sleep(5 * time.Second)
        case athena.QueryExecutionStateSucceeded:
        return id, nil
        default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
        return nil, errors.New(executionOutput.String())
}
}
}

なお、実行用のfuncvar宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。

詳しくは昨年のAdvent Calenderの記事をご参照ください。