Help us understand the problem. What is going on with this article?

aws-sdk-goを使ってAthenaでクエリを実行する

More than 1 year has passed since last update.

こんにちは、LIFULLのchissoです。

この記事はLIFULL Advent Calendar 2017 その2の12/24分です。

世間はクリスマスイブらしいです。
私は無宗教なので関係ありません。リア充爆発しろ。

本記事では、Athenaをgolangから実行してみたので、その紹介をしたいと思います。Lambdaがgolangをサポートするらしいので夢が膨らみますね。

Athena

みなさん、Amazon Athena使っていますか?

Amazon Athena はインタラクティブなクエリサービスで、Amazon S3 内のデータを標準的な SQL を使用して簡単に分析できます。Athena はサーバーレスなので、インフラストラクチャの管理は不要です。実行したクエリに対してのみ料金が発生します。

S3に溜め込んだログファイルや大量のCSVに対して、気軽にhiveクエリを実行できるやつです。

howto

導入には下記あたりの記事が参考になります。

S3のデータをAmazon Athenaを使って分析する
Amazon Athenaを使ってみました #reinvent
AWS Black Belt Online Seminar 2017 Amazon Athena

tuning

クエリのパフォーマンスを上げたりコストを抑えるためには、いかにデータスキャン量を減らすかがキモとなります。
また、Athenaの実行リージョンとS3バケットのリージョンが異なると、リージョン間でのデータ転送が発生してそのコストもかかりますのでご注意を。

Amazon Athena のパフォーマンスチューニング Tips トップ 10
現場で運用する視点から見た Amazon Athena
ぼくがAthenaで死ぬまで

aws-sdk-goからAthenaを呼び出す

AWSのAthena apiドキュメントにjavaのサンプルはあるのですが、golangはなかったため、aws-sdk-goのドキュメントと、awsのapiドキュメントをにらめっこしながら実装しました。
同じようなことをされる方の参考になれば幸いです。

前提

  • aws-sdk-go (1.12.44)

処理の流れ

大まかに下記の流れです。

  1. Athena用のAPIクライアントを初期化する
  2. 実行するクエリの設定をする
  3. クエリを実行する
  4. クエリの状況を監視する
  5. クエリの結果を取得する

コードのサンプルと共に順に見ていきます。

1. Athena用のAPIクライアントを初期化する

aws-sdk-goを使ったことがある方ならいつものやつです。
https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#New
ACCESS_KEY_IDとSECRET_ACCESS_KEYでcredential, sessionを作成して、clientを初期化します。

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),
        os.Getenv("AWS_SECRET_ACCESS_KEY"),
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

注意点としては、AthenaからS3にアクセスする際、Athenaを呼び出す上記ACCESS_KEYのユーザー権限で対象のS3へアクセスします。このため、ここで使うユーザーにAthena/S3双方の権限を設定するようにしてください。

2. 実行するクエリの設定をする

実行したいクエリの設定をしましょう。aws-sdk-goでいうと、StartQueryExecutionInputを作成することになります。
NamedQueryを作成して呼び出すこともできますが、私はむしろ管理が煩雑になる気がするので、普通にクエリを実行するようにします。

StartQueryExecutionInputは、ドキュメントの通りQueryString
(string)
ResultConfigurationが必須項目です。

QueryStringは実行するクエリをそのまま渡すだけです。ただし、型がstringではなく*stringのため、一度変数にいれたりする必要があります。
参考)golang で string のポインタを取得する

Athenaで実行したクエリの結果はS3に自動保存されるのですが、ResultConfigurationは、その保存先と保存時に暗号化するかどうかを設定します。

resultConf := &athena.ResultConfiguration{}
resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

input := &athena.StartQueryExecutionInput{
    QueryString:         query,
    ResultConfiguration: resultConf,
}

なお、OutputLocationにはバケット+パスまで設定できるので、s3://{:bucket}/result/{:ymd}/みたいなことをすると後で参照しやすいです。

3. クエリを実行する

ここは特にありません。StartQueryExecutionInputを引数に、StartQueryExecutionを呼び出すのみです。
ただ、StartQueryExecutionクエリ実行のリクエストにすぎないことにご注意ください。

output, err := athenaClient.StartQueryExecution(input)

4. クエリの状況を監視する

3でクエリの実行をリクエストしました。その返り値として、StartQueryExecutionOutputが得られます。
StartQueryExecutionOutputQueryExecutionIdを持っており、このQueryExecutionIdがこのあと重要になります。

