2
2

More than 1 year has passed since last update.

PythonでDynamoDBの単一テーブル設計を扱う

Last updated at Posted at 2022-10-01

PythonでDynamoDBを単一テーブル設計で使うためのライブラリを作りました。
https://github.com/medaka0213/DynamoDB-SingleTable

概要

DynamoDBは高パフォーマンスでサーバーレスなNoSQLデータベースですが、テーブル設計が大変です。
単一テーブル設計を使えば、一個のテーブルと数個のGSIで全てのデータモデルを効率的に管理することができます。

今回紹介するライブラリは、以下のブログを参考に作ってみたものになります。

使い方

インストール

pip install ddb_single

テーブルを作る

DynamoDB-Localを使う場合、endpoint_url="http://localhost:8000" でローカルのエンドポイントを設定できる

from ddb_single.table import Table
from ddb_single.query import Query

table = Table(
    table_name="sample",
    endpoint_url="http://localhost:8000",
)
table.init()

データモデルを作る

少なくとも3つのキーを使う

  • primary_key ... バッシュキー。 デフォルト: {__model_name__}_{uuid}
  • seconday_key ... レンジキー。デフォルト: {__model_name__}_item
  • unique_key ... 一意の値を指定する。アイテムの更新とかに便利

GSIを使って検索したい項目には、serch_key を設定

from ddb_single.model import BaseModel, DBField
from ddb_single.table import FieldType

class User(BaseModel):
    __table__=table
    __model_name__ = "user"
    name = DBField(unique_key=True)
    email = DBField(search_key=True)
    age = DBField(type=FieldType.NUMBER, search_key=True)
    description=DBField()

CRUD操作

CRUD操作には、Query オブジェクトが必要

  • query.model(foo).create
  • query.model(foo).get
  • query.model(foo).search
  • query.model(foo).update
  • query.model(foo).delete
query = Query(table)

アイテムを作成する

unique_key が同じアイテムがあった場合、既存のアイテムが更新される

user = User(name="John", email="john@example.com", description="test")
query.model(user).create()

こんな感じで複数のアイテムが作成される

pk sk data name email description
user_xxxx user_item John john@example.com test
user_xxxx search_user_name John
user_xxxx search_user_email new-john@example.com

メインのアイテム (sk=user_itemのもの)と、複数の検索用アイテム(sk=search_{__model_name__}_{field_name}) がテーブルに追加される。

このとき、検索用のGSI DataSearchIndexpk を抽出するのに使う。
あとはbatch_get でメインのアイテムを取得するだけ。

sk = hash data = range pk
search_user_name John user_xxxx
search_user_email new-john@example.com user_xxxx

アイテムを検索する

user = query.model(User).search(User.name.eq("John"))
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}]

pk_only=Truebatch_get を行わずに、primary key だけ抽出する

user_pks = query.model(User).search(User.name.eq("John"), pk_only=True)
print(user_pks)
# -> ["user_xxxx"]

単一のアイテムを取得する

primary key を指定して、単一のアイテムを取得できる

user = query.model(User).get("user_xxxx")
print(user)
# -> {"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}

unique_key でも取得できる

user = query.model(User).get_by_unique("John")
print(user)
# -> {"pk":"user_xxxx", "sk":"user_item", "name":"John", "email":"john@example.com"}

pk_only=Trueunique_key からprimary key を抽出する

pk = query.model(User).get_by_unique("John", pk_only=True)
print(pk)
# -> "user_xxxx"

アイテムを更新する

検索してから、指定のアイテムを更新する場合

user = query.model(User).search(User.email.eq("john@example.com"))
new_user = User(**user[0])
new_user.email = "new-john@example.com"
query.model(new_user).update()

unique_key を指定すれば、既存のアイテムが識別されて更新される

new_user = User(name="John", email="new-john@example.com")
query.model(new_user).update()

メインのアイテムと検索用のアイテムが更新される

pk sk data name email description
user_xxxx user_item John new-john@example.com test
user_xxxx search_user_name John
user_xxxx search_user_email new-john@example.com

アイテムを削除する

user = query.model(User).search(User.email.eq("new-john@example.com"))
query.model(user[0]).delete()

primary key を指定することもできる

query.model(User).delete_by_pk("user_xxxx")

unique_key でも可能

query.model(User).delete_by_unique("John")

バッチ処理

table.batch_writer() で複数のアイテムをバッチ処理できる

  • query.model(foo).create(batch=batch)
  • query.model(foo).update(batch=batch)
  • query.model(foo).delete(batch=batch)

複数のアイテムを作成

with table.batch_writer() as batch:
    for i in range(3):
        user = User(name=f"test{i}", age=i+10)
        query.model(user).create(batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print([(r["name"], r["age"]) for r in res])
# -> [("test0", 10), ("test1", 11), ("test2", 12)]

複数のアイテムを更新

with table.batch_writer() as batch:
    for i in range(3):
        user = User(name=f"test{i}", age=i+20)
        query.model(user).update(batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print([(r["name"], r["age"]) for r in res])
# -> [("test0", 20), ("test1", 21), ("test2", 22)]

複数のアイテムを削除

pks = query.model(User).search(User.name.begins_with("test"), pk_only=True)
with table.batch_writer() as batch:
    for pk in pks:
        query.model(user).delete_by_pk(pk, batch=batch)
res = query.model(User).search(User.name.begins_with("test"))
print(res)
# -> []

アイテムの関連付け

SQLでいうrelationみたいなやつ

モデルを作成する

relation=BaseModel で他のモデルとの関連を設定したモデルを作成できる
ここでは、ブログの作者にUserを指定してみる

class BlogPost(BaseModel):
    __model_name__ = "blogpost"
    __table__=table
    name = DBField(unique_key=True)
    content = DBField()
    author = DBField(reletion=User)

アイテムを新規作成する

blogpost = BlogPost(
    name="Hello",
    content="Hello world",
    author=self.user
)
query.model(blogpost).create()
pk sk data name author content
user_xxxx user_item John
user_xxxx search_user_name John
blogpost_xxxx blogpost_item Hello John Hello world
blogpost_xxxx search_blogpost_title Hello
blogpost_xxxx rel_user_xxxx author

メインのアイテム (sk=blogpost_item) に加えて関連付けのためのアイテム(sk=rel_{primary_key}) が追加される
このアイテムはDataSearchIndex を使って対象のpk を取得するのに使われる

sk = hash data = range pk
rel_user_xxxx author blogpost_xxxx

関連付けられたアイテムを検索

get_relation(model=Basemodel) で関連付けたアイテムを検索できる

blogpost = query.model(BlogPost).get_by_unique("Hello")
blogpost = BlogPost(**blogpost)

user = query.model(blogpost).get_relation(model=User)
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John"}]

get_relation(field=DBField) でフィールドを指定

user = query.model(blogpost).get_relation(field=BlogPost.author)
print(user)
# -> [{"pk":"user_xxxx", "sk":"user_item", "name":"John"}]

参照を検索

参照reference は、ここでは関連relation の対義語
(もっと適当な単語は無いんか)

get_reference(model=Basemodel) で、参照している (関連付けられた) アイテムを検索できる

user = query.model(User).get_by_unique("John")
user = User(**blogpost)

blogpost = query.model(blogpost).get_reference(model=BlogPost)
print(blogpost)
# -> [{"pk":"blogpost_xxxx", "sk":"blogpost_item", "name":"Hello"}]

get_reference(field=DBField) でフィールドを指定

blogpost = query.model(user).get_reference(field=BlogPost.author)
print(blogpost)
# -> [{"pk":"blogpost_xxxx", "sk":"blogpost_item", "name":"Hello"}]

関連付けを更新

関連付けのための値が変わると、関連付けも更新される

new_user = User(name="Michael")
blogpost = query.model(BlogPost).get_by_unique("Hello")
blogpost["author"] = new_user
blogpost = BlogPost(**blogpost)

query.model(blogpost).update()

関連付けのためのアイテム (sk=rel_user_yyyy) が更新されている

pk sk data name author content
user_xxxx user_item John
user_xxxx search_user_name John
user_yyyy user_item Michael
user_yyyy search_user_name Michael
blogpost_xxxx blogpost_item Hello Michael Hello world
blogpost_xxxx search_blogpost_title Hello
blogpost_xxxx rel_user_yyyy author

関連付けを削除

関連付けられたアイテムが削除されると、関連付けも解消される

query.model(user).delete_by_unique("Michael")

関連付けのためのアイテムが削除されているが、メインのアイテムの値は変化しない

pk sk data name author content
user_xxxx user_item John
user_xxxx search_user_name John
blogpost_xxxx blogpost_item Hello Michael Hello world
blogpost_xxxx search_blogpost_title Hello

今後の予定

SQLでいう連結テーブルみたいなやつが欲しい

終わりに

10/2追記: アイテムの隣接関係、バッチ処理を追加しました

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2