14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ElixirAdvent Calendar 2022

Day 19

Livebook から Amazon Athena で S3 上の CSV ファイルに SQL クエリを実行する

Last updated at Posted at 2022-12-05

はじめに

Livebook から AWS のサービスを操作するシリーズです

今回は Amazon Athena を操作します

Athena は S3 上に保存したテキストファイル(CSVやTSV、JSON)を RDB のテーブルに見立て、 SQL を実行できるサービスです

SQL クエリによって 1 TB スキャンしたら $5 という金額なので、かなり安く利用できます

実は私もつい最近このサービスを知ったのですが、いちいちデータベースを作りたくないときに便利そうです

今回は Livebook から S3 上に CSV をアップロードし、 Athena から SQL クエリを実行してみます

実装したノートブックはこちら

事前作業

AWS のアカンウトと、 S3 の権限を持った IAM ユーザーと、その認証情報(ACCESS_KEY_ID と SECRET_ACCESS_KEY)が必要です

実行環境

Livebook 0.7.2 の Docker イメージを元にしたコンテナで動かしました

コンテナ定義はこちらを参照

セットアップ

前回ex_aws を使いましたが、 ex_aws_athena の最終コミットが4年前でアーカイブされていたため、今回は aws-elixir の方を使ってみます

README には以下のように書いてあります

Most of the code is generated using the aws-codegen library from the JSON descriptions of AWS services provided by Amazon. They can be found in lib/aws/generated.

実は aws-elixir のコードは aws-sdk-go の仕様から自動生成されたものです

ほぼ全てのの AWS サービスをカバーしている代わりに、ドキュメントがあまり親切ではありません

ほとんどの関数の第2引数が input で、これだけ見ても何を渡せばいいのか分かりません

input の詳細な内容は各 API の仕様を見ましょう

Mix.install([
  {:aws, "~> 0.13"},
  {:uuid, "~> 1.1"},
  {:hackney, "~> 1.18"},
  {:explorer, "~> 0.3"},
  {:kino, "~> 0.7"}
])

エイリアス等の準備をします

alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

認証

入力エリアを用意し、そこに IAM ユーザーの認証情報を入力します

ACCESS_KEY_ID と SECRET_ACCESS_KEY は秘密情報なので、値が見えないように Kino.Input.password を使います

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")

リージョンもここで入力しておきましょう

region_input = Kino.Input.text("REGION")

スクリーンショット 2022-12-03 0.01.54.png

各認証情報を使って、 aws-elixir で AWS にリクエストを投げるためのクライアントを生成します

client =
  AWS.Client.create(
    Kino.Input.read(access_key_id_input),
    Kino.Input.read(secret_access_key_input),
    Kino.Input.read(region_input)
  )

スクリーンショット 2022-12-05 14.21.03.png

データ作成

今回は Explorer からワインデータセットを使用します

wine_df = Explorer.Datasets.wine()

wine_df
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 14.49.01.png

CSV に保存します

csv_filename = "wine.csv"

wine_df
|> DataFrame.to_csv(csv_filename)

データアップロード

アップロード先の S3 バケットを確認します

buckets_res =
  client
  |> AWS.S3.list_buckets()
  |> elem(1)

スクリーンショット 2022-12-05 14.59.49.png

これでは見にくいので、データテーブルで表示します

buckets_res["ListAllMyBucketsResult"]["Buckets"]["Bucket"]
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 15.01.03.png

バケット名を入力します

bucket_name_input = Kino.Input.text("BUCKET_ANME")

スクリーンショット 2022-12-05 15.03.56.png

CSV アップロード先のバケットとパスを指定します

bucket_name = Kino.Input.read(bucket_name_input)
athena_prefix = "athena/"

CSV ファイルを S3 にアップロードします

file = File.read!(csv_filename)
md5 = :crypto.hash(:md5, file) |> Base.encode64()

client
|> AWS.S3.put_object(
  bucket_name,
  athena_prefix <> csv_filename,
  %{"Body" => file, "ContentMD5" => md5}
)

スクリーンショット 2022-12-05 15.04.40.png

データベース定義

まずデータベースを作成します

(作成せずに default データベースを使っても良いです)

Athena では SQL を実行する度に一意のトークンを求められます

AWS CLI の場合は内部的に生成してくれているのですが、 API を直接呼ぶ場合は自分で生成しないといけません

