はじめに
Amazon S3 Tables を使うと、Amazon S3 上で Apache Iceberg テーブルを管理できます
普通の S3 バケットにファイルを置いて Athena から読む構成よりも、テーブルとして扱うための仕組みが最初から用意されており、しかも自動的にパフォーマンスを最適化してくれます
S3 Tables では table bucket という専用のバケット種別を使い、その中で namespace ごとにテーブルを整理します
namespace は Athena や Glue では database に相当する概念です
本記事では Elixir の Livebook から Amazon S3 Tables を操作して、以下の流れを順番に試します
- サンプルデータを用意する
- Glue Data Catalog を準備する
- table bucket と namespace を作る
- Iceberg テーブルを作る
- Athena でデータを投入、検索する
- 後片付けをする
実装したノートブックはこちら
セットアップ
必要なモジュールをインストールします
Mix.install([
{:aws, "~> 1.0"},
{:hackney, "~> 1.16"},
{:uuid, "~> 1.1"},
{:explorer, "~> 0.11"},
{:kino, "~> 0.19"}
])
Explorer を使ってサンプルデータを扱うので、先に alias も定義しておきます
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame
サンプルデータの準備
今回は Explorer に含まれている wine データセットを使います
wine_df = Explorer.Datasets.wine()
Kino.DataTable.new(wine_df)
実行すると、ワインの成分が表形式で表示されます
AWS 認証
まずは Livebook 上で AWS の認証情報を入力します
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")
[
access_key_id_input,
secret_access_key_input,
region_input
]
|> Kino.Layout.grid(columns: 3)
表示された入力欄にアクセスキー、シークレットキー、リージョンを入力してください
続いて AWS クライアントを作成します
client =
AWS.Client.create(
Kino.Input.read(access_key_id_input),
Kino.Input.read(secret_access_key_input),
Kino.Input.read(region_input)
)
リージョンとアカウント ID も後続で使うので取得しておきます
region = Kino.Input.read(region_input)
account_id =
client
|> AWS.STS.get_caller_identity(%{})
|> elem(1)
|> Map.get("GetCallerIdentityResponse")
|> Map.get("GetCallerIdentityResult")
|> Map.get("Account")
Glue Data Catalog の準備
S3 Tables を Athena から扱うために、Glue Data Catalog 側の準備を行います
ここで作る s3tablescatalog は、Athena から S3 Tables の table bucket を参照するための Glue Catalog (データの置き場所や構造を管理する台帳)です
Athena はこの catalog を入口として、どの table bucket 配下の namespace やテーブルを見るのかを解決します
つまり役割としては、
- S3 Tables を Athena から見えるように登録する
-
s3tablescatalog/<table_bucket_name>という形で、参照先の table bucket を識別する - その配下の namespace を Athena から database のように扱えるようにする
というイメージです
まずは s3tablescatalog がすでに存在するか確認します
exist_s3tablescatalog =
client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")
|> Enum.any?(fn catalog -> Map.get(catalog, "Name") == "s3tablescatalog" end)
なければ作成します
if not exist_s3tablescatalog do
client
|> AWS.Glue.create_catalog(%{
"Name" => "s3tablescatalog",
"CatalogInput" => %{
"Description" => "Federated catalog for S3 Tables",
"FederatedCatalog" => %{
"Identifier" => "arn:aws:s3tables:#{region}:#{account_id}:bucket/*",
"ConnectionName" => "aws:s3tables"
},
"CreateDatabaseDefaultPermissions" => [%{
"Principal" => %{
"DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
},
"Permissions" => ["ALL"]
}],
"CreateTableDefaultPermissions" => [%{
"Principal" => %{
"DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
},
"Permissions" => ["ALL"]
}]
}
})
end
一覧を表示して確認します
client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")
"Name" => "s3tablescatalog" のカタログが見えていれば OK です
Athena の結果出力先バケットを作成する
Athena でクエリを実行すると、結果の保存先 S3 バケットが必要です
まずは名前を入力します
output_bucket_name_input = Kino.Input.text("OUTPUT_BUCKET_NAME")
読み取ってから作成します
output_bucket_name = Kino.Input.read(output_bucket_name_input)
AWS.S3.create_bucket(client, output_bucket_name, %{
"CreateBucketConfiguration" => %{
"LocationConstraint" => region
}
})
table bucket を作成する
次は S3 Tables 本体の保存先になる table bucket を作ります
table_bucket_name_input = Kino.Input.text("TABLE_BUCKET_NAME")
こちらも名前を入力します
table_bucket_name = Kino.Input.read(table_bucket_name_input)
table bucket を作成します
buckets_res =
client
|> AWS.S3Tables.create_table_bucket(%{
"name" => table_bucket_name
})
返ってきたレスポンスから ARN を取り出します
buckets_arn =
buckets_res
|> elem(1)
|> Map.get("arn")
一覧も確認してみましょう
client
|> AWS.S3Tables.list_table_buckets()
|> elem(1)
|> Map.get("tableBuckets")
ここで、作成した table bucket が表示されれば大丈夫です
namespace を作成する
S3 Tables では、テーブルを namespace 単位で整理します
Athena や Glue の感覚だと database に近いです
今回は sample_namespace という名前で作成します
namespace_res =
client
|> AWS.S3Tables.create_namespace(buckets_arn, %{
"namespace" => ["sample_namespace"]
})
作成後に一覧を見てみます
client
|> AWS.S3Tables.list_namespaces(buckets_arn)
|> elem(1)
|> Map.get("namespaces")
テーブルを作成する
S3 Tables にテーブルを作るには、スキーマ情報を渡す必要があります
Explorer の型情報をもとに、フィールド定義へ変換します
fields =
wine_df
|> Map.get(:dtypes)
|> Enum.map(fn {name, type} ->
%{
"name" => name,
"type" =>
case type do
{:f, _} -> "float"
{:s, _} -> "int"
_ -> "string"
end
}
end)
実行結果
[
%{"name" => "alcalinity_of_ash", "type" => "float"},
%{"name" => "alcohol", "type" => "float"},
%{"name" => "ash", "type" => "float"},
...
]
そのまま wine テーブルを作成します
フォーマットは ICEBERG を指定します
table_res =
client
|> AWS.S3Tables.create_table("sample_namespace", buckets_arn, %{
"name" => "wine",
"format" => "ICEBERG",
"metadata" => %{
"iceberg" => %{
"schema" => %{
"fields" => fields
}
}
}
})
一覧で確認します
client
|> AWS.S3Tables.list_tables(buckets_arn)
|> elem(1)
|> Map.get("tables")
Athena 実行用の関数を準備する
ここからは Athena 経由で SQL を流していきます
まずはクエリを実行して QueryExecutionId を返す関数を作ります
exec_query = fn query ->
client
|> AWS.Athena.start_query_execution(%{
"QueryString" => query,
"ClientRequestToken" => UUID.uuid1(),
"QueryExecutionContext" => %{
"Catalog" => "s3tablescatalog/#{table_bucket_name}",
"Database" => "sample_namespace"
},
"ResultConfiguration" => %{
"OutputLocation" => "s3://#{output_bucket_name}"
}
})
|> elem(1)
|> Map.get("QueryExecutionId")
end
Athena で S3 Tables を使うときは、catalog に s3tablescatalog/<table_bucket_name> を指定します
続いて実行状態を取得する関数です
get_state = fn exec_id ->
client
|> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
|> elem(1)
|> then(& &1["QueryExecution"]["Status"]["State"])
end
最後に Athena の結果を DataFrame に変換する関数も用意しておきます
get_results = fn exec_id ->
results =
client
|> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
|> elem(1)
results_df =
results
|> then(& &1["ResultSet"]["Rows"])
|> Enum.map(& &1["Data"])
|> then(fn columns ->
[headers | values] = columns
headers
|> Enum.map(& &1["VarCharValue"])
|> Enum.with_index()
|> Enum.reduce(%{}, fn {col_name, index}, acc ->
col_values =
Enum.map(values, fn each_values ->
each_values
|> Enum.at(index)
|> then(& &1["VarCharValue"])
end)
%{col_name => col_values}
|> Map.merge(acc)
end)
end)
|> DataFrame.new()
results
|> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
|> Enum.map(fn info ->
{
info["Name"],
case info["Type"] do
"integer" -> :integer
"float" -> :float
_ -> :string
end
}
end)
|> Enum.reduce(results_df, fn {col_name, col_type}, df ->
DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
end)
end
データを投入する
まずは再実行しやすいように、既存データを削除しておきます
exec_id = exec_query.("DELETE FROM \"wine\";")
状態を見てみます。
get_state.(exec_id)
実行直後は "RUNNING" になります
実行完了していると "SUCCEEDED" になるので、それまで繰り返し状態を確認します
続いて、本当に空になっているか確認します
exec_id = exec_query.("SELECT * FROM \"wine\";")
exec_id
|> get_results.()
|> Kino.DataTable.new()
問題なさそうなら、DataFrame の内容から INSERT 文を組み立てます
cols = Explorer.DataFrame.names(wine_df)
vals =
wine_df
|> Explorer.DataFrame.to_rows()
|> Enum.map(fn row ->
Enum.map(cols, fn name ->
Map.get(row, name) |> then(fn val -> "#{val}" end)
end)
end)
各行ごとの INSERT 文を作ります
queries =
vals
|> Enum.map(fn row ->
"INSERT INTO wine (#{Enum.join(cols, ",")}) VALUES (#{Enum.join(row, ",")})"
end)
実行結果
["INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065)",
"INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050)",
"INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185)",
...
]
順番に実行します
Enum.map(queries, fn query ->
exec_id = exec_query.(query)
Process.sleep(1_000)
exec_id
|> get_state.()
|> IO.inspect()
end)
SUCCEEDED が並べば投入完了です。
データを取得する
まずは全件取得してみます
exec_id = exec_query.("SELECT * FROM \"wine\"")
exec_id
|> get_results.()
|> Kino.DataTable.new()
ちゃんとテーブルとして読めています
次に、条件付き検索も試してみます
exec_id = exec_query.("
SELECT
class,
color_intensity,
flavanoids
FROM
wine
WHERE
color_intensity < 5.0
AND flavanoids >= 2.0
ORDER BY
class
")
exec_id
|> get_results.()
|> Kino.DataTable.new()
普通に SQL で絞り込みできます
さらに集計もやってみます
exec_id = exec_query.("
SELECT
class,
max(alcohol) AS alcohol
FROM
wine
GROUP BY
class
ORDER BY
alcohol DESC
")
exec_id
|> get_results.()
|> Kino.DataTable.new()
Athena から S3 Tables を操作しているので、使い心地はかなり素直です
後片付け
作成したリソースは最後に削除しておきます
まずはテーブルです。
AWS.S3Tables.delete_table(client, "wine", "sample_namespace", buckets_arn, %{})
次に namespace を削除します
AWS.S3Tables.delete_namespace(client, "sample_namespace", buckets_arn, %{})
table bucket も削除します
AWS.S3Tables.delete_table_bucket(client, buckets_arn, %{})
Athena の出力先バケットは、オブジェクトを消してから削除します
keys =
client
|> AWS.S3.list_objects(output_bucket_name)
|> elem(1)
|> Map.get("ListBucketResult")
|> Map.get("Contents", [])
|> Enum.map(fn content -> Map.get(content, "Key") end)
keys
|> Enum.map(fn key ->
AWS.S3.delete_object(client, output_bucket_name, key, %{})
end)
AWS.S3.delete_bucket(client, output_bucket_name, %{})
まとめ
Amazon S3 Tables を使うと、S3 上に Apache Iceberg テーブルをかなり素直に置けます
特に、
- table bucket という専用の入れ物がある
- namespace でテーブルを整理できる
- Glue Data Catalog と統合すると Athena からそのまま SQL で扱える
あたりが分かりやすくて良いですね
今回の例では Elixir の Livebook から、
- table bucket の作成
- namespace の作成
- テーブル作成
- Athena での INSERT
- Athena での SELECT
まで一通り試せました
S3 にファイルを置くだけの構成よりも、テーブルとして扱いたいケースではかなり便利そうです
Athena や Spark から Iceberg テーブルを素直に扱いたいときに、まず候補に入れてよさそうだと感じました










