こんにちは、LIFULLのchissoです。
この記事はLIFULL Advent Calendar 2017 その2の12/24分です。
世間はクリスマスイブらしいです。
私は無宗教なので関係ありません。リア充爆発しろ。
本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。
Athena
みなさん、Amazon Athena使っていますか?
Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。
S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。
howto
導入には下記あたりの記事が参考になります。
S3のデータをAmazon Athenaを使って分析する
Amazon Athenaを使ってみました #reinvent
AWS Black Belt Online Seminar 2017 Amazon Athena
tuning
クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。
また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。
Amazon Athena のパフォーマンスチューニング Tips トップ 10
現場で運用する視点から見た Amazon Athena
ぼくがAthenaで死ぬまで
aws-sdk-goからAthenaを呼び出す
AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。
同じようなことをされる方の参考になれば幸いです。
前提
- aws-sdk-go (1.12.44)
処理の流れ
大まかに下記の流れです。
- Athena用のAPIクライアントを初期化する
- 実行するクエリの設定をする
- クエリを実行する
- クエリの状況を監視する
- クエリの結果を取得する
コードのサンプルと共に順に見ていきます。
1. Athena用のAPIクライアントを初期化する
aws-sdk-goを使ったことがある方ならいつものやつです。
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New
ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。
var athenaClient *athena.Athena
func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"),
os.Getenv("AWS_SECRET_ACCESS_KEY"),
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}
注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。
2. 実行するクエリの設定をする
実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInputを作成することになります。
NamedQueryを作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。
StartQueryExecutionInputは、ドキュメントの通りQueryString (string)とResultConfigurationが必須項目です。
QueryStringは実行するクエリをそのまま渡すだけです。ただし、型がstringではなく*stringのため、一度変数にいれたりする必要があります。
参考)golang で string のポインタを取得する
Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfigurationは、その保存先と保存時に暗号化するかどうかを設定します。
resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))
input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}
なお、OutputLocationにはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/みたいなことをすると後で参照しやすいです。
3. クエリを実行する
ここは特にありません。StartQueryExecutionInputを引数に、StartQueryExecutionを呼び出すのみです。
ただ、StartQueryExecutionはクエリ実行のリクエストにすぎないことにご注意ください。
output, err := athenaClient.StartQueryExecution(input)
4. クエリの状況を監視する
3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutputが得られます。
StartQueryExecutionOutputはQueryExecutionIdを持っており、このQueryExecutionIdがこのあと重要になります。
まずはリクエストしたクエリが完了するまで監視します。
GetQueryExecutionというAPIが、クエリの実行状況を返却してくれます。引数の
GetQueryExecutionInputに先ほど実行したクエリのQueryExecutionIdを設定してリクエストします。
GetQueryExecutionOutputのQueryExecution.Status.Stateを確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。
id = output.QueryExecutionId
executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}
// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput
for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
// Stateはconstで管理されている
// https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
// 待って再実行
time.Sleep(5 * time.Second)
case athena.QueryExecutionStateSucceeded:
return id, nil
default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
return nil, errors.New(executionOutput.String())
}
}
5. クエリの結果を取得する
クエリIDなどを含めたGetQueryResultsInputを引数に、GetQueryResultsを呼び出しましょう。
ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet, Row, Datumで順次ラップされてるのでむしろ扱いが面倒で。。
最初にStartQueryExecutionInput.ResultConfigurationで設定したS3のパスに、結果CSVが{QueryExecutionId}.csvという名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csvでパースしたほうが扱いやすいです。
サンプルコードまとめ
package athenaclient
import (
"errors"
"os"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/athena"
)
var athenaClient *athena.Athena
func init() {
cred := credentials.NewStaticCredentials(
os.Getenv("AWS_ACCESS_KEY_ID"), // Access key
os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
"",
)
conf := aws.Config{
Region: aws.String(os.Getenv("AWS_DEFAULT_REGION")),
Credentials: cred,
}
sess := session.New(&conf)
athenaClient = athena.New(sess)
}
var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))
input := &athena.StartQueryExecutionInput{
QueryString: query,
ResultConfiguration: resultConf,
}
output, err := athenaClient.StartQueryExecution(input)
if err != nil {
return nil, err
}
id = output.QueryExecutionId
executionInput := &athena.GetQueryExecutionInput{
QueryExecutionId: id,
}
// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput
for {
executionOutput, err = athenaClient.GetQueryExecution(executionInput)
if err != nil {
return nil, err
}
// executionOutput.QueryExecution.Status.Stateは*string
switch *executionOutput.QueryExecution.Status.State {
// Stateはconstで管理されている
// https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
// 待って再実行
time.Sleep(5 * time.Second)
case athena.QueryExecutionStateSucceeded:
return id, nil
default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
return nil, errors.New(executionOutput.String())
}
}
}
なお、実行用のfuncをvar宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。
詳しくは昨年のAdvent Calenderの記事をご参照ください。