まずはリクエストしたクエリが完了するまで監視します。
GetQueryExecutionというAPIが、クエリの実行状況を返却してくれます。引数の
GetQueryExecutionInputに先ほど実行したクエリのQueryExecutionIdを設定してリクエストします。
GetQueryExecutionOutputQueryExecution.Status.Stateを確認して、"SUCCEEDED", "FAILED"など完了が確認できるまで繰り返しリクエストしましょう。

id = output.QueryExecutionId

executionInput := &athena.GetQueryExecutionInput{
    QueryExecutionId: id,
}

// 終わるまで待つ
var executionOutput *athena.GetQueryExecutionOutput

for {
    executionOutput, err = athenaClient.GetQueryExecution(executionInput)
    if err != nil {
        return nil, err
    }
    // executionOutput.QueryExecution.Status.Stateは*string
    switch *executionOutput.QueryExecution.Status.State {
    // Stateはconstで管理されている
    // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
    case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
        // 待って再実行
        time.Sleep(5 * time.Second)
    case athena.QueryExecutionStateSucceeded:
        return id, nil
    default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
        return nil, errors.New(executionOutput.String())
    }
}

5. クエリの結果を取得する

クエリIDなどを含めたGetQueryResultsInputを引数に、GetQueryResultsを呼び出しましょう。

ここに関してはサンプルコードありません。というのも、結果を少しずつ取得できるのはよい反面、ResultSet, Row, Datumで順次ラップされてるのでむしろ扱いが面倒で。。
最初にStartQueryExecutionInput.ResultConfigurationで設定したS3のパスに、結果CSVが{QueryExecutionId}.csvという名前で保存されているため、普通にS3のAPIでダウンロードしてenconding/csvでパースしたほうが扱いやすいです。

サンプルコードまとめ

https://docs.aws.amazon.com/sdk-for-go/api/service/athena/

package athenaclient

import (
    "errors"
    "os"
    "time"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/credentials"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/athena"
)

var athenaClient *athena.Athena

func init() {
    cred := credentials.NewStaticCredentials(
        os.Getenv("AWS_ACCESS_KEY_ID"),     // Access key
        os.Getenv("AWS_SECRET_ACCESS_KEY"), // secret
        "",
    )
    conf := aws.Config{
        Region:      aws.String(os.Getenv("AWS_DEFAULT_REGION")),
        Credentials: cred,
    }
    sess := session.New(&conf)
    athenaClient = athena.New(sess)
}

var ExecuteAthenaQuery = func(query *string) (id *string, err error) {
    resultConf := &athena.ResultConfiguration{}
    resultConf.SetOutputLocation(os.Getenv("AWS_S3_BUCKET_FOR_ATHENA_RESULT"))

    input := &athena.StartQueryExecutionInput{
        QueryString:         query,
        ResultConfiguration: resultConf,
    }

    output, err := athenaClient.StartQueryExecution(input)
    if err != nil {
        return nil, err
    }

    id = output.QueryExecutionId
    executionInput := &athena.GetQueryExecutionInput{
        QueryExecutionId: id,
    }

    // 終わるまで待つ
    var executionOutput *athena.GetQueryExecutionOutput

    for {
        executionOutput, err = athenaClient.GetQueryExecution(executionInput)
        if err != nil {
            return nil, err
        }
        // executionOutput.QueryExecution.Status.Stateは*string
        switch *executionOutput.QueryExecution.Status.State {
            // Stateはconstで管理されている
            // https://docs.aws.amazon.com/sdk-for-go/api/service/athena/#pkg-consts
            case athena.QueryExecutionStateQueued, athena.QueryExecutionStateRunning:
                // 待って再実行
                time.Sleep(5 * time.Second)
            case athena.QueryExecutionStateSucceeded:
                return id, nil
            default: // athena.QueryExecutionStateFailed, athena.QueryExecutionStateCancelled
                return nil, errors.New(executionOutput.String())
        }
    }
}

なお、実行用のfuncvar宣言しているのは、clientをスタブしてmodel層のテストを実行するためです。
詳しくは昨年のAdvent Calenderの記事をご参照ください。

chisso
lifull
日本最大級の不動産・住宅情報サイト「LIFULL HOME'S」を始め、人々の生活に寄り添う様々な情報サービス事業を展開しています。
https://lifull.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした