LoginSignup
1
0

More than 1 year has passed since last update.

SQLAlchemyを使ってtimedeltaを含む辞書を読み書きする

Last updated at Posted at 2022-07-04

はじめに

ここではtimedeltaを値とするような辞書オブジェクトをSQLAlchemy を使ってO/R Mappingする方法を紹介します。この方法では指定カラムのtimedeltaオブジェクトを秒数(整数)に変換してDBに格納し、またDB指定カラム上の整数をtimedeltaオブジェクトに変換して辞書を取得します。

なお当初公開した方法はよろしくないということに気が付いたため、最後にマシな方法を追加しています。

以下の環境で動作確認しました。

  • Windows 8.1
  • Python 3.9.12
  • SQLAlchemy1.4.3
  • SQLite3

この記事では、まず入出力するデータを説明し、次に通常のJSONにシリアライズ可能なデータの入出力を記載します。その次にtimedeltaを含む辞書オブジェクトをDBに格納する方法についていくつかの試行錯誤を経ながら説明していきます。

どのようなデータを格納しようとしているのか

工場で製造する製品を切り替えるごとに製造機器の清掃が必要だとします。とくに食品や化学製品を製造する工場の場合、その前後の製品の組み合わせによって清掃に必要な人数・時間が異なってくることがあります。

製品同士のこれらの組み合わせは表で記載することができます。

  • 調理器具の清掃に必要な人数(人数は適当)
後製品\前製品 0:回鍋肉 1:天津飯 2ハンバーグ
0:回鍋肉 0人 1人 0人
1:天津飯 3人 0人 3人
2:ハンバーグ 1人 0人 0人
  • 調理器具の清掃に必要な時間(時間は適当)
後製品\前製品 0:回鍋肉 1:天津飯 2:ハンバーグ
0:回鍋肉 0秒 21秒 66秒
1:天津飯 80秒 0秒 36秒
2:ハンバーグ 4秒 112秒 0秒

たとえば回鍋肉を作ったあと天津飯を作る場合は、前製品が回鍋肉、後製品が天津飯であるセルを確認します。調理器具の清掃に必要な人数が3人、時間が80秒であることがよみとれます。

これらの表を以下のように行ごとに後製品のコードと前製品のコードをキーとする辞書の形式で取り扱うことにします。

  • 洗浄人数マトリクス
後製品コード 前製品コード辞書
0 { 0: 0, 1: 1, 2: 0}
1 { 0: 3, 1: 0, 2: 3}
2 { 0: 1, 1: 0, 2: 0}
  • 洗浄時間マトリクス
後製品コード 前製品コード辞書
0 { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) }
1 { 0: timedelta(seconds=80), 1: timedelta(seconds=0), 2: timedelta(seconds=36) }
2 { 0: timedelta(seconds=4), 1: timedelta(seconds=112), 2: timedelta(seconds=0) }

洗浄人数マトリクスの方は、そのままデータベースに格納すればよいのですが、洗浄時間マトリクスの方は timedelta オブジェクトを要素としてもつため透過的にO/R Mappingできません。

JSONにシリアライズ/デシリアライズ可能な辞書を格納する場合

まず洗浄人数マトリクスをSQLAlchemyを使ってO/R Mappingしてみます。

WashingPersonMatrix.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

Base = declarative_base()
class WashingPersonMatrix ( Base ) :
	__tablename__ = 'washing_person_matrix'
	# 後製品コード
	post_product_code = Column ( Integer, primary_key=True )
	# 前製品コード辞書
	pre_product_code_dictionary = Column ( JSON )

DATA = [
	( 0, { 0: 0, 1: 1, 2: 0 } )
	, ( 1, { 0: 3, 1: 0, 2: 3 } )
	, ( 2, { 0: 1, 1: 0, 2: 0 } )
]
if __name__ == "__main__" :
	dsl = 'sqlite:///:memory:'
	engine = create_engine ( dsl, echo=True )
	# テーブル作成
	Base.metadata.create_all ( bind=engine )
	# テーブルにデータ投入
	with Session ( engine ) as session :
		for row in DATA :
			row_obj = WashingPersonMatrix ( post_product_code=row[0], pre_product_code_dictionary=row[1] )
			session.add ( row_obj )
		session.commit()
	# 投入データ確認
	with Session ( engine ) as session :
		for row in session.query ( WashingPersonMatrix ).all() :
			print ( row.post_product_code, row.pre_product_code_dictionary )

このコードは、1つ目のカラムがIntegerの後製品コード(post_product_code)、2つ目のカラムがJSONの前製品コード辞書(pre_product_code_dictionary)であるようなテーブル washing_person_matrixを作成して上述のデータを投入しまたそのデータを表示します。

前章で記載の通りここで投入する辞書データは { 0: 0, 1: 1, 2: 0 } のようにそのままJSONにシリアライズ可能なため、単純に sqlalchemy.types.JSONを使用することが可能です。

