この記事は、Pythonその2 Advent Calendar 2020 23日目の記事です。
自己紹介
バックエンドのエンジニアとしてそろそろ8年が経ちます、星光輝と申します。
守備範囲はサーバーサイドですが、アプリ・インフラ・テスト・開発環境改善あたりの経験もある、広く浅い系エンジニアです。
最近は chalice というフレームワークを使い、およそ1年近く python に関わっています。
今回書くこと
API を作る時に、入力として受け取った JSON を意図したクラスにマッピングして欲しいなぁ...と思うことがあり、
python にそれっぽいライブラリがなかったので、自作したお話です。
python のバージョンは 3.8 です (from typing import get_origin get_args /できるのが3.8以降)。
なぜ作るのか?
汎用ライブラリの json の json.loads でも簡易的にはできるようなんですが、
そのために設定を頑張って書かないといけなさそうだし、汎用的に書くのは難しそうな感じがしました。
Java でも 古くから Jackson という有名なライブラリがあるのだから、探せばあるだろうと思いましたが、それらしい記載は出てきません。
そこで、いっそのこと作ってしまおうということで作りました。
作っていく
全体的な設計方針
クラスのインスタンスを何らかのデータ形式に落とすことをシリアライズと言い、
逆に何らかのデータ形式をクラスのインスタンスに変換することをデシリアライズと言います。
ここでは、下記のような使い方をするとしましょう。
@dataclass
class Hello:
hello: str
objectMapper = ObjectMapper()
instance = objectMapper.deserialize('{"hello": "mapper"}', Hello)
print(instance.hello) ## 出力: mapper
まずはインタフェースを書く
import json
from typing import Type, TypeVar, List
from abc import ABCMeta, abstractmethod
class NotImplementedError(Exception):
def __init__(self, message):
super().__init__(message)
class JsonDeserializer(metaclass=ABCMeta):
@abstractmethod
def canDeserialize(self, json: object, mappingClass: type) -> bool:
pass
@abstractmethod
def deserialize(self, json: object, mappingClass: type) -> object:
pass
T = TypeVar('T')
class ObjectMapper:
deserializers: List[JsonDeserializer] = []
def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
jsonData = json.loads(jsonText)
for deserializer in ObjectMapper.deserializers:
if deserializer.canDeserialize(jsonData, mappingClass):
return deserializer.deserialize(jsonData, mappingClass)
raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')
このようにして、ObjectMapper 内に JsonDeserializer のオブジェクトをリスト形式で持ち、
対応できる JsonDeserializer が見つかったら(canDeserialize(json, mappingClass) == true)、
それを使って様々なクラスに対して利用できるようにします。
単純ケースの攻略
まずは、リテラルに対する単体テストを書きます。
from main.json_deserializer import ObjectMapper
class TestObjectMapper:
def test_deserializeInt(self):
actual = ObjectMapper().deserialize('1', int)
assert actual == 1
def test_deserializeStr(self):
actual = ObjectMapper().deserialize('"1"', str)
assert actual == '1'
def test_deserializeFloat(self):
actual = ObjectMapper().deserialize('1.5', float)
assert actual == 1.5
def test_deserializeNull(self):
actual = ObjectMapper().deserialize('null', str)
assert actual is None
$ python -m pytest
================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeInt - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeStr - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeFloat - main.json_deserializer.NotImplementedError: Cannot deserialize json(1.5) ...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNull - main.json_deserializer.NotImplementedError: Cannot deserialize json(Null) ...
=================================================================== 5 failed in 0.21s ====================================================================```
まだ実装していないので当然こけますね。リテラル用の JsonDeserializer を実装します。
class ObjectMapper:
deserializers: List[JsonDeserializer] = [
LiteralDeserializer() ## 追加
]
class LiteralDeserializer(JsonDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return type(json) in [int, float, str, type(None)]
def deserialize(self, json: object, mappingClass: type) -> object:
return json
collected 4 items
tests/test_object_mapper.py ....
=================================================================== 4 passed in 0.03s ====================================================================
リテラルなんだけど、日時・Enum などの型にしたい場合
今度は下記のようなテストを加えます。
from datetime import datetime, date
from enum import Enum
class Member(Enum):
JOHN = 'john'
BOB = 'bob'
----(省略)----
def test_deserializeDate(self):
actual = ObjectMapper().deserialize('"2020-12-23"', date)
assert actual == date(2020, 12, 23)
def test_deserializeNaiveDateTime(self):
actual = ObjectMapper().deserialize('"2020-12-23T03:00:00"', datetime)
assert actual == datetime(2020, 12, 23, 3, 0, 0)
def test_deserializeAwareDateTime(self):
actual = ObjectMapper().deserialize('"2020-12-23T03:00:00+0900"', datetime)
assert actual == datetime(2020, 12, 23, 3, 0, 0, tzinfo=timezone(timedelta(hours=+9), 'JST'))
def test_deserializeEnum(self):
actual = ObjectMapper().deserialize('"john"', Member)
assert actual is Member.JOHN
日時には、Naive(タイムゾーンなし), Aware(タイムゾーンあり)があります。
異なる日時同士では計算できなかったりして、紛らわしいことがあるので全てを Aware にする方がいいこともありますが、
今回はどっちでも対応するデータを作成できるようにします。
試しにテスト実行
================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeDate - AssertionError: assert '2020-12-23' == datetime.date(2020, 12, 23)
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNativeDateTime - AssertionError: assert '2020-12-23T03:00:00' == datetime.datetim...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeAwareDateTime - AssertionError: assert '2020-12-23T03:00:00+0900' == datetime.dat...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeEnum - AssertionError: assert 'john' is <Member.JOHN: 'john'>
============================================================== 4 failed, 4 passed in 0.12s ===============================================================
...ってことで、実装。
class DateDeserializer(JsonDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return mappingClass == date
def deserialize(self, json: object, mappingClass: type) -> object:
dt = datetime.strptime(json, '%Y-%m-%d')
return date(dt.year, dt.month, dt.day)
class DatetimeDeserializer(JsonDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return mappingClass == datetime
def deserialize(self, json: object, mappingClass: type) -> object:
try:
return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S%z') # かなり雑..
except ValueError:
return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S')
class EnumDeserializer(JsonDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return issubclass(mappingClass, Enum)
def deserialize(self, json: object, mappingClass: type) -> object:
for enum in mappingClass:
if enum.value == json:
return enum
class ObjectMapper:
deserializers: List[JsonDeserializer] = [
DatetimeDeserializer(), ## 追加
EnumDeserializer(), ## 追加
DateDeserializer(), ## 追加
LiteralDeserializer()
]
collected 8 items
tests/test_object_mapper.py ........
=================================================================== 8 passed in 0.07s ====================================================================
コンテナクラスを扱う
さて次からはコンテナクラスを扱います。今回は、下記のコンテナクラスを作ります。
- List
- Dict
- Object
まずは、コンテナ共通クラスを作ります。このようなことをするのは、
コンテナクラスは内部の別構造に対して、再度デシリアライズ依頼をしなければならないからです。
(少しトリッキーですが、内部構造に対して再度 ObjectMapper._deserialize を呼んでいます)
class ContainerDeserializer(JsonDeserializer):
def deserializeChild(self, json: object, mappingClass: type) -> object:
return ObjectMapper._deserialize(json, mappingClass)
class ObjectMapper:
deserializers: List[JsonDeserializer] = [
...JsonDeserializer
]
def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
return self._deserialize(json.loads(jsonText), mappingClass)
@staticmethod
def _deserialize(jsonData: object, mappingClass: Type[T]) -> T:
for deserializer in ObjectMapper.deserializers:
if deserializer.canDeserialize(jsonData, mappingClass):
return deserializer.deserialize(jsonData, mappingClass)
raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')
そして、まずは構造を変えたので既存構造が壊れていないことを確認します。
collected 8 items
tests/test_object_mapper.py ........ [100%]
=================================================================== 8 passed in 0.07s ====================================================================
大丈夫なようですので、再度テストを追加します。
@dataclass
class Person:
name: str
age: int
@dataclass
class Group:
name: str
leader: Person
## ---テスト追加----
def test_deserializeRawList(self):
actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', list)
assert len(actual) == 2
assert actual[0].age == 35
assert actual[0].name == '鈴木'
assert actual[1].age == 21
assert actual[1].name == '山田'
def test_deserializeTypedList(self):
actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', List[Person])
assert len(actual) == 2
assert type(actual[0]) == Person
assert actual[0].age == 35
assert actual[0].name == '鈴木'
assert type(actual[1]) == Person
assert actual[1].age == 21
assert actual[1].name == '山田'
def test_deserializeRawDict(self):
actual = ObjectMapper().deserialize('{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}', dict)
assert len(actual) == 2
assert actual['ID1'].age == 35
assert actual['ID1'].name == '鈴木'
assert actual['ID3'].age == 21
assert actual['ID3'].name == '山田'
def test_deserializeTypedDict(self):
actual = ObjectMapper().deserialize(
'{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}',
Dict[str, Person]
)
assert len(actual) == 2
assert type(actual['ID1']) == Person
assert actual['ID1'].age == 35
assert actual['ID1'].name == '鈴木'
assert type(actual['ID3']) == Person
assert actual['ID3'].age == 21
assert actual['ID3'].name == '山田'
def test_deserializeRawObject(self):
actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', object)
assert actual.name == 'グループ'
assert actual.leader.age == 35
assert actual.leader.name == '鈴木'
def test_deserializeTypedObject(self):
actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', Group)
assert type(actual) == Group
assert actual.name == 'グループ'
assert type(actual.leader) == Person
assert actual.leader.age == 35
assert actual.leader.name == '鈴木'
試しにテスト実行...当然実装していn(ry
================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawList - main.json_deserializer.NotImplementedError: Cannot deserialize json([{'...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedList - main.json_deserializer.NotImplementedError: Cannot deserialize json([...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({'I...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedObject - main.json_deserializer.NotImplementedError: Cannot deserialize json...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawObject - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
============================================================== 6 failed, 8 passed in 0.36s ===============================================================
実装。
from inspect import signature, _ParameterKind
from typing import Type, TypeVar, List, Dict, Set, get_origin, get_args
class ListDeserializer(ContainerDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return get_origin(mappingClass) == list or mappingClass == list
def deserialize(self, json: object, mappingClass: type) -> object:
genericParams = get_args(mappingClass)
hasGenericParams = genericParams is not None and len(genericParams) > 0
param = genericParams[0] if hasGenericParams else object
return [self.deserializeChild(el, param) for el in json]
class DictDeserializer(ContainerDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return get_origin(mappingClass) == dict or mappingClass == dict
def deserialize(self, json: object, mappingClass: type) -> object:
genericParams = get_args(mappingClass)
hasKeyParam = genericParams is not None and len(genericParams) > 0
hasValueParam = genericParams is not None and len(genericParams) > 1
return {
self.deserializeChild(k, genericParams[0] if hasKeyParam else object)
: self.deserializeChild(v, genericParams[1] if hasValueParam else object)
for k, v in json.items()
}
class ObjectDeserializer(ContainerDeserializer):
def canDeserialize(self, json: object, mappingClass: type) -> bool:
return type(json) == dict
def deserialize(self, json: object, mappingClass: type) -> object:
if mappingClass == object:
return self.createRawObject(json)
else:
return self.createObject(json, mappingClass)
def createRawObject(self, args: Dict[str, object]) -> object:
className = ''.join(key.title() for key in args.keys()) + 'Obejct'
newInstance = type(className, (object,), {})()
for k, v in args.items():
setattr(newInstance, k, self.deserializeChild(v, object))
return newInstance
def createObject(self, args: Dict[str, object], mappingClass: type) -> object:
annotations = self.findAnnotations(mappingClass)
requireArgs = self.findInitRequireArgs(mappingClass)
result = object.__new__(mappingClass)
initArgs = {}
for k, v in args.items():
val = self.deserializeChild(v, annotations.get(k, object))
if k in requireArgs:
initArgs[k] = val
else:
setattr(result, k, val)
for req in requireArgs:
if req not in initArgs:
initArgs[req] = None
result.__init__(**initArgs)
return result
@staticmethod
def findAnnotations(mappingClass: type) -> Dict[str, type]:
if hasattr(mappingClass, '__annotations__'):
return mappingClass.__annotations__
for k, v in signature(mappingClass.__init__).parameters.items():
return {k:v.annotation for k,v in signature(mappingClass.__init__).parameters.items()}
@staticmethod
def findInitRequireArgs(mappingClass: type) -> Set[str]:
return {
k for k, v in signature(mappingClass.__init__).parameters.items()
if v.name != 'self' and v.kind == _ParameterKind.POSITIONAL_OR_KEYWORD
}
class ObjectMapper:
deserializers: List[JsonDeserializer] = [
DateDeserializer(),
DatetimeDeserializer(),
EnumDeserializer(),
LiteralDeserializer(),
ListDeserializer(), ## 追加
DictDeserializer(), ## 追加
ObjectDeserializer() ## 追加
]
テスト実行
collected 14 items
tests/test_object_mapper.py .............. [100%]
=================================================================== 14 passed in 0.18s ===================================================================
Object デシリアライザの挙動説明
Objectをマッピングするものになると、急に難易度が高くなりましたね。..なので、軽く説明します。
また、オブジェクトに内包されているデータをデシリアライズするには、
データのクラスを取得しなければいけませんが、それが格納されている可能性があるのは下記の二箇所です。
- classに型定義がある場合 (下記コードの Group, GroupWithoutDataclass)
-
__init__
の引数で型を定義している場合 (下記コードの GroupInitDef)
@dataclass
class Person:
name: str
age: int
# パターン1
@dataclass
class Group:
name: str
leader: Person
# パターン2
class GroupWithoutDataclass:
name: str
leader: Person
# パターン3
class GroupInitDef:
def __init__(self, name: str, leader: Person):
self.name = name
self.leader = leader
また、python では Person('AA', 23)
と実行すると Person
クラスの __init__
メソッドが実行されます。
このメソッドでは、可変長引数(*args, **kwargs等)で定義されている変数以外は指定しないとエラーになります。
そのため、def findInitRequireArgs(mappingClass: type) -> Set[str]
で定義すべき変数名を取得しています。
そして、今回はJSONデータとマッピングデータが下記のような関係だった場合に、下記の実装にしています。
- JSONデータ が マッピングに必要なデータを持っていない → None で埋める
- JSONデータ が マッピングに不必要なデータを持っている → setattr で属性追加している
今回は、このようにしていますが、場合によってはエラーにしたほうがいいケースもあるかもしれません。
念の為テストを追加。(Group は先ほどテストしているので省略)
def test_deserializeTypedObjectWithoutDataclass(self):
actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupWithoutDataclass)
assert type(actual) == GroupWithoutDataclass
assert actual.name == 'グループ'
assert type(actual.leader) == Person
assert actual.leader.age == 35
assert actual.leader.name == '鈴木'
def test_deserializeTypedObjectInitDef(self):
actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupInitDef)
assert type(actual) == GroupInitDef
assert actual.name == 'グループ'
assert type(actual.leader) == Person
assert actual.leader.age == 35
assert actual.leader.name == '鈴木'
def test_deserializeTypedObjectEmptyJson(self):
actual = ObjectMapper().deserialize('{}', Group)
assert type(actual) == Group
assert actual.name == None
assert actual.leader == None
def test_deserializeTypedObjectRedundantData(self):
actual = ObjectMapper().deserialize('{"name": "グループ", "id": 2, "leader": {"age": 35, "name": "鈴木", "role": "課長"}}', Group)
assert actual.name == 'グループ'
assert actual.id == 2
assert type(actual.leader) == Person
assert actual.leader.age == 35
assert actual.leader.name == '鈴木'
assert actual.leader.role == '課長'
collected 18 items
tests/test_object_mapper.py .................. [100%]
=================================================================== 18 passed in 0.17s ===================================================================
もっと工夫をするなら...
実際はもうちょっと面倒な仕様があったりするので、プロジェクトに合わせて仕様を調整する必要があるかもしれません。
-
__init__
メソッド に*args: T
,**kwargs: T
のような可変長引数 - デフォルト値への対応
- オブジェクト初期化時には init 実行しないようにして欲しいんだけど?
-
OrderedDict
等の順序があるものにも対応できるのか?
ただ、今回はAPI等で受け取った JSON をクラスにマッピングすることが目的です。
JSONモデルに通常はそんなに複雑な要件は必要ないでしょう。
最後に
今回は、JSON からクラスのインスタンスを生成するオブジェクトマッパーを作成しました。
それほど大変な実装ではないですし、都度都度デシリアライズのロジックを追加していけばどんなものでも対応できるので、
メンテナンスはそれほど大変ではないと思います。
python は型定義がどんどん便利になっていますので、このような仕組みを押さえておくと今後何かいいことがあるかもしれません。