はじめに
ここではtimedeltaを値とするような辞書オブジェクトをSQLAlchemy を使ってO/R Mappingする方法を紹介します。この方法では指定カラムのtimedeltaオブジェクトを秒数(整数)に変換してDBに格納し、またDB指定カラム上の整数をtimedeltaオブジェクトに変換して辞書を取得します。
なお当初公開した方法はよろしくないということに気が付いたため、最後にマシな方法を追加しています。
以下の環境で動作確認しました。
- Windows 8.1
- Python 3.9.12
- SQLAlchemy1.4.3
- SQLite3
この記事では、まず入出力するデータを説明し、次に通常のJSONにシリアライズ可能なデータの入出力を記載します。その次にtimedeltaを含む辞書オブジェクトをDBに格納する方法についていくつかの試行錯誤を経ながら説明していきます。
どのようなデータを格納しようとしているのか
工場で製造する製品を切り替えるごとに製造機器の清掃が必要だとします。とくに食品や化学製品を製造する工場の場合、その前後の製品の組み合わせによって清掃に必要な人数・時間が異なってくることがあります。
製品同士のこれらの組み合わせは表で記載することができます。
- 調理器具の清掃に必要な人数(人数は適当)
後製品\前製品 | 0:回鍋肉 | 1:天津飯 | 2ハンバーグ |
---|---|---|---|
0:回鍋肉 | 0人 | 1人 | 0人 |
1:天津飯 | 3人 | 0人 | 3人 |
2:ハンバーグ | 1人 | 0人 | 0人 |
- 調理器具の清掃に必要な時間(時間は適当)
後製品\前製品 | 0:回鍋肉 | 1:天津飯 | 2:ハンバーグ |
---|---|---|---|
0:回鍋肉 | 0秒 | 21秒 | 66秒 |
1:天津飯 | 80秒 | 0秒 | 36秒 |
2:ハンバーグ | 4秒 | 112秒 | 0秒 |
たとえば回鍋肉を作ったあと天津飯を作る場合は、前製品が回鍋肉、後製品が天津飯であるセルを確認します。調理器具の清掃に必要な人数が3人、時間が80秒であることがよみとれます。
これらの表を以下のように行ごとに後製品のコードと前製品のコードをキーとする辞書の形式で取り扱うことにします。
- 洗浄人数マトリクス
後製品コード | 前製品コード辞書 |
---|---|
0 | { 0: 0, 1: 1, 2: 0} |
1 | { 0: 3, 1: 0, 2: 3} |
2 | { 0: 1, 1: 0, 2: 0} |
- 洗浄時間マトリクス
後製品コード | 前製品コード辞書 |
---|---|
0 | { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) } |
1 | { 0: timedelta(seconds=80), 1: timedelta(seconds=0), 2: timedelta(seconds=36) } |
2 | { 0: timedelta(seconds=4), 1: timedelta(seconds=112), 2: timedelta(seconds=0) } |
洗浄人数マトリクスの方は、そのままデータベースに格納すればよいのですが、洗浄時間マトリクスの方は timedelta
オブジェクトを要素としてもつため透過的にO/R Mappingできません。
JSONにシリアライズ/デシリアライズ可能な辞書を格納する場合
まず洗浄人数マトリクスをSQLAlchemyを使ってO/R Mappingしてみます。
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
Base = declarative_base()
class WashingPersonMatrix ( Base ) :
__tablename__ = 'washing_person_matrix'
# 後製品コード
post_product_code = Column ( Integer, primary_key=True )
# 前製品コード辞書
pre_product_code_dictionary = Column ( JSON )
DATA = [
( 0, { 0: 0, 1: 1, 2: 0 } )
, ( 1, { 0: 3, 1: 0, 2: 3 } )
, ( 2, { 0: 1, 1: 0, 2: 0 } )
]
if __name__ == "__main__" :
dsl = 'sqlite:///:memory:'
engine = create_engine ( dsl, echo=True )
# テーブル作成
Base.metadata.create_all ( bind=engine )
# テーブルにデータ投入
with Session ( engine ) as session :
for row in DATA :
row_obj = WashingPersonMatrix ( post_product_code=row[0], pre_product_code_dictionary=row[1] )
session.add ( row_obj )
session.commit()
# 投入データ確認
with Session ( engine ) as session :
for row in session.query ( WashingPersonMatrix ).all() :
print ( row.post_product_code, row.pre_product_code_dictionary )
このコードは、1つ目のカラムがIntegerの後製品コード(post_product_code
)、2つ目のカラムがJSONの前製品コード辞書(pre_product_code_dictionary
)であるようなテーブル washing_person_matrix
を作成して上述のデータを投入しまたそのデータを表示します。
前章で記載の通りここで投入する辞書データは { 0: 0, 1: 1, 2: 0 }
のようにそのままJSONにシリアライズ可能なため、単純に sqlalchemy.types.JSON
を使用することが可能です。
実行結果は以下のようになります。
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("washing_person_matrix")
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("washing_person_matrix")
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine
CREATE TABLE washing_person_matrix (
post_product_code INTEGER NOT NULL,
pre_product_code_dictionary JSON,
PRIMARY KEY (post_product_code)
)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [no key 0.00012s] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine COMMIT
- CREATE TABLE していることがわかります。
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine INSERT INTO washing_person_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [generated in 0.00020s] ((0, '{"0": 0, "1": 1, "2": 0}'), (1, '{"0": 3, "1": 0, "2": 3}'), (2, '{"0": 1, "1": 0, "2": 0}'))
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine COMMIT
- INSERT INTOしていることがわかります。
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine SELECT washing_person_matrix.post_product_code AS washing_person_matrix_post_product_code, washing_person_matrix.pre_product_code_dictionary AS washing_person_matrix_pre_product_code_dictionary
FROM washing_person_matrix
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [generated in 0.00013s] ()
- SELECTしていることがわかります。
0 {'0': 0, '1': 1, '2': 0}
1 {'0': 3, '1': 0, '2': 3}
2 {'0': 1, '1': 0, '2': 0}
- 想定通りの値を得られることがわかります。
JSONにシリアライズ/デシリアライズ不可能な辞書を格納する場合
洗浄時間マトリクスをSQLAlchemyを使ってO/R Mappingしてみます。
【失敗】sqlalchemy.types.JSONを使用する
まずは前述の WashingPersonMatrix
と同じようにそのまま sqlalchemy.types.JSON
を使ってみます。
from datetimes import timedelta
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
Base = declarative_base()
class WashingTimeMatrix ( Base ) :
__tablename__ = 'washing_time_matrix'
# 後製品コード
post_product_code = Column ( Integer, primary_key=True )
# 前製品コード辞書
pre_product_code_dictionary = Column ( JSON )
DATA = [
( 0, { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) } )
, ( 1, { 0: timedelta(seconds=80), 1: timedelta(seconds=0), 2: timedelta(seconds=36) } )
, ( 2, { 0: timedelta(seconds=4), 1: timedelta(seconds=112), 2: timedelta(seconds=0) } )
]
if __name__ == "__main__" :
dsl = 'sqlite:///:memory:'
engine = create_engine ( dsl, echo=True )
# テーブル作成
Base.metadata.create_all ( bind=engine )
# テーブルにデータ投入
with Session ( engine ) as session :
for row in DATA :
row_obj = WashingTimeMatrix ( post_product_code=row[0], pre_product_code_dictionary=row[1] )
session.add ( row_obj )
session.commit()
# 投入データ確認
with Session ( engine ) as session :
for row in session.query ( WashingTimeMatrix ).all() :
print ( row.post_product_code, row.pre_product_code_dictionary )
このコードは、1つ目のカラムがIntegerの後製品コード(post_product_code
)、2つ目のカラムがJSONの前製品コード辞書(pre_product_code_dictionary
)であるようなテーブル washing_time_matrix
を作成して上述のデータを投入したり、また、そのデータを表示したりしようとします。
ここで投入する辞書オブジェクトは { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) }
でありそのままJSONにシリアライズできません。そのため、実行するとデータ投入時に以下のような例外が発生します。
TypeError: Object of type timedelta is not JSON serializable
【失敗】sqlalchemy.types.UserDefinedTypeを使用する
sqlalchemy.types.UserDefinedTypeというのがあるので使ってみた(が失敗した)。
from sqlalchemy.types import UserDefinedType
class JsonTimedelta ( UserDefinedType ) :
def get_col_spec(self, **kw):
return "JsonTimedelta"
def bind_processor ( self, dialect ) :
def process ( value ) :
value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
serialized = json_serializer ( value )
return serialized
def result_processor ( self, dialect, coltype ) :
def process ( value ) :
value = { k: timedelta ( seconds=v ) if v is not None else None for k, v in value.items() }
return value
ここに記載したように sqlalchemy.types.UserDefinedType
を継承する JsonTimedelta
クラスを定義しました。bind時(DB書き込み時:bind_processor
内のprocess
関数)にtimedeltaオブジェクトを秒数単位に変換し、取得時(result_processor
内のprocess
関数)にtimedeltaオブジェクトに変換しています。そして以下のようにこの型をWashingTimeMatrix
の post_product_code
カラムで使ってみます。
class WashingTimeMatrix ( Base ) :
__tablename__ = 'washing_time_matrix'
# 後製品コード
post_product_code = Column ( Integer, primary_key=True )
# 前製品コード辞書 ; ここをJSONからJsonTimedeltaに変更
pre_product_code_dictionary = Column ( JsonTimedelta )
実行すると以下のようなエラーとなりダメでした。おそらくJsonTimedelta
型への変換がsqliteで定義されていないのが原因と思います(深くは追及してないので違うかもしれません)。
sqlite3.InterfaceError: Error binding parameter 1 - probably unsupported type.
[SQL: INSERT INTO washing_time_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)]
[parameters: ((0, {0: datetime.timedelta(0), 1: datetime.timedelta(seconds=21), 2: datetime.timedelta(seconds=66)}), (1, {0: datetime.timedelta(seconds=80), 1: datetime.timedelta(0), 2: datetime.timedelta(seconds=36)}), (2, {0: datetime.timedelta(seconds=4), 1: datetime.timedelta(seconds=112), 2: datetime.timedelta(0)}))]
【失敗】sqlalchemy.types.JSONを継承する
slalchemy.types.UserDefinedType
を使った例から、bind_processor
とresult_processor
をオーバーライドすればよいのかもしれないと思い至ります。
sqlalchemy.types.JSON
のコードから bind_processor
メソッドおよび result_processor
メソッドをコピーして以下 #追加したコード
に記載のコードを追加してみます。
from datetime import timedelta
from sqlalchemy.types import JSON
import sqlalchemy.sql.elements as elements
class JsonTimedelta ( JSON ) :
def bind_processor ( self, dialect ) :
string_process = self._str_impl.bind_processor ( dialect )
json_serializer = dialect._json_serializer or json.dumps
def process ( value ) :
if value is self.NULL:
value = None
elif isinstance ( value, elements.Null ) or (
value is None and self.none_as_null
) :
return None
# 追加したコード
value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
serialized = json_serializer ( value )
if string_process :
serialized = string_process ( serialized )
return serialized
return process
def result_processor ( self, dialect, coltype ) :
string_process = self._str_impl.result_processor ( dialect, coltype )
json_deserializer = dialect._json_deserializer or json.loads
def process ( value ) :
if value is None:
return None
if string_process:
value = string_process(value)
value = json_deserializer ( value )
# 追加したコード
value = { k: timedelta ( seconds=v ) if v is not None else None for k, v in value.items() }
return value
return process
実行してみるとなぜか指定したJsonTimedelta
クラスではなくsqlalchemy.types.JSON
クラスで例外が発生しました。
TypeError: Object of type timedelta is not JSON serializable
調べてみると(ステップ実行してやっとわかった)sqlalchemy.types.JSON
クラスは、DBエンジン別の実装のファサードクラスだということが分かり、また SQLiteエンジン使用時のJSON
型処理時の実体はdialect/sqlite/json.pyであることが分かりました。
このdialect/sqlite/json.py
モジュールでのJSONクラス定義をみてみると以下のようになっています。
class JSON(sqltypes.JSON):
つまり、sqlalchemy.types.JSON
ファサードが実行時に選ぶクラスは sqlalchemy.dialects.sqlite.json
であり、作成したJsonTimedelta
ではないということが分かります。
【非推奨】sqlalchemy.types.JSONを継承しprocessor取得メソッドをオーバーライド
この方法はお勧めできません、次の節の方法を取った方が良いと思います。
要は実行時に処理するクラスを選択する際にJsonTimedelta
を推せばよいということになります。
そこでsqlalchemy.types.JSON
ファサードでのクラス選択の処理を JsonTimedelta
クラスでオーバーライドすることにしました。ただ、これが本質的な解決だとは思わない(このあたりが場当たり的な対応というわけです)。
今回は_cached_bind_processor
メソッドと_cached_result_processor
メソッドとをオーバーライドしてJsonTimedelta
自身のprocessorを取得するようにしました。
from datetime import timedelta
from sqlalchemy.types import JSON
import sqlalchemy.sql.elements as elements
class JsonTimedelta ( JSON ) :
def _cached_bind_processor ( self, dialect ) :
return self.bind_processor ( dialect )
def _cached_result_processor ( self, dialect, coltype ) :
return self.result_processor ( dialect, coltype )
def bind_processor ( self, dialect ) :
string_process = self._str_impl.bind_processor ( dialect )
json_serializer = dialect._json_serializer or json.dumps
def process ( value ) :
if value is self.NULL:
value = None
elif isinstance ( value, elements.Null ) or (
value is None and self.none_as_null
) :
return None
# convert timedelta to int
value = { k: v / timedelta ( minutes=1 ) if v is not None else None for k, v in value.items() }
serialized = json_serializer ( value )
if string_process :
serialized = string_process ( serialized )
return serialized
return process
def result_processor ( self, dialect, coltype ) :
string_process = self._str_impl.result_processor ( dialect, coltype )
json_deserializer = dialect._json_deserializer or json.loads
def process ( value ) :
if value is None:
return None
if string_process:
value = string_process(value)
value = json_deserializer ( value )
value = { k: timedelta ( minutes=v ) if v is not None else None for k, v in value.items() }
return value
return process
JsonTimedelta型を使うとObjectからModelに変換する際はtimedelta型からその秒数に変換し、ModelからObjectに変換する際は数値からtimedeltaに変換できます。実行結果は以下のようになります。
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("washing_time_matrix")
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("washing_time_matrix")
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine
CREATE TABLE washing_time_matrix (
post_product_code INTEGER NOT NULL,
pre_product_code_dictionary JSON,
PRIMARY KEY (post_product_code)
)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [no key 0.00028s] ()
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine COMMIT
- CREATE TABLEでpre_product_code_dictionary列をJSON型として作成していることがわかる。
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine INSERT INTO washing_time_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [generated in 0.00032s] ((0, '{"0": 0.0, "1": 21.0, "2": 66.0}'), (1, '{"0": 80.0, "1": 0.0, "2": 36.0}'), (2, '{"0": 4.0, "1": 112.0, "2": 0.0}'))
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine COMMIT
- INSERT INTO時にはtimedeltaの値が数値になっていることが分かる。
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine SELECT washing_time_matrix.post_product_code AS washing_time_matrix_post_product_code, washing_time_matrix.pre_product_code_dictionary AS washing_time_matrix_pre_product_code_dictionary
FROM washing_time_matrix
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [generated in 0.00026s] ()
- 投入したデータをSELECTで取得しています。
0 {'0': datetime.timedelta(0), '1': datetime.timedelta(seconds=21), '2': datetime.timedelta(seconds=66)}
1 {'0': datetime.timedelta(seconds=80), '1': datetime.timedelta(0), '2': datetime.timedelta(seconds=36)}
2 {'0': datetime.timedelta(seconds=4), '1': datetime.timedelta(seconds=112), '2': datetime.timedelta(0)}
- 取得した結果辞書の値がtimedeltaオブジェクトになっていることが分かります。
【成功】sqlalchemy.ext.hybrid.hybrid_propertyを使ってgetter, setterを定義
前節の方法はプライベートメソッドをオーバーライドしており、ハッキリ言って悪いということに気が付きました。
公式ドキュメントによればもっと良い方法があるのでそちらを記載します。
from sqlalchemy.ext.hybrid import hybrid_property
Base = declarative_base()
class WashingTimeMatrix ( Base ) :
__tablename__ = 'washing_time_matrix'
# 後製品コード
post_product_code = Column ( Integer, primary_key=True )
# 前製品コード辞書
_pre_product_code_dictionary = Column ( 'pre_product_code_dictionary', JSON )
@hybrid_property
def pre_product_code_dictionary ( self ) :
value = self._pre_product_code_dictionary
if isinstance ( value, dict ) :
value = { k: timedelta ( seconds=v ) for k, v in value.items() }
return value
@pre_product_code_dictionary.setter
def pre_product_code_dictionary ( self, value ) :
value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
self._pre_product_code_dictionary = value
getterである @hybrid_property
を付与したメソッドでDBの値をtimedelta()に変換し、@pre_product_code_dictionary.setter
を付与したsetterでtimedeltaの値を数値に変換しています。
まとめ
dialect
の_json_serializer
を指定すれば、上述のような作業は不要かもしれないがこの場合、すべてのJSON処理が変わってしまう。
通常のJSON処理と今回のように疑似的にユーザ定義のJSON型を作成するような処理を混在する場合は、ここで紹介したような方法もよいかもしれない。
なおMySQL8.0.23の場合idカラムの定義を以下のようにpost_product_code
カラムの低位gにautoincremnt=False
とオプションを追加することで同様の結果が得られることも確認している。
post_product_code = Column ( Integer, primary_key=True, autoincrement=False )