実行結果は以下のようになります。

テーブル作成部のログ
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("washing_person_matrix")
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("washing_person_matrix")
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine 
CREATE TABLE washing_person_matrix (
	post_product_code INTEGER NOT NULL, 
	pre_product_code_dictionary JSON, 
	PRIMARY KEY (post_product_code)
)


2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [no key 0.00012s] ()
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine COMMIT
  • CREATE TABLE していることがわかります。
テーブルにデータ投入部のログ
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine INSERT INTO washing_person_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [generated in 0.00020s] ((0, '{"0": 0, "1": 1, "2": 0}'), (1, '{"0": 3, "1": 0, "2": 3}'), (2, '{"0": 1, "1": 0, "2": 0}'))
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine COMMIT
  • INSERT INTOしていることがわかります。
投入データ確認部のログ
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine SELECT washing_person_matrix.post_product_code AS washing_person_matrix_post_product_code, washing_person_matrix.pre_product_code_dictionary AS washing_person_matrix_pre_product_code_dictionary 
FROM washing_person_matrix
2022-07-01 15:38:13,529 INFO sqlalchemy.engine.Engine [generated in 0.00013s] ()
  • SELECTしていることがわかります。
0 {'0': 0, '1': 1, '2': 0}
1 {'0': 3, '1': 0, '2': 3}
2 {'0': 1, '1': 0, '2': 0}
  • 想定通りの値を得られることがわかります。

JSONにシリアライズ/デシリアライズ不可能な辞書を格納する場合

洗浄時間マトリクスをSQLAlchemyを使ってO/R Mappingしてみます。

【失敗】sqlalchemy.types.JSONを使用する

まずは前述の WashingPersonMatrix と同じようにそのまま sqlalchemy.types.JSON を使ってみます。

WashingPersonMatrix1.py
from datetimes import timedelta
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.schema import Column
from sqlalchemy.types import Integer, JSON
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

Base = declarative_base()
class WashingTimeMatrix ( Base ) :
	__tablename__ = 'washing_time_matrix'
	# 後製品コード
	post_product_code = Column ( Integer, primary_key=True )
	# 前製品コード辞書
	pre_product_code_dictionary = Column ( JSON )

DATA = [
	( 0, { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) } )
	, ( 1,  { 0: timedelta(seconds=80), 1: timedelta(seconds=0), 2: timedelta(seconds=36) } )
	, ( 2,  { 0: timedelta(seconds=4), 1: timedelta(seconds=112), 2: timedelta(seconds=0) } )
]
if __name__ == "__main__" :
	dsl = 'sqlite:///:memory:'
	engine = create_engine ( dsl, echo=True )
	# テーブル作成
	Base.metadata.create_all ( bind=engine )
	# テーブルにデータ投入
	with Session ( engine ) as session :
		for row in DATA :
			row_obj = WashingTimeMatrix ( post_product_code=row[0], pre_product_code_dictionary=row[1] )
			session.add ( row_obj )
		session.commit()
	# 投入データ確認
	with Session ( engine ) as session :
		for row in session.query ( WashingTimeMatrix ).all() :
			print ( row.post_product_code, row.pre_product_code_dictionary )

このコードは、1つ目のカラムがIntegerの後製品コード(post_product_code)、2つ目のカラムがJSONの前製品コード辞書(pre_product_code_dictionary)であるようなテーブル washing_time_matrixを作成して上述のデータを投入したり、また、そのデータを表示したりしようとします。

ここで投入する辞書オブジェクトは { 0: timedelta(seconds=0), 1: timedelta(seconds=21), 2: timedelta(seconds=66) } でありそのままJSONにシリアライズできません。そのため、実行するとデータ投入時に以下のような例外が発生します。

TypeError: Object of type timedelta is not JSON serializable

【失敗】sqlalchemy.types.UserDefinedTypeを使用する

sqlalchemy.types.UserDefinedTypeというのがあるので使ってみた(が失敗した)。

from sqlalchemy.types import UserDefinedType
class JsonTimedelta ( UserDefinedType ) :
	def get_col_spec(self, **kw):
		return "JsonTimedelta"
	def bind_processor ( self, dialect ) :
		def process ( value ) :
			value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
			serialized = json_serializer ( value )
			return serialized
	def result_processor ( self, dialect, coltype ) :
		def process ( value ) :
			value = { k: timedelta ( seconds=v ) if v is not None else None for k, v in value.items() }
			return value

ここに記載したように sqlalchemy.types.UserDefinedType を継承する JsonTimedelta クラスを定義しました。bind時(DB書き込み時:bind_processor内のprocess関数)にtimedeltaオブジェクトを秒数単位に変換し、取得時(result_processor内のprocess関数)にtimedeltaオブジェクトに変換しています。そして以下のようにこの型をWashingTimeMatrixpost_product_codeカラムで使ってみます。

