15
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ElixirAdvent Calendar 2022

Day 21

Livebook から Amazon DynamoDB の NoSQL データベースを操作する

Last updated at Posted at 2022-12-12

はじめに

Livebook から AWS のサービスを操作するシリーズです

今回は Amazon DynamoDB を操作します

Amazon DynamoDB は NoSQL データベースサービスで、超格安でデータベースを運用できます

フルマネージドでサーバレスなので、環境構築とかは気にせず、使った分だけ料金を払えばOKです

今回はテーブル作成・削除とデータ追加、データ取得を Livebook からやってみます

実装したノートブックはこちら

事前作業

AWS のアカンウトと、 DynamoDB の権限を持った IAM ユーザーと、その認証情報(ACCESS_KEY_ID と SECRET_ACCESS_KEY)が必要です

実行環境

Livebook 0.8.0 の Docker イメージを元にしたコンテナで動かしました

コンテナ定義はこちらを参照

セットアップ

ex_aws_dynamo を中心に、必要なモジュールをインストールします

また、データ分析のために Explorer もインストールします

Mix.install([
  {:ex_aws, "~> 2.0"},
  {:ex_aws_dynamo, "~> 4.2"},
  {:poison, "~> 5.0"},
  {:hackney, "~> 1.18"},
  {:sweet_xml, "~> 0.7"},
  {:explorer, "~> 0.4"},
  {:kino, "~> 0.8"}
])

エイリアス等の準備をします

alias ExAws.Dynamo
alias Explorer.DataFrame
alias Explorer.Series
require Explorer.DataFrame

認証

入力エリアを用意し、そこに IAM ユーザーの認証情報を入力します

ACCESS_KEY_ID と SECRET_ACCESS_KEY は秘密情報なので、値が見えないように Kino.Input.password を使います

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")

操作対象にする DynamoDB テーブルのリージョンもここで入力しておきましょう

region_input = Kino.Input.text("REGION")

スクリーンショット 2022-12-03 0.01.54.png

各認証情報を ExAws に渡すためにまとめておきます

秘密情報が実行結果に現れないよう、セルの最後には "dummy" を入れておきましょう

auth_config = [
  access_key_id: Kino.Input.read(access_key_id_input),
  secret_access_key: Kino.Input.read(secret_access_key_input),
  region: Kino.Input.read(region_input)
]

"dummy"

テーブル作成

今回は Explorer から化石燃料データセットを取得し、そのデータを DynamoDB に入れてみます

df = Explorer.Datasets.fossil_fuels()

df
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 11.09.36.png

作成するテーブルの名前を定義しておきます

table_name = "fossil_fuels"

テーブルのキーを定義します

DynamoDB においては RDB と違い、キー項目だけ ちゃんと 定義しておけば、あとはどんな項目が来てもデータを追加できます

この ちゃんと が難しいわけですが、、、

DynamoDB のキーについては以下の記事を参照してください

非常にややこしいですが、今回は深いことは考えず、 fossil_fuels データセットは年毎、国毎の化石燃料データなので、 year をハッシュキー、 country をレンジキーにします

key_schema = [
  year: :hash,
  country: :range
]

それぞれの型も指定します

key_types = [
  year: :number,
  country: :string
]

テーブルの性能を指定しておきます

これもかなり分かりにくいですが、オンデマンドキャパシティーモードにしておけば、超大量データを高速に書いたり読んだりしていても、基本的にエラーになりません

その代わり、料金は青天井で、読み書きの回数に応じてどんどん高くなります

どれだけのリクエストが来るか想定できない場合に使います

それに対してプロビジョニング済みキャパシティーモードはある程度時間あたりのリクエスト数が想定できる場合に使用します

読み書きにキャパシティーユニットという数値を指定しておけば、それに応じた時間あたりの料金になります

そして読み書きそれぞれ 25 キャパシティーユニットまでは無料利用枠が適用されます

小規模データベースだったり、一時的に使うだけのデータベースならほぼ無料で使えてしまいます

というわけで、今回は読み書きそれぞれ 1 のキャパシティーユニットでテーブルを作成します

read_capacity = 1
write_capacity = 1

Dynamo.create_table でテーブルを作成します

table_name
|> Dynamo.create_table(key_schema, key_types, read_capacity, write_capacity)
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 11.23.30.png

Dynamo.list_tables で作成したテーブルを確認してみましょう

Dynamo.list_tables()
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 11.24.26.png

データ追加

まず一つだけデータを追加してみましょう

データフレームから1件データを取得します

item =
  df
  |> DataFrame.to_rows()
  |> Enum.at(0)

スクリーンショット 2022-12-12 11.26.38.png

Dynamo.put_item にそのまま渡せば追加できます

table_name
|> Dynamo.put_item(item)
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 11.27.32.png

では全件追加しましょう、と言いたいところですが、書込キャパシティー 1 で全件追加するとすごく時間がかかるため、 2012 年以降の A で始まる国のデータだけに限定します

対象データを確認します

df
|> DataFrame.put(
  :initial,
  Series.transform(df["country"], fn country -> String.first(country) end)
)
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 11.29.41.png

対象データを逐次追加します

速すぎると書き込みキャパシティーを超えてエラーになってしまうため、 Process.sleep(500) で速度を抑えます

df
|> DataFrame.put(
  :initial,
  Series.transform(df["country"], fn country -> String.first(country) end)
)
|> DataFrame.filter(initial == "A")
|> DataFrame.filter(year >= 2012)
|> DataFrame.select(DataFrame.names(df))
|> DataFrame.to_rows()
|> Enum.with_index()
|> Enum.map(fn {row, index} ->
  IO.inspect(index)

  Process.sleep(500)

  table_name
  |> Dynamo.put_item(row)
  |> ExAws.request(auth_config)
  |> IO.inspect()
end)

