こんにちは、LIFULLのchissoです。
この記事はLIFULL Advent Calendar 2017 その2の12/24分です。
世間はクリスマスイブらしいです。
私は無宗教なので関係ありません。リア充爆発しろ。
本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。
#Athena
みなさん、Amazon Athena使っていますか?
Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。
S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。
##howto
導入には下記あたりの記事が参考になります。
S3のデータをAmazon Athenaを使って分析する
Amazon Athenaを使ってみました #reinvent
AWS Black Belt Online Seminar 2017 Amazon Athena
##tuning
クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。
また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。
Amazon Athena のパフォーマンスチューニング Tips トップ 10
現場で運用する視点から見た Amazon Athena
ぼくがAthenaで死ぬまで
#aws-sdk-goからAthenaを呼び出す
AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。
同じようなことをされる方の参考になれば幸いです。
###前提
- aws-sdk-go (1.12.44)
###処理の流れ
大まかに下記の流れです。
- Athena用のAPIクライアントを初期化する
- 実行するクエリの設定をする
- クエリを実行する
- クエリの状況を監視する
- クエリの結果を取得する
コードのサンプルと共に順に見ていきます。
##1. Athena用のAPIクライアントを初期化する
aws-sdk-goを使ったことがある方ならいつものやつです。
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New
ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。
var athenaClient *athena.Athena
func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"),
os.Getenv("AWS_SECRET_ACCESS_KEY"),
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}
注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。
##2. 実行するクエリの設定をする
実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInput
を作成することになります。
NamedQuery
を作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。
StartQueryExecutionInput
は、ドキュメントの通りQueryString (string)
とResultConfiguration
が必須項目です。
QueryString
は実行するクエリをそのまま渡すだけです。ただし、型がstring
ではなく*string
のため、一度変数にいれたりする必要があります。
参考)golang で string のポインタを取得する
Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfiguration
は、その保存先と保存時に暗号化するかどうかを設定します。
resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))
input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}
なお、OutputLocation
にはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/
みたいなことをすると後で参照しやすいです。
##3. クエリを実行する
ここは特にありません。StartQueryExecutionInput
を引数に、StartQueryExecution
を呼び出すのみです。
ただ、StartQueryExecution
はクエリ実行のリクエストにすぎないことにご注意ください。
output, err := athenaClient.StartQueryExecution(input)
##4. クエリの状況を監視する
3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutput
が得られます。
StartQueryExecutionOutput
はQueryExecutionId
を持っており、このQueryExecutionId
がこのあと重要になります。
まずはリクエストしたクエリが完了するまで監視します。
GetQueryExecution
というAPIが、クエリの実行状況を返却してくれます。引数の
GetQueryExecutionInput
に先ほど実行したクエリのQueryExecutionId
を設定してリクエストします。
GetQueryExecutionOutput
のQueryExecution.Status.State
を確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。
id = output.QueryExecutionId
executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}
// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput
for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
// Stateはconstで管理されている
// https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
// 待って再実行
time.Sleep(5 * time.Second)
case athena.QueryExecutionStateSucceeded:
return id, nil
default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
return nil, errors.New(executionOutput.String())
}
}
##5. クエリの結果を取得する
クエリIDなどを含めたGetQueryResultsInput
を引数に、GetQueryResults
を呼び出しましょう。
ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet
, Row
, Datum
で順次ラップされてるのでむしろ扱いが面倒で。。
最初にStartQueryExecutionInput.ResultConfiguration
で設定したS3のパスに、結果CSVが{QueryExecutionId}.csv
という名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csv
でパースしたほうが扱いやすいです。
#サンプルコードまとめ
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/
package athenaclient
import (
"errors"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
)
var athenaClient *athena.Athena
func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"), // Access key
os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}
var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))
input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}
output, err := athenaClient.StartQueryExecution(input)
if err != nil {
return nil, err
}
id = output.QueryExecutionId
executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}
// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput
for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
// Stateはconstで管理されている
// https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
// 待って再実行
time.Sleep(5 * time.Second)
case athena.QueryExecutionStateSucceeded:
return id, nil
default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
return nil, errors.New(executionOutput.String())
}
}
}
なお、実行用のfunc
をvar
宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。
詳しくは昨年のAdvent Calenderの記事をご参照ください。