LoginSignup
16
8

More than 5 years have passed since last update.

aws-sdk-goを使ってAthenaでクエリを実行する

Last updated at Posted at 2017-12-24

こんにちは、LIFULLのchissoです。

この記事はLIFULL Advent Calendar 2017 その2の12/24分です。

世間はクリスマスイブらしいです。
私は無宗教なので関係ありません。リア充爆発しろ。

本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。

Athena

みなさん、Amazon Athena使っていますか?

Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。

S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。

howto

導入には下記あたりの記事が参考になります。

S3のデータをAmazon Athenaを使って分析する
Amazon Athenaを使ってみました #reinvent
AWS Black Belt Online Seminar 2017 Amazon Athena

tuning

クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。
また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。

Amazon Athena のパフォーマンスチューニング Tips トップ 10
現場で運用する視点から見た Amazon Athena
ぼくがAthenaで死ぬまで

aws-sdk-goからAthenaを呼び出す

AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。
同じようなことをされる方の参考になれば幸いです。

前提

  • aws-sdk-go (1.12.44)

処理の流れ

大まかに下記の流れです。

  1. Athena用のAPIクライアントを初期化する
  2. 実行するクエリの設定をする
  3. クエリを実行する
  4. クエリの状況を監視する
  5. クエリの結果を取得する

コードのサンプルと共に順に見ていきます。

1. Athena用のAPIクライアントを初期化する

aws-sdk-goを使ったことがある方ならいつものやつです。
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New
ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),
        os.Getenv("AWS_SECRET_ACCESS_KEY"),
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。

2. 実行するクエリの設定をする

実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInputを作成することになります。
NamedQueryを作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。

StartQueryExecutionInputは、ドキュメントの通りQueryString
(string)
ResultConfigurationが必須項目です。

QueryStringは実行するクエリをそのまま渡すだけです。ただし、型がstringではなく*stringのため、一度変数にいれたりする必要があります。
参考)golang で string のポインタを取得する

Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfigurationは、その保存先と保存時に暗号化するかどうかを設定します。

resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

input := &athena.StartQueryExecutionInput{
    QueryString:         query,
    ResultConfiguration: resultConf,
}

なお、OutputLocationにはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/みたいなことをすると後で参照しやすいです。

3. クエリを実行する

ここは特にありません。StartQueryExecutionInputを引数に、StartQueryExecutionを呼び出すのみです。
ただ、StartQueryExecutionクエリ実行のリクエストにすぎないことにご注意ください。

output, err := athenaClient.StartQueryExecution(input)

4. クエリの状況を監視する

3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutputが得られます。
StartQueryExecutionOutputQueryExecutionIdを持っており、このQueryExecutionIdがこのあと重要になります。

まずはリクエストしたクエリが完了するまで監視します。
GetQueryExecutionというAPIが、クエリの実行状況を返却してくれます。引数の
GetQueryExecutionInputに先ほど実行したクエリのQueryExecutionIdを設定してリクエストします。
GetQueryExecutionOutputQueryExecution.Status.Stateを確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。

id = output.QueryExecutionId

executionInput := &athena.GetQueryExecutionInput{
    QueryExecutionId: id,
}

// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput

for {
    executionOutput, err = athenaClient.GetQueryExecution(executionInput)
    if err != nil {
        return nil, err
    }
    // executionOutput.QueryExecution.Status.Stateは*string
    switch *executionOutput.QueryExecution.Status.State {
    // Stateはconstで管理されている
    // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
    case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
        // 待って再実行
        time.Sleep(5 * time.Second)
    case athena.QueryExecutionStateSucceeded:
        return id, nil
    default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
        return nil, errors.New(executionOutput.String())
    }
}

5. クエリの結果を取得する

クエリIDなどを含めたGetQueryResultsInputを引数に、GetQueryResultsを呼び出しましょう。

ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet, Row, Datumで順次ラップされてるのでむしろ扱いが面倒で。。
最初にStartQueryExecutionInput.ResultConfigurationで設定したS3のパスに、結果CSVが{QueryExecutionId}.csvという名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csvでパースしたほうが扱いやすいです。

サンプルコードまとめ


package athenaclient

import (
    "errors"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),     // Access key
        os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
    resultConf := &athena.ResultConfiguration{}
    resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

    input := &athena.StartQueryExecutionInput{
        QueryString:         query,
        ResultConfiguration: resultConf,
    }

    output, err := athenaClient.StartQueryExecution(input)
    if err != nil {
        return nil, err
    }

    id = output.QueryExecutionId
    executionInput := &athena.GetQueryExecutionInput{
        QueryExecutionId: id,
    }

    // 終わるまで待つ
    var executionOutput *athena.GetQueryExecutionOutput

    for {
        executionOutput, err = athenaClient.GetQueryExecution(executionInput)
        if err != nil {
            return nil, err
        }
        // executionOutput.QueryExecution.Status.Stateは*string
        switch *executionOutput.QueryExecution.Status.State {
            // Stateはconstで管理されている
            // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
            case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
                // 待って再実行
                time.Sleep(5 * time.Second)
            case athena.QueryExecutionStateSucceeded:
                return id, nil
            default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
                return nil, errors.New(executionOutput.String())
        }
    }
}

なお、実行用のfuncvar宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。
詳しくは昨年のAdvent Calenderの記事をご参照ください。

16
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
8