スクリーンショット 2022-12-12 11.39.07.png

ゆっくり全件追加できました

データ取得

DynamoDB のデータ取得には scanqueryget_item があります

scan はキー項目による絞り込みができないときに使います

DynamoDB 上で全件読み込みが行われるため、読込キャパシティーが無駄に消費されるため推奨されません

全件取得する場合は使っても良いです

res =
  table_name
  |> Dynamo.scan()
  |> ExAws.request!(auth_config)

スクリーンショット 2022-12-12 11.42.00.png

先に入れていた1件 + まとめて入れた39件で合わせて40件取得できています

ただし、取得した形式が "bunker_fuels" => %{"N" => "9"}, というように型と値をキーバリューにした特殊な形式になっているため、整形してからデータフレームに入れます

デーコード用の関数を定義します

decode = fn item ->
  item
  |> Enum.reduce(%{}, fn {key, type_value}, merged ->
    type =
      type_value
      |> Map.keys()
      |> Enum.at(0)

    value =
      type_value
      |> Map.values()
      |> Enum.at(0)

    parsed =
      case type do
        "N" ->
          if String.contains?(value, ".") do
            String.to_float(value)
          else
            String.to_integer(value)
          end

        "S" ->
          value
      end

    %{key => parsed}
    |> Map.merge(merged)
  end)
end
decoded_df =
  res["Items"]
  |> Enum.map(&decode.(&1))
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 11.44.42.png

scan に検索条件や上限件数を指定することもできます

decoded_df =
  table_name
  |> Dynamo.scan(
    limit: 3,
    expression_attribute_values: [value: 1_000],
    expression_attribute_names: %{"#name" => "total"},
    filter_expression: "#name > :value"
  )
  |> ExAws.request!(auth_config)
  |> then(&(&1["Items"]))
  |> Enum.map(&decode.(&1))
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 11.55.15.png

query はキー項目によって絞り込みを行います

このとき、 DynamoDB はキーによって絞り込まれたパーティションだけを読み込むため、消費される読込キャパシティーが少なくなります

decoded_df =
  table_name
  |> Dynamo.query(
    limit: 5,
    expression_attribute_values: [value: 2013],
    expression_attribute_names: %{"#name" => "year"},
    key_condition_expression: "#name = :value"
  )
  |> ExAws.request!(auth_config)
  |> then(&(&1["Items"]))
  |> Enum.map(&decode.(&1))
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 12.00.01.png

get_item はキー項目を全て指定して特定のデータ1件だけを取得します

decoded_df =
  table_name
  |> Dynamo.get_item(%{year: 2012, country: "AFGHANISTAN"})
  |> ExAws.request!(auth_config)
  |> then(&(&1["Item"]))
  |> decode.()
  |> List.wrap()
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 12.01.33.png

データ更新

データ更新には Dynamo.update_item を使います

テーブル名、キー項目、更新内容を指定します

update_expression の場合は set を付けます

table_name
|> Dynamo.update_item(
  %{year: 2012, country: "AFGHANISTAN"},
  expression_attribute_values: [cement_value: 6],
  expression_attribute_names: %{"#cement_name" => "cement"},
  update_expression: "set #cement_name = :cement_value"
)
|> ExAws.request!(auth_config)

スクリーンショット 2022-12-12 13.11.22.png

decoded_df =
  table_name
  |> Dynamo.get_item(%{year: 2012, country: "AFGHANISTAN"})
  |> ExAws.request!(auth_config)
  |> then(&(&1["Item"]))
  |> decode.()
  |> List.wrap()
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 13.11.50.png

データ削除

削除は  Dynamo.delete_item です

table_name
|> Dynamo.delete_item(
  %{year: 2012, country: "AFGHANISTAN"}
)
|> ExAws.request!(auth_config)

スクリーンショット 2022-12-12 13.11.22.png

decoded_df =
  table_name
  |> Dynamo.scan()
  |> ExAws.request!(auth_config)
  |> then(&(&1["Items"]))
  |> Enum.map(&decode.(&1))
  |> DataFrame.new()

decoded_df
|> DataFrame.select(DataFrame.names(df))
|> Kino.DataTable.new()

スクリーンショット 2022-12-12 13.13.58.png

テーブル設定更新

一時的に読込キャパシティーや書込キャパシティーを変更したくなった場合は以下のようにしてテーブル設定を更新します

table_name
|> Dynamo.update_table(%{
  provisioned_throughput: %{
    read_capacity_units: 3,
    write_capacity_units: 3,
  }
})
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 13.28.08.png

返ってくる値は更新前のテーブル設定になっています

更新後の(最新の)テーブル設定は Dynamo.describe_table で取得します

table_name
|> Dynamo.describe_table()
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 13.30.12.png

テーブル削除

テーブルを削除します

table_name
|> Dynamo.delete_table()
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 13.31.01.png

Dynamo.list_tables()
|> ExAws.request(auth_config)

スクリーンショット 2022-12-12 13.31.45.png

まとめ

わざわざデータ解析のためだけに DynamoDB を使うことはありませんが、何らかのデータ(IoT機器から送られてくるセンサー値など)を DynamoDB に溜めていて、それを解析することはあると思います

DynamoDB から scan もしくは query してきたデータを Livebook 上で解析する、という用途になりそうです

15
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?