4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Livebook から Amazon S3 Tables で表形式のデータを効率的に操作する

4
Last updated at Posted at 2026-06-09

はじめに

Amazon S3 Tables を使うと、Amazon S3 上で Apache Iceberg テーブルを管理できます

普通の S3 バケットにファイルを置いて Athena から読む構成よりも、テーブルとして扱うための仕組みが最初から用意されており、しかも自動的にパフォーマンスを最適化してくれます

S3 Tables では table bucket という専用のバケット種別を使い、その中で namespace ごとにテーブルを整理します
namespace は Athena や Glue では database に相当する概念です

本記事では Elixir の Livebook から Amazon S3 Tables を操作して、以下の流れを順番に試します

  1. サンプルデータを用意する
  2. Glue Data Catalog を準備する
  3. table bucket と namespace を作る
  4. Iceberg テーブルを作る
  5. Athena でデータを投入、検索する
  6. 後片付けをする

実装したノートブックはこちら

セットアップ

必要なモジュールをインストールします

Mix.install([
  {:aws, "~> 1.0"},
  {:hackney, "~> 1.16"},
  {:uuid, "~> 1.1"},
  {:explorer, "~> 0.11"},
  {:kino, "~> 0.19"}
])

Explorer を使ってサンプルデータを扱うので、先に alias も定義しておきます

alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

サンプルデータの準備

今回は Explorer に含まれている wine データセットを使います

wine_df = Explorer.Datasets.wine()

Kino.DataTable.new(wine_df)

実行すると、ワインの成分が表形式で表示されます

スクリーンショット 2026-06-09 17.59.22.png

AWS 認証

まずは Livebook 上で AWS の認証情報を入力します

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

[
  access_key_id_input,
  secret_access_key_input,
  region_input
]
|> Kino.Layout.grid(columns: 3)

スクリーンショット 2026-06-09 17.55.51.png

表示された入力欄にアクセスキー、シークレットキー、リージョンを入力してください

続いて AWS クライアントを作成します

client =
  AWS.Client.create(
    Kino.Input.read(access_key_id_input),
    Kino.Input.read(secret_access_key_input),
    Kino.Input.read(region_input)
  )

リージョンとアカウント ID も後続で使うので取得しておきます

region = Kino.Input.read(region_input)

account_id =
  client
  |> AWS.STS.get_caller_identity(%{})
  |> elem(1)
  |> Map.get("GetCallerIdentityResponse")
  |> Map.get("GetCallerIdentityResult")
  |> Map.get("Account")

Glue Data Catalog の準備

S3 Tables を Athena から扱うために、Glue Data Catalog 側の準備を行います

ここで作る s3tablescatalog は、Athena から S3 Tables の table bucket を参照するための Glue Catalog (データの置き場所や構造を管理する台帳)です

Athena はこの catalog を入口として、どの table bucket 配下の namespace やテーブルを見るのかを解決します

つまり役割としては、

  • S3 Tables を Athena から見えるように登録する
  • s3tablescatalog/<table_bucket_name> という形で、参照先の table bucket を識別する
  • その配下の namespace を Athena から database のように扱えるようにする

というイメージです

まずは s3tablescatalog がすでに存在するか確認します

exist_s3tablescatalog =
  client
  |> AWS.Glue.get_catalogs(%{})
  |> elem(1)
  |> Map.get("CatalogList")
  |> Enum.any?(fn catalog -> Map.get(catalog, "Name") == "s3tablescatalog" end)

なければ作成します

if not exist_s3tablescatalog do
  client
  |> AWS.Glue.create_catalog(%{
    "Name" => "s3tablescatalog",
    "CatalogInput" => %{
      "Description" => "Federated catalog for S3 Tables",
      "FederatedCatalog" => %{
        "Identifier" => "arn:aws:s3tables:#{region}:#{account_id}:bucket/*",
        "ConnectionName" => "aws:s3tables"
      },
      "CreateDatabaseDefaultPermissions" => [%{
        "Principal" => %{
          "DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
        },
        "Permissions" => ["ALL"]
      }],
      "CreateTableDefaultPermissions" => [%{
        "Principal" => %{
          "DataLakePrincipalIdentifier" => "IAM_ALLOWED_PRINCIPALS"
        },
        "Permissions" => ["ALL"]
      }]
    }
  })
end

一覧を表示して確認します

client
|> AWS.Glue.get_catalogs(%{})
|> elem(1)
|> Map.get("CatalogList")

"Name" => "s3tablescatalog" のカタログが見えていれば OK です

スクリーンショット 2026-06-09 17.53.57.png

Athena の結果出力先バケットを作成する

Athena でクエリを実行すると、結果の保存先 S3 バケットが必要です

まずは名前を入力します

output_bucket_name_input = Kino.Input.text("OUTPUT_BUCKET_NAME")

スクリーンショット 2026-06-09 17.57.20.png

読み取ってから作成します

