はじめに
Cassandra Day Tokyo
今年、2023年6月1日に、Cassandra Dayが日本でも開催されます。
Cassandra Dayは、昨年、ベルリン、ロンドン、アムステルダム、ハノイ、ジャカルタ、ヒューストン、サンタクララ、シアトル、シンガポールでも開催されました。
今回の東京での開催に向けて、Apache Cassandraに関する記事を発表していきます。
Apache Cassandraについて
Apache Cassandraとは、一言でいうなら、オープンソースの分散データベース管理システムです。
他の分散データベース管理システム同様、複数の汎用サーバーを用いて、ひとつのデータベースを構築します(開発などの目的のため、一つのサーバーのみで構成することも可能です)。
ここでは、詳しい説明は割愛し、興味のある方へのご紹介の役割は、公式サイトやWikipediaに譲ります。
オブジェクトマッピングによるデータベースプログラミング
データベースとオブジェクトマッピング
データベースで管理されているデータ(レコード)と、オブジェクト指向プログラミングで使われるデータ(オブジェクト)との間の変換は、(特にリレーショナルデータベースでは)専用のライブラリを習得することが通常です。そのような機能はオブジェクト(リレーショナル)マッピングと呼ばれます。
Cassandraとオブジェクトマッピング
Cassandraでは、オブジェクトマッピング機能は、cassandra.cqlengine
パッケージにより提供されます。
本稿では、このパッケージによって提供されるオブジェクトマッピングを利用したプログラミングの概略を紹介します。
Cassandraで扱うデータモデルの詳細については、上述のドキュメントなどを参照ください。とはいえ、Cassandraは、ネイティブアクセスのためにCQL(Cassandra Query Language)というプロトコルが使われており、RDB経験者であれば、以下のプログラムは、RDBのテーブルとの類似でほとんど理解することができると思われます(「クラスタリングキー」など、耳慣れない言葉が出てきた場合は、適宜ドキュメントをご確認ください。とはいえ、全体のイメージを掴むのに問題はないと思われます)。
サンプルプログラム
まずはサンプルプログラムの全体像を示します。
import uuid
from cassandra.cqlengine import columns
from cassandra.cqlengine import connection
from datetime import datetime
from cassandra.cqlengine.management import sync_table
from cassandra.cqlengine.models import Model
# まず、モデルを定義します。
class ExampleModel(Model):
example_id = columns.UUID(primary_key=True, default=uuid.uuid4)
example_type = columns.Integer(index=True)
created_at = columns.DateTime()
description = columns.Text(required=False)
# 次にCassandraデータベースとの接続を行います。
# 利用できるオプションについては、次を参照: http://datastax.github.io/python-driver/api/cassandra/cluster.html
# 配列として、接続先ホストのリストをClusterインスタンス作成時に渡します(ここでは開発環境を想定し、ローカルホスト一つを利用)。
connection.setup(['127.0.0.1'], "cqlengine", protocol_version=3)
以下では、Pythonコマンドラインインターフェイス上での実行イメージを出力結果と共に示します。
# 以下の操作により、定義したモデルのテーブルがCassandraに作成されます。
>>> sync_table(ExampleModel)
# これでテーブルにレコード(列)を追加することができます。
>>> em1 = ExampleModel.create(example_type=0, description="example1", created_at=datetime.now())
>>> em2 = ExampleModel.create(example_type=0, description="example2", created_at=datetime.now())
>>> em3 = ExampleModel.create(example_type=0, description="example3", created_at=datetime.now())
>>> em4 = ExampleModel.create(example_type=0, description="example4", created_at=datetime.now())
>>> em5 = ExampleModel.create(example_type=1, description="example5", created_at=datetime.now())
>>> em6 = ExampleModel.create(example_type=1, description="example6", created_at=datetime.now())
>>> em7 = ExampleModel.create(example_type=1, description="example7", created_at=datetime.now())
>>> em8 = ExampleModel.create(example_type=1, description="example8", created_at=datetime.now())
# テーブルに対してクエリを実行することもできます。
>>> ExampleModel.objects.count()
8
>>> q = ExampleModel.objects(example_type=1)
>>> q.count()
4
>>> for instance in q:
>>> print instance.description
example5
example6
example7
example8
# ここでは、追加のフィルターを適用します。
# queryオブジェクトはイミュータブルです。つまり、ここで戻されるオブジェクト(q2)は、新しいオブジェクトです。
>>> q2 = q.filter(example_id=em5.example_id)
>>> q2.count()
1
>>> for instance in q2:
>>> print instance.description
example5
モデル概要
モデルは、CQL テーブルを表す Python クラスです。モデルはModel
から派生し(を継承して定義し)、基本的なテーブル プロパティとテーブルの列を定義します。
モデルの列は、CQL テーブルの列にマップされます。モデル クラスで列の属性を定義することにより、CQL 列を定義します。モデルが有効であるためには、少なくとも 1 つの主キー列と 1 つの非主キー列が必要です。CQL と同様に、列を定義する順序は意味を持ちます。これはモデルに対応するテーブルで列を定義する順序と同じになります。
モデルを定義する場合の基本的な例を以下にいくつか示します。詳細については、ドキュメントを参照してください。
モデル定義例
この例では、列first_name
とlast_name
を持つModel
クラスを継承したPerson
クラスを定義します。
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
class Person(Model):
id = columns.UUID(primary_key=True)
first_name = columns.Text()
last_name = columns.Text()
このPerson
クラスは、次の CQL テーブルに対応します。create table
句で示します。
CREATE TABLE cqlengine.person (
id uuid,
first_name text,
last_name text,
PRIMARY KEY (id)
);
次に他のモデル定義方法の例として、Comment
クラスを紹介します。
ここで登場する新しい要素として、clustering_order="DESC"
という降順(DESC)のクラスタリング キーの利用があります。
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
class Comment(Model):
photo_id = columns.UUID(primary_key=True)
comment_id = columns.TimeUUID(primary_key=True, clustering_order="DESC")
comment = columns.Text()
上記のComment
モデルに対応するテーブルは次のようになります。create table
句で示します。
CREATE TABLE comment (
photo_id uuid,
comment_id timeuuid,
comment text,
PRIMARY KEY (photo_id, comment_id)
) WITH CLUSTERING ORDER BY (comment_id DESC);
これらのモデルをデータベースに同期するには、次の手順で実行します。(ここでのPerson
、Comment
は、オブジェクトではなくクラスであることに注意)
from cassandra.cqlengine.management import sync_table
sync_table(Person)
sync_table(Comment)
モデル同期には、スキーマの変更が伴います。データベースへのインパクトを考慮して行う必要があります。考慮事項については、ドキュメント中のスキーマ管理の説明を参照してください。
モデル操作
モデル インスタンスは、以下のように、ディクショナリ操作に類似した方法でアクセスできます。
class Person(Model):
first_name = columns.Text()
last_name = columns.Text()
kevin = Person.create(first_name="Kevin", last_name="Deldycke")
dict(kevin) # returns {'first_name': 'Kevin', 'last_name': 'Deldycke'}
kevin['first_name'] # returns 'Kevin'
kevin.keys() # returns ['first_name', 'last_name']
kevin.values() # returns ['Kevin', 'Deldycke']
kevin.items() # returns [('first_name', 'Kevin'), ('last_name', 'Deldycke')]
kevin['first_name'] = 'KEVIN5000' # changes the models first name
モデル検証の拡張
モデル インスタンスを cqlengine
に保存するたびに、モデル内のデータは、モデル用に定義したスキーマに対して検証されます。ここで行われる検証は基本的には自明なものと考えることができます。例えば、異なるデータ型の混同、つまり例えば、テキストを整数列に保存しようとしていないことを確認します。また、データを適切に保存するために必要な変換も実行します。
ただし、データを挿入しようとしたときに Cassandraデータベースでエラーが発生しないようにするだけでなく、追加の制約や変換を課したい場合があります。
モデルに追加の検証ロジックを定義するには、そのモデルのvalidate
メソッドを拡張します。
class Member(Model):
person_id = UUID(primary_key=True)
name = Text(required=True)
def validate(self):
super(Member, self).validate()
if self.name == 'jon':
raise ValidationError('no jon\'s allowed')
検証が失敗した場合に、(cassandra.cqlengine.
) ValidationError
を発生させることができます。とはいえ、これは、必須ではありません。検証に失敗した場合のデフォルト値に変換するなどの対応も考えられます。
モデルの継承
単一の CQL テーブルを使用して、さまざまなモデル クラスを保存およびロードできます。これは、単一の Cassandra 行に格納したいさまざまなオブジェクト タイプがある場合に役立ちます。
たとえば、猫や犬についての情報を格納するペットテーブルがあるとします。
class Pet(Model):
__table_name__ = 'pet'
owner_id = UUID(primary_key=True)
pet_id = UUID(primary_key=True)
pet_type = Text(discriminator_column=True)
name = Text()
def eat(self, food):
pass
def sleep(self, time):
pass
class Cat(Pet):
__discriminator_value__ = 'cat'
cuteness = Float()
def tear_up_couch(self):
pass
class Dog(Pet):
__discriminator_value__ = 'dog'
fierceness = Float()
def bark_all_night(self):
pass
sync_table
を呼び出した後、各モデルで定義された列がpet
テーブルに追加されます。さらに、モデルを保存するCat
と、Dog
各行を猫または犬として識別するために必要なメタデータが保存されます。
継承を使用してモデル構造をセットアップするには、次の手順に従います。
- 識別子(ここでは
pet_type
)として設定された列を持つ基本モデルを作成する(distriminator_column=True
により指定)。 - サブクラス モデルを(複数)定義し、
__discriminator_value__
フィールドに(それぞれ)一意の値を定義する -
sync_table
を各サブテーブルで実行する
ユーザー定義型
cqlengine
は、ユーザー定義型 (UDT: User Defined Type) をモデル化することもできます。
UDTは、(単体で利用するのではなく)モデル(テーブル)を構成する、フィールドとして(のみ)利用できます。
UDT インスタンスは、テーブルに対応するモデルを介して作成、保存、クエリされます。
from cassandra.cqlengine.columns import *
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.usertype import UserType
class address(UserType):
street = Text()
zipcode = Integer()
class users(Model):
__keyspace__ = 'account'
name = Text(primary_key=True)
addr = UserDefinedType(address)
users.create(name="Joe", addr=address(street="Easy St.", zipcode=99999))
user = users.objects(name="Joe")[0]
print user.name, user.addr
# Joe address(street=u'Easy St.', zipcode=99999)
UDT は、 UserType
を継承するクラスとして定義し、その属性フィールドのそれぞれに、対応するデータ型のインスタンスを設定することによってモデル化されます。
そして、次に、UserDefinedType
クラスの引数に、そのUDTクラスをパラメーターとして宣言することにより、モデルの(フィールド/行)定義にUDTが使用されます。
sync_table
は、テーブルに含まれるすべてのUDTを暗黙的に同期します。または、sync_type()
をUDTを明示的に作成/変更するために使用できます。
宣言すると、型は自動的にドライバーに登録されるため、クエリの結果はUserType
クラスのインスタンスを返します。
最後に
今回は、Cassandraを使ったPythonプログラミングにおいて、オブジェクトマッピングがどのように機能するかを紹介しました。もちろん、オブジェクトマッピングを使わず、CQLクエリを使ってプログラミングすることも可能です。ここでは、実際にプロジェクトでCassandraを使うにあたって、利用することのできるオプションとして、オブジェクトマッピングについて紹介しました。