class WashingTimeMatrix ( Base ) :
	__tablename__ = 'washing_time_matrix'
	# 後製品コード
	post_product_code = Column ( Integer, primary_key=True )
	# 前製品コード辞書 ; ここをJSONからJsonTimedeltaに変更
	pre_product_code_dictionary = Column ( JsonTimedelta )

実行すると以下のようなエラーとなりダメでした。おそらくJsonTimedelta型への変換がsqliteで定義されていないのが原因と思います(深くは追及してないので違うかもしれません)。

sqlite3.InterfaceError: Error binding parameter 1 - probably unsupported type.
[SQL: INSERT INTO washing_time_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)]
[parameters: ((0, {0: datetime.timedelta(0), 1: datetime.timedelta(seconds=21), 2: datetime.timedelta(seconds=66)}), (1, {0: datetime.timedelta(seconds=80), 1: datetime.timedelta(0), 2: datetime.timedelta(seconds=36)}), (2, {0: datetime.timedelta(seconds=4), 1: datetime.timedelta(seconds=112), 2: datetime.timedelta(0)}))]

【失敗】sqlalchemy.types.JSONを継承する

slalchemy.types.UserDefinedTypeを使った例から、bind_processorresult_processorをオーバーライドすればよいのかもしれないと思い至ります。
sqlalchemy.types.JSONのコードから bind_processorメソッドおよび result_processorメソッドをコピーして以下 #追加したコード に記載のコードを追加してみます。

from datetime import timedelta
from sqlalchemy.types import JSON
import sqlalchemy.sql.elements as elements
class JsonTimedelta ( JSON ) :
	def bind_processor ( self, dialect ) :
		string_process = self._str_impl.bind_processor ( dialect )

		json_serializer = dialect._json_serializer or json.dumps

		def process ( value ) :
			if value is self.NULL:
				value = None
			elif isinstance ( value, elements.Null ) or (
				value is None and self.none_as_null
			) :
				return None

			# 追加したコード
			value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
			serialized = json_serializer ( value )
			if string_process :
				serialized = string_process ( serialized )
			return serialized

		return process

	def result_processor ( self, dialect, coltype ) :
		string_process = self._str_impl.result_processor ( dialect, coltype )
		json_deserializer = dialect._json_deserializer or json.loads

		def process ( value ) :
			if value is None:
				return None
			if string_process:
				value = string_process(value)
			value = json_deserializer ( value )
			# 追加したコード
			value = { k: timedelta ( seconds=v ) if v is not None else None for k, v in value.items() }
			return value

		return process

実行してみるとなぜか指定したJsonTimedeltaクラスではなくsqlalchemy.types.JSONクラスで例外が発生しました。

TypeError: Object of type timedelta is not JSON serializable

調べてみると(ステップ実行してやっとわかった)sqlalchemy.types.JSONクラスは、DBエンジン別の実装のファサードクラスだということが分かり、また SQLiteエンジン使用時のJSON 型処理時の実体はdialect/sqlite/json.pyであることが分かりました。

このdialect/sqlite/json.pyモジュールでのJSONクラス定義をみてみると以下のようになっています。

class JSON(sqltypes.JSON):

つまり、sqlalchemy.types.JSON ファサードが実行時に選ぶクラスは sqlalchemy.dialects.sqlite.jsonであり、作成したJsonTimedeltaではないということが分かります。

【非推奨】sqlalchemy.types.JSONを継承しprocessor取得メソッドをオーバーライド

この方法はお勧めできません、次の節の方法を取った方が良いと思います。
要は実行時に処理するクラスを選択する際にJsonTimedeltaを推せばよいということになります。
そこでsqlalchemy.types.JSONファサードでのクラス選択の処理を JsonTimedeltaクラスでオーバーライドすることにしました。ただ、これが本質的な解決だとは思わない(このあたりが場当たり的な対応というわけです)。

今回は_cached_bind_processorメソッドと_cached_result_processorメソッドとをオーバーライドしてJsonTimedelta自身のprocessorを取得するようにしました。

from datetime import timedelta
from sqlalchemy.types import JSON
import sqlalchemy.sql.elements as elements
class JsonTimedelta ( JSON ) :
	def _cached_bind_processor ( self, dialect ) :
		return self.bind_processor ( dialect )
	def _cached_result_processor ( self, dialect, coltype ) :
		return self.result_processor ( dialect, coltype )
	def bind_processor ( self, dialect ) :
		string_process = self._str_impl.bind_processor ( dialect )

		json_serializer = dialect._json_serializer or json.dumps

		def process ( value ) :
			if value is self.NULL:
				value = None
			elif isinstance ( value, elements.Null ) or (
				value is None and self.none_as_null
			) :
				return None

			# convert timedelta to int
			value = { k: v / timedelta ( minutes=1 ) if v is not None else None for k, v in value.items() }
			serialized = json_serializer ( value )
			if string_process :
				serialized = string_process ( serialized )
			return serialized

		return process

	def result_processor ( self, dialect, coltype ) :
		string_process = self._str_impl.result_processor ( dialect, coltype )
		json_deserializer = dialect._json_deserializer or json.loads

		def process ( value ) :
			if value is None:
				return None
			if string_process:
				value = string_process(value)
			value = json_deserializer ( value )
			value = { k: timedelta ( minutes=v ) if v is not None else None for k, v in value.items() }
			return value

		return process

JsonTimedelta型を使うとObjectからModelに変換する際はtimedelta型からその秒数に変換し、ModelからObjectに変換する際は数値からtimedeltaに変換できます。実行結果は以下のようになります。

テーブル作成部のログ
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("washing_time_matrix")
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-04 11:18:35,806 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("washing_time_matrix")
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine 
CREATE TABLE washing_time_matrix (
	post_product_code INTEGER NOT NULL, 
	pre_product_code_dictionary JSON, 
	PRIMARY KEY (post_product_code)
)


2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [no key 0.00028s] ()
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine COMMIT
  • CREATE TABLEでpre_product_code_dictionary列をJSON型として作成していることがわかる。
テーブルにデータ投入部のログ
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine INSERT INTO washing_time_matrix (post_product_code, pre_product_code_dictionary) VALUES (?, ?)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [generated in 0.00032s] ((0, '{"0": 0.0, "1": 21.0, "2": 66.0}'), (1, '{"0": 80.0, "1": 0.0, "2": 36.0}'), (2, '{"0": 4.0, "1": 112.0, "2": 0.0}'))
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine COMMIT
  • INSERT INTO時にはtimedeltaの値が数値になっていることが分かる。
投入データ確認部のログ
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine SELECT washing_time_matrix.post_product_code AS washing_time_matrix_post_product_code, washing_time_matrix.pre_product_code_dictionary AS washing_time_matrix_pre_product_code_dictionary 
FROM washing_time_matrix
2022-07-04 11:18:35,821 INFO sqlalchemy.engine.Engine [generated in 0.00026s] ()
  • 投入したデータをSELECTで取得しています。
0 {'0': datetime.timedelta(0), '1': datetime.timedelta(seconds=21), '2': datetime.timedelta(seconds=66)}
1 {'0': datetime.timedelta(seconds=80), '1': datetime.timedelta(0), '2': datetime.timedelta(seconds=36)}
2 {'0': datetime.timedelta(seconds=4), '1': datetime.timedelta(seconds=112), '2': datetime.timedelta(0)}
  • 取得した結果辞書の値がtimedeltaオブジェクトになっていることが分かります。

【成功】sqlalchemy.ext.hybrid.hybrid_propertyを使ってgetter, setterを定義

前節の方法はプライベートメソッドをオーバーライドしており、ハッキリ言って悪いということに気が付きました。
公式ドキュメントによればもっと良い方法があるのでそちらを記載します。

from sqlalchemy.ext.hybrid import hybrid_property

Base = declarative_base()
class WashingTimeMatrix ( Base ) :
	__tablename__ = 'washing_time_matrix'
	# 後製品コード
	post_product_code = Column ( Integer, primary_key=True )
	# 前製品コード辞書
	_pre_product_code_dictionary = Column ( 'pre_product_code_dictionary', JSON )

	@hybrid_property
	def pre_product_code_dictionary ( self ) :
		value = self._pre_product_code_dictionary
		if isinstance ( value, dict ) :
			value = { k: timedelta ( seconds=v ) for k, v in value.items() }
		return value
	@pre_product_code_dictionary.setter
	def pre_product_code_dictionary ( self, value ) : 
		value = { k: v / timedelta ( seconds=1 ) if v is not None else None for k, v in value.items() }
		self._pre_product_code_dictionary = value

getterである @hybrid_propertyを付与したメソッドでDBの値をtimedelta()に変換し、@pre_product_code_dictionary.setterを付与したsetterでtimedeltaの値を数値に変換しています。

まとめ

dialect_json_serializerを指定すれば、上述のような作業は不要かもしれないがこの場合、すべてのJSON処理が変わってしまう。
通常のJSON処理と今回のように疑似的にユーザ定義のJSON型を作成するような処理を混在する場合は、ここで紹介したような方法もよいかもしれない。

なおMySQL8.0.23の場合idカラムの定義を以下のようにpost_product_codeカラムの低位gにautoincremnt=Falseとオプションを追加することで同様の結果が得られることも確認している。

	post_product_code = Column ( Integer, primary_key=True, autoincrement=False )

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0