トークンは 32 〜 128 文字ということなので、 UUID を使います

create_db_query = "CREATE DATABASE athena_sample"
create_db_token = UUID.uuid1()

スクリーンショット 2022-12-05 15.08.53.png

クエリを実行します

ResultConfigurationOutputLocation に S3 バケットを指定します(指定しないとエラーになります)

クエリは非同期に実行されるため、即結果が返ってきません

実行結果の QueryExecutionId を使って実行状態を取得する必要があります

exec_id =
  client
  |> AWS.Athena.start_query_execution(%{
    "QueryString" => create_db_query,
    "ClientRequestToken" => create_db_token,
    "ResultConfiguration" => %{
      "OutputLocation" => "s3://#{bucket_name}"
    }
  })
  |> elem(1)
  |> then(& &1["QueryExecutionId"])

スクリーンショット 2022-12-05 15.11.37.png

実行状況を確認します

client
|> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
|> elem(1)
|> then(& &1["QueryExecution"]["Status"]["State"])

スクリーンショット 2022-12-05 15.12.46.png

成功したようです

SQL 実行関数定義

毎回書くのは面倒なので、 SQL 実行と実行状況確認の関数を作っておきます

exec_query = fn query ->
  client
  |> AWS.Athena.start_query_execution(%{
    "QueryString" => query,
    "ClientRequestToken" => UUID.uuid1(),
    "ResultConfiguration" => %{
      "OutputLocation" => "s3://#{bucket_name}"
    }
  })
  |> elem(1)
  |> then(& &1["QueryExecutionId"])
end
get_state = fn exec_id ->
  client
  |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
  |> elem(1)
  |> then(& &1["QueryExecution"]["Status"]["State"])
end

テーブル作成

次にテーブルを作成します

CSV からよしなに作ってほしいところですが、自分で定義してあげないといけません

再実行時にエラーにならないよう、もしテーブルがあれば削除しておきます

exec_id = exec_query.("DROP TABLE IF EXISTS athena_sample.wine_table")
get_state.(exec_id)

スクリーンショット 2022-12-05 18.00.20.png

CREATE 文の列名と型をいちいち書きたくないので、データフレームから生成します

列名とタイプのタプル配列を生成します

(列の順序が合っていないといけないため、 names で取得する順で並ぶようにしています)

types =
  wine_df
  |> DataFrame.names
  |> Enum.map(fn name ->
    {name, DataFrame.dtypes(wine_df)[name]}
  end)

スクリーンショット 2022-12-05 17.59.03.png

# CREATE 文の列定義を生成
table_columns =
  types
  |> Enum.map(fn {name, type} ->
    name <>
      " " <>
      case type do
        :integer ->
          "int"

        :float ->
          "float"
      end
  end)
  |> Enum.join(",")

スクリーンショット 2022-12-05 18.01.37.png

CREATE 文を定義して実行します

詳細についてはこちらの記事をご参照ください

exec_id = exec_query.("
  CREATE EXTERNAL TABLE
    athena_sample.wine_table (#{table_columns})
  ROW FORMAT
    SerDe 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
  WITH SerDeProperties (
    'serialization.format' = ',',
    'field.delim' = ','
  )
  STORED AS TEXTFILE
  LOCATION
    's3://#{bucket_name}/#{athena_prefix}' 
  TBLPROPERTIES (
    'has_encrypted_data'='false',
    'skip.header.line.count'='1'
  )
  ")

CSV の場合、 ROW FORMAT ... 以降を指定しておけばおおよそ問題ありません

LOCATION で CSV をアップロードしたバケット、パスを指定します

get_state.(exec_id)

スクリーンショット 2022-12-05 15.22.44.png

データ取得

いよいよ SQL でデータを取得してみます

まずは全件全項目です

exec_id = exec_query.("
  SELECT
    *
  FROM
    athena_sample.wine_table
  ")
get_state.(exec_id)

スクリーンショット 2022-12-05 15.26.07.png

とりあえず成功はしましたが、結果はまた別途取得しないといけません

results =
  client
  |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
  |> elem(1)

スクリーンショット 2022-12-05 18.03.07.png

何やら色々返ってきていますが、 ResultSet の中の ResultSetMetadata に結果の列情報が入っています

ここから各列の名前と型を取得します

# 列名と型を取得
types =
  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    {
      info["Name"],
      case info["Type"] do
        "integer" ->
          :integer

        "float" ->
          :float

        _ ->
          :string
      end
    }
  end)

スクリーンショット 2022-12-05 18.03.53.png

また、 ResultSet の Rows に結果の各行が入っています

ただし、先頭行はヘッダーなので無視します

扱いやすいようにデータフレームにしておきましょう

# データを取得
results_df =
  results
  |> then(& &1["ResultSet"]["Rows"])
  |> Enum.map(& &1["Data"])
  |> then(fn columns ->
    [headers | values] = columns

    headers
    |> Enum.map(& &1["VarCharValue"])
    |> Enum.with_index()
    |> Enum.reduce(%{}, fn {col_name, index}, acc ->
      col_values =
        Enum.map(values, fn each_values ->
          each_values
          |> Enum.at(index)
          |> then(& &1["VarCharValue"])
        end)

      %{col_name => col_values}
      |> Map.merge(acc)
    end)
  end)
  |> DataFrame.new()

スクリーンショット 2022-12-05 18.04.55.png

取得してきた時点では全列が文字列型になっているので、型情報を元に変換します

parsed_df =
  types
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)

スクリーンショット 2022-12-05 18.04.24.png

データテーブルで表示してみましょう

parsed_df
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 18.05.30.png

アップロードした元データと同じく、 178 件取得できました

データ取得も何かと面倒なので、関数化しておきます

get_results = fn exec_id ->
  results =
    client
    |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  results_df =
    results
    |> then(& &1["ResultSet"]["Rows"])
    |> Enum.map(& &1["Data"])
    |> then(fn columns ->
      [headers | values] = columns

      headers
      |> Enum.map(& &1["VarCharValue"])
      |> Enum.with_index()
      |> Enum.reduce(%{}, fn {col_name, index}, acc ->
        col_values =
          Enum.map(values, fn each_values ->
            each_values
            |> Enum.at(index)
            |> then(& &1["VarCharValue"])
          end)

        %{col_name => col_values}
        |> Map.merge(acc)
      end)
    end)
    |> DataFrame.new()

  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    {
      info["Name"],
      case info["Type"] do
        "integer" ->
          :integer

        "float" ->
          :float

        _ ->
          :string
      end
    }
  end)
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)
end

次は WHEREORDER BY を使ってみましょう

exec_id = exec_query.("
  SELECT
    class,
    color_intensity,
    flavanoids
  FROM
    athena_sample.wine_table
  WHERE
    color_intensity < 5.0
    AND flavanoids >= 2.0
  ORDER BY
    class
  ")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 18.07.06.png

見事に抽出、ソートができているようです

GROUP BY も使ってみましょう

exec_id = exec_query.("
  SELECT
    class,
    max(alcohol) AS alcohol
  FROM
    athena_sample.wine_table
  GROUP BY
    class
  ORDER BY
    alcohol DESC
  ")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 18.07.49.png

しっかり集計できていますね

テーブル削除

テーブルを消しておきます

exec_id = exec_query.("DROP TABLE wine_table")
get_state.(exec_id)

スクリーンショット 2022-12-05 15.37.45.png

データベース削除

データベースを消しておきます

exec_id = exec_query.("DROP DATABASE athena_sample")
get_state.(exec_id)

スクリーンショット 2022-12-05 15.37.45.png

DataFrame で同じ動作

SQL と同じことを DataFrame でそのままやってみましょう

wine_df
|> DataFrame.filter(color_intensity < 5)
|> DataFrame.filter(flavanoids >= 2.0)
|> DataFrame.arrange(class)
|> DataFrame.select(["class", "color_intensity", "flavanoids"])
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 18.09.57.png

wine_df
|> DataFrame.group_by("class")
|> DataFrame.summarise(alcohol: max(alcohol))
|> DataFrame.arrange(desc: alcohol)
|> Kino.DataTable.new()

スクリーンショット 2022-12-05 18.11.52.png

当然できますね

まとめ

というわけで、よほど大きいデータでローカルで開けない、というような事情でもない限り、 Athena は使わないと思います

DataFrame なら SQL 以上に複雑な操作も可能ですしね

(ここまで書いておいて)

14
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?