LoginSignup
8

More than 5 years have passed since last update.

aws-sdk-goを使ってAthenaでクエリを実行する

Last updated at Posted at 2017-12-24

こんにちは、LIFULLのchissoです。

この記事はLIFULL Advent Calendar 2017 その2の12/24分です。

世間はクリスマスイブらしいです。
私は無宗教なので関係ありません。リア充爆発しろ。

本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。

Athena

みなさん、Amazon Athena使っていますか?

Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。

S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。

howto

導入には下記あたりの記事が参考になります。

S3のデータをAmazon Athenaを使って分析する
Amazon Athenaを使ってみました #reinvent
AWS Black Belt Online Seminar 2017 Amazon Athena

tuning

クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。
また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。

Amazon Athena のパフォーマンスチューニング Tips トップ 10
現場で運用する視点から見た Amazon Athena
ぼくがAthenaで死ぬまで

aws-sdk-goからAthenaを呼び出す

AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。
同じようなことをされる方の参考になれば幸いです。

前提

  • aws-sdk-go (1.12.44)

処理の流れ

大まかに下記の流れです。

  1. Athena用のAPIクライアントを初期化する
  2. 実行するクエリの設定をする
  3. クエリを実行する
  4. クエリの状況を監視する
  5. クエリの結果を取得する

コードのサンプルと共に順に見ていきます。

1. Athena用のAPIクライアントを初期化する

aws-sdk-goを使ったことがある方ならいつものやつです。
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New
ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),
        os.Getenv("AWS_SECRET_ACCESS_KEY"),
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。

2. 実行するクエリの設定をする

実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInputを作成することになります。
NamedQueryを作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。

StartQueryExecutionInputは、ドキュメントの通りQueryString
(string)
ResultConfigurationが必須項目です。

QueryStringは実行するクエリをそのまま渡すだけです。ただし、型がstringではなく*stringのため、一度変数にいれたりする必要があります。
参考)golang で string のポインタを取得する

Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfigurationは、その保存先と保存時に暗号化するかどうかを設定します。

resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

input := &athena.StartQueryExecutionInput{
    QueryString:         query,
    ResultConfiguration: resultConf,
}

なお、OutputLocationにはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/みたいなことをすると後で参照しやすいです。

3. クエリを実行する

ここは特にありません。StartQueryExecutionInputを引数に、StartQueryExecutionを呼び出すのみです。
ただ、StartQueryExecutionクエリ実行のリクエストにすぎないことにご注意ください。

output, err := athenaClient.StartQueryExecution(input)

4. クエリの状況を監視する

3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutputが得られます。
StartQueryExecutionOutputQueryExecutionIdを持っており、このQueryExecutionIdがこのあと重要になります。

まずはリクエストしたクエリが完了するまで監視します。
GetQueryExecutionというAPIが、クエリの実行状況を返却してくれます。引数の
GetQueryExecutionInputに先ほど実行したクエリのQueryExecutionIdを設定してリクエストします。
GetQueryExecutionOutputQueryExecution.Status.Stateを確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。

id = output.QueryExecutionId

executionInput := &athena.GetQueryExecutionInput{
    QueryExecutionId: id,
}

// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput

for {
    executionOutput, err = athenaClient.GetQueryExecution(executionInput)
    if err != nil {
        return nil, err
    }
    // executionOutput.QueryExecution.Status.Stateは*string
    switch *executionOutput.QueryExecution.Status.State {
    // Stateはconstで管理されている
    // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
    case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
        // 待って再実行
        time.Sleep(5 * time.Second)
    case athena.QueryExecutionStateSucceeded:
        return id, nil
    default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
        return nil, errors.New(executionOutput.String())
    }
}

5. クエリの結果を取得する

クエリIDなどを含めたGetQueryResultsInputを引数に、GetQueryResultsを呼び出しましょう。

ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet, Row, Datumで順次ラップされてるのでむしろ扱いが面倒で。。
最初にStartQueryExecutionInput.ResultConfigurationで設定したS3のパスに、結果CSVが{QueryExecutionId}.csvという名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csvでパースしたほうが扱いやすいです。

サンプルコードまとめ


package athenaclient

import (
    "errors"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),     // Access key
        os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
    resultConf := &athena.ResultConfiguration{}
    resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

    input := &athena.StartQueryExecutionInput{
        QueryString:         query,
        ResultConfiguration: resultConf,
    }

    output, err := athenaClient.StartQueryExecution(input)
    if err != nil {
        return nil, err
    }

    id = output.QueryExecutionId
    executionInput := &athena.GetQueryExecutionInput{
        QueryExecutionId: id,
    }

    // 終わるまで待つ
    var executionOutput *athena.GetQueryExecutionOutput

    for {
        executionOutput, err = athenaClient.GetQueryExecution(executionInput)
        if err != nil {
            return nil, err
        }
        // executionOutput.QueryExecution.Status.Stateは*string
        switch *executionOutput.QueryExecution.Status.State {
            // Stateはconstで管理されている
            // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
            case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
                // 待って再実行
                time.Sleep(5 * time.Second)
            case athena.QueryExecutionStateSucceeded:
                return id, nil
            default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
                return nil, errors.New(executionOutput.String())
        }
    }
}

なお、実行用のfuncvar宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。
詳しくは昨年のAdvent Calenderの記事をご参照ください。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
8