output_bucket_name = Kino.Input.read(output_bucket_name_input)

AWS.S3.create_bucket(client, output_bucket_name, %{
  "CreateBucketConfiguration" => %{
    "LocationConstraint" => region
  }
})

table bucket を作成する

次は S3 Tables 本体の保存先になる table bucket を作ります

table_bucket_name_input = Kino.Input.text("TABLE_BUCKET_NAME")

スクリーンショット 2026-06-09 17.58.20.png

こちらも名前を入力します

table_bucket_name = Kino.Input.read(table_bucket_name_input)

table bucket を作成します

buckets_res =
  client
  |> AWS.S3Tables.create_table_bucket(%{
    "name" => table_bucket_name
  })

返ってきたレスポンスから ARN を取り出します

buckets_arn =
  buckets_res
  |> elem(1)
  |> Map.get("arn")

一覧も確認してみましょう

client
|> AWS.S3Tables.list_table_buckets()
|> elem(1)
|> Map.get("tableBuckets")

ここで、作成した table bucket が表示されれば大丈夫です

namespace を作成する

S3 Tables では、テーブルを namespace 単位で整理します
Athena や Glue の感覚だと database に近いです

今回は sample_namespace という名前で作成します

namespace_res =
  client
  |> AWS.S3Tables.create_namespace(buckets_arn, %{
    "namespace" => ["sample_namespace"]
  })

作成後に一覧を見てみます

client
|> AWS.S3Tables.list_namespaces(buckets_arn)
|> elem(1)
|> Map.get("namespaces")

テーブルを作成する

S3 Tables にテーブルを作るには、スキーマ情報を渡す必要があります
Explorer の型情報をもとに、フィールド定義へ変換します

fields =
  wine_df
  |> Map.get(:dtypes)
  |> Enum.map(fn {name, type} ->
    %{
      "name" => name,
      "type" =>
        case type do
          {:f, _} -> "float"
          {:s, _} -> "int"
          _ -> "string"
        end
    }
  end)

実行結果

[
  %{"name" => "alcalinity_of_ash", "type" => "float"},
  %{"name" => "alcohol", "type" => "float"},
  %{"name" => "ash", "type" => "float"},
  ...
]

そのまま wine テーブルを作成します
フォーマットは ICEBERG を指定します

table_res =
  client
  |> AWS.S3Tables.create_table("sample_namespace", buckets_arn, %{
    "name" => "wine",
    "format" => "ICEBERG",
    "metadata" => %{
      "iceberg" => %{
        "schema" => %{
          "fields" => fields
        }
      }
    }
  })

一覧で確認します

client
|> AWS.S3Tables.list_tables(buckets_arn)
|> elem(1)
|> Map.get("tables")

Athena 実行用の関数を準備する

ここからは Athena 経由で SQL を流していきます

まずはクエリを実行して QueryExecutionId を返す関数を作ります

exec_query = fn query ->
  client
  |> AWS.Athena.start_query_execution(%{
    "QueryString" => query,
    "ClientRequestToken" => UUID.uuid1(),
    "QueryExecutionContext" => %{
      "Catalog" => "s3tablescatalog/#{table_bucket_name}",
      "Database" => "sample_namespace"
    },
    "ResultConfiguration" => %{
      "OutputLocation" => "s3://#{output_bucket_name}"
    }
  })
  |> elem(1)
  |> Map.get("QueryExecutionId")
end

Athena で S3 Tables を使うときは、catalog に s3tablescatalog/<table_bucket_name> を指定します

続いて実行状態を取得する関数です

get_state = fn exec_id ->
  client
  |> AWS.Athena.get_query_execution(%{"QueryExecutionId" => exec_id})
  |> elem(1)
  |> then(& &1["QueryExecution"]["Status"]["State"])
end

最後に Athena の結果を DataFrame に変換する関数も用意しておきます

get_results = fn exec_id ->
  results =
    client
    |> AWS.Athena.get_query_results(%{"QueryExecutionId" => exec_id})
    |> elem(1)

  results_df =
    results
    |> then(& &1["ResultSet"]["Rows"])
    |> Enum.map(& &1["Data"])
    |> then(fn columns ->
      [headers | values] = columns

      headers
      |> Enum.map(& &1["VarCharValue"])
      |> Enum.with_index()
      |> Enum.reduce(%{}, fn {col_name, index}, acc ->
        col_values =
          Enum.map(values, fn each_values ->
            each_values
            |> Enum.at(index)
            |> then(& &1["VarCharValue"])
          end)

        %{col_name => col_values}
        |> Map.merge(acc)
      end)
    end)
    |> DataFrame.new()

  results
  |> then(& &1["ResultSet"]["ResultSetMetadata"]["ColumnInfo"])
  |> Enum.map(fn info ->
    {
      info["Name"],
      case info["Type"] do
        "integer" -> :integer
        "float" -> :float
        _ -> :string
      end
    }
  end)
  |> Enum.reduce(results_df, fn {col_name, col_type}, df ->
    DataFrame.mutate_with(df, &%{col_name => Series.cast(&1[col_name], col_type)})
  end)
end

データを投入する

まずは再実行しやすいように、既存データを削除しておきます

exec_id = exec_query.("DELETE FROM \"wine\";")

状態を見てみます。

get_state.(exec_id)

実行直後は "RUNNING" になります

スクリーンショット 2026-06-09 18.03.58.png

実行完了していると "SUCCEEDED" になるので、それまで繰り返し状態を確認します

スクリーンショット 2026-06-09 18.04.48.png

続いて、本当に空になっているか確認します

exec_id = exec_query.("SELECT * FROM \"wine\";")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2026-06-09 18.06.57.png

問題なさそうなら、DataFrame の内容から INSERT 文を組み立てます

cols = Explorer.DataFrame.names(wine_df)

vals =
  wine_df
  |> Explorer.DataFrame.to_rows()
  |> Enum.map(fn row ->
    Enum.map(cols, fn name ->
      Map.get(row, name) |> then(fn val -> "#{val}" end)
    end)
  end)

各行ごとの INSERT 文を作ります

queries =
  vals
  |> Enum.map(fn row ->
    "INSERT INTO wine (#{Enum.join(cols, ",")}) VALUES (#{Enum.join(row, ",")})"
  end)

実行結果

["INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,14.23,1.71,2.43,15.6,127,2.8,3.06,0.28,2.29,5.64,1.04,3.92,1065)",
 "INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,13.2,1.78,2.14,11.2,100,2.65,2.76,0.26,1.28,4.38,1.05,3.4,1050)",
 "INSERT INTO wine (class,alcohol,malic_acid,ash,alcalinity_of_ash,magnesium,total_phenols,flavanoids,nonflavanoid_phenols,proanthocyanins,color_intensity,hue,od280_over_od315_of_diluted_wines,proline) VALUES (1,13.16,2.36,2.67,18.6,101,2.8,3.24,0.3,2.81,5.68,1.03,3.17,1185)",
  ...
]

順番に実行します

Enum.map(queries, fn query ->
  exec_id = exec_query.(query)

  Process.sleep(1_000)

  exec_id
  |> get_state.()
  |> IO.inspect()
end)

SUCCEEDED が並べば投入完了です。

データを取得する

まずは全件取得してみます

exec_id = exec_query.("SELECT * FROM \"wine\"")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2026-06-09 18.15.10.png

ちゃんとテーブルとして読めています

次に、条件付き検索も試してみます

exec_id = exec_query.("
  SELECT
    class,
    color_intensity,
    flavanoids
  FROM
    wine
  WHERE
    color_intensity < 5.0
    AND flavanoids >= 2.0
  ORDER BY
    class
  ")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2026-06-09 18.16.08.png

普通に SQL で絞り込みできます

さらに集計もやってみます

exec_id = exec_query.("
  SELECT
    class,
    max(alcohol) AS alcohol
  FROM
    wine
  GROUP BY
    class
  ORDER BY
    alcohol DESC
  ")
exec_id
|> get_results.()
|> Kino.DataTable.new()

スクリーンショット 2026-06-09 18.16.53.png

Athena から S3 Tables を操作しているので、使い心地はかなり素直です

後片付け

作成したリソースは最後に削除しておきます

まずはテーブルです。

AWS.S3Tables.delete_table(client, "wine", "sample_namespace", buckets_arn, %{})

次に namespace を削除します

AWS.S3Tables.delete_namespace(client, "sample_namespace", buckets_arn, %{})

table bucket も削除します

AWS.S3Tables.delete_table_bucket(client, buckets_arn, %{})

Athena の出力先バケットは、オブジェクトを消してから削除します

keys =
  client
  |> AWS.S3.list_objects(output_bucket_name)
  |> elem(1)
  |> Map.get("ListBucketResult")
  |> Map.get("Contents", [])
  |> Enum.map(fn content -> Map.get(content, "Key") end)
keys
|> Enum.map(fn key ->
  AWS.S3.delete_object(client, output_bucket_name, key, %{})
end)
AWS.S3.delete_bucket(client, output_bucket_name, %{})

まとめ

Amazon S3 Tables を使うと、S3 上に Apache Iceberg テーブルをかなり素直に置けます

特に、

  • table bucket という専用の入れ物がある
  • namespace でテーブルを整理できる
  • Glue Data Catalog と統合すると Athena からそのまま SQL で扱える

あたりが分かりやすくて良いですね

今回の例では Elixir の Livebook から、

  • table bucket の作成
  • namespace の作成
  • テーブル作成
  • Athena での INSERT
  • Athena での SELECT

まで一通り試せました

S3 にファイルを置くだけの構成よりも、テーブルとして扱いたいケースではかなり便利そうです
Athena や Spark から Iceberg テーブルを素直に扱いたいときに、まず候補に入れてよさそうだと感じました

4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?