8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Pythonその2Advent Calendar 2020

Day 23

Python でJSONオブジェクトマッパーを作る

Last updated at Posted at 2020-12-23

この記事は、Pythonその2 Advent Calendar 2020 23日目の記事です。

自己紹介

バックエンドのエンジニアとしてそろそろ8年が経ちます、星光輝と申します。
守備範囲はサーバーサイドですが、アプリ・インフラ・テスト・開発環境改善あたりの経験もある、広く浅い系エンジニアです。
最近は chalice というフレームワークを使い、およそ1年近く python に関わっています。

今回書くこと

API を作る時に、入力として受け取った JSON を意図したクラスにマッピングして欲しいなぁ...と思うことがあり、
python にそれっぽいライブラリがなかったので、自作したお話です。
python のバージョンは 3.8 です (from typing import get_origin get_args /できるのが3.8以降)。

なぜ作るのか?

汎用ライブラリの json の json.loads でも簡易的にはできるようなんですが、
そのために設定を頑張って書かないといけなさそうだし、汎用的に書くのは難しそうな感じがしました。

Java でも 古くから Jackson という有名なライブラリがあるのだから、探せばあるだろうと思いましたが、それらしい記載は出てきません。
そこで、いっそのこと作ってしまおうということで作りました。

作っていく

全体的な設計方針

クラスのインスタンスを何らかのデータ形式に落とすことをシリアライズと言い、
逆に何らかのデータ形式をクラスのインスタンスに変換することをデシリアライズと言います。

ここでは、下記のような使い方をするとしましょう。

@dataclass
class Hello:
    hello: str

objectMapper = ObjectMapper()
instance = objectMapper.deserialize('{"hello": "mapper"}', Hello)
print(instance.hello) ## 出力: mapper

今回は、下記のクラス構造で作ります。
スクリーンショット 2020-12-23 1.28.23.png

まずはインタフェースを書く

import json
from typing import Type, TypeVar, List
from abc import ABCMeta, abstractmethod

class NotImplementedError(Exception):
    def __init__(self, message):
        super().__init__(message)    

class JsonDeserializer(metaclass=ABCMeta):
    @abstractmethod
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        pass

    @abstractmethod
    def deserialize(self, json: object, mappingClass: type) -> object:
        pass

T = TypeVar('T')

class ObjectMapper:
    deserializers: List[JsonDeserializer] = []

    def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
        jsonData = json.loads(jsonText)
        for deserializer in ObjectMapper.deserializers:
            if deserializer.canDeserialize(jsonData, mappingClass):
                return deserializer.deserialize(jsonData, mappingClass)
        raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')

このようにして、ObjectMapper 内に JsonDeserializer のオブジェクトをリスト形式で持ち、
対応できる JsonDeserializer が見つかったら(canDeserialize(json, mappingClass) == true)、
それを使って様々なクラスに対して利用できるようにします。

単純ケースの攻略

まずは、リテラルに対する単体テストを書きます。

from main.json_deserializer import ObjectMapper

class TestObjectMapper:
    def test_deserializeInt(self):
        actual = ObjectMapper().deserialize('1', int)
        assert actual == 1

    def test_deserializeStr(self):
        actual = ObjectMapper().deserialize('"1"', str)
        assert actual == '1'

    def test_deserializeFloat(self):
        actual = ObjectMapper().deserialize('1.5', float)
        assert actual == 1.5

    def test_deserializeNull(self):
        actual = ObjectMapper().deserialize('null', str)
        assert actual is None
$ python -m pytest
================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeInt - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeStr - main.json_deserializer.NotImplementedError: Cannot deserialize json(1) to c...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeFloat - main.json_deserializer.NotImplementedError: Cannot deserialize json(1.5) ...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNull - main.json_deserializer.NotImplementedError: Cannot deserialize json(Null) ...
=================================================================== 5 failed in 0.21s ====================================================================```

まだ実装していないので当然こけますね。リテラル用の JsonDeserializer を実装します。

class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        LiteralDeserializer()   ## 追加
    ]

class LiteralDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return type(json) in [int, float, str, type(None)]

    def deserialize(self, json: object, mappingClass: type) -> object:
        return json
collected 4 items                                                                                                                                        

tests/test_object_mapper.py ....

=================================================================== 4 passed in 0.03s ====================================================================

リテラルなんだけど、日時・Enum などの型にしたい場合

今度は下記のようなテストを加えます。

from datetime import datetime, date
from enum import Enum

class Member(Enum):
    JOHN = 'john'
    BOB = 'bob'

----(省略)----

    def test_deserializeDate(self):
        actual = ObjectMapper().deserialize('"2020-12-23"', date)
        assert actual == date(2020, 12, 23)

    def test_deserializeNaiveDateTime(self):
        actual = ObjectMapper().deserialize('"2020-12-23T03:00:00"', datetime)
        assert actual == datetime(2020, 12, 23, 3, 0, 0)

    def test_deserializeAwareDateTime(self):
        actual = ObjectMapper().deserialize('"2020-12-23T03:00:00+0900"', datetime)
        assert actual == datetime(2020, 12, 23, 3, 0, 0, tzinfo=timezone(timedelta(hours=+9), 'JST'))

    def test_deserializeEnum(self):
        actual = ObjectMapper().deserialize('"john"', Member)
        assert actual is Member.JOHN

日時には、Naive(タイムゾーンなし), Aware(タイムゾーンあり)があります。
異なる日時同士では計算できなかったりして、紛らわしいことがあるので全てを Aware にする方がいいこともありますが、
今回はどっちでも対応するデータを作成できるようにします。

試しにテスト実行

================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeDate - AssertionError: assert '2020-12-23' == datetime.date(2020, 12, 23)
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeNativeDateTime - AssertionError: assert '2020-12-23T03:00:00' == datetime.datetim...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeAwareDateTime - AssertionError: assert '2020-12-23T03:00:00+0900' == datetime.dat...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeEnum - AssertionError: assert 'john' is <Member.JOHN: 'john'>
============================================================== 4 failed, 4 passed in 0.12s ===============================================================

...ってことで、実装。

class DateDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return mappingClass == date

    def deserialize(self, json: object, mappingClass: type) -> object:
        dt = datetime.strptime(json, '%Y-%m-%d')
        return date(dt.year, dt.month, dt.day)

class DatetimeDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return mappingClass == datetime

    def deserialize(self, json: object, mappingClass: type) -> object:
        try:
            return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S%z') # かなり雑..
        except ValueError:
            return datetime.strptime(json, '%Y-%m-%dT%H:%M:%S')

class EnumDeserializer(JsonDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return issubclass(mappingClass, Enum)

    def deserialize(self, json: object, mappingClass: type) -> object:
        for enum in mappingClass:
            if enum.value == json:
                return enum

class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        DatetimeDeserializer(), ## 追加
        EnumDeserializer(), ## 追加
        DateDeserializer(), ## 追加
        LiteralDeserializer()
    ]
collected 8 items                                                                                                                                        

tests/test_object_mapper.py ........

=================================================================== 8 passed in 0.07s ====================================================================

コンテナクラスを扱う

さて次からはコンテナクラスを扱います。今回は、下記のコンテナクラスを作ります。

  • List
  • Dict
  • Object

まずは、コンテナ共通クラスを作ります。このようなことをするのは、
コンテナクラスは内部の別構造に対して、再度デシリアライズ依頼をしなければならないからです。
(少しトリッキーですが、内部構造に対して再度 ObjectMapper._deserialize を呼んでいます)

class ContainerDeserializer(JsonDeserializer):
    def deserializeChild(self, json: object, mappingClass: type) -> object:
        return ObjectMapper._deserialize(json, mappingClass)


class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
       ...JsonDeserializer
    ]
    def deserialize(self, jsonText: str, mappingClass: Type[T]) -> T:
        return self._deserialize(json.loads(jsonText), mappingClass)

    @staticmethod
    def _deserialize(jsonData: object, mappingClass: Type[T]) -> T:
        for deserializer in ObjectMapper.deserializers:
            if deserializer.canDeserialize(jsonData, mappingClass):
                return deserializer.deserialize(jsonData, mappingClass)
        raise NotImplementedError(f'Cannot deserialize json({jsonData}) to class({mappingClass}).')

そして、まずは構造を変えたので既存構造が壊れていないことを確認します。

collected 8 items                                                                                                                                        

tests/test_object_mapper.py ........                                                                                                               [100%]

=================================================================== 8 passed in 0.07s ====================================================================

大丈夫なようですので、再度テストを追加します。

@dataclass
class Person:
    name: str
    age: int

@dataclass
class Group:
    name: str
    leader: Person


## ---テスト追加----

    def test_deserializeRawList(self):
        actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', list)
        assert len(actual) == 2
        assert actual[0].age == 35
        assert actual[0].name == '鈴木'
        assert actual[1].age == 21
        assert actual[1].name == '山田'

    def test_deserializeTypedList(self):
        actual = ObjectMapper().deserialize('[{"age": 35, "name": "鈴木"}, {"age": 21, "name": "山田"}]', List[Person])
        assert len(actual) == 2
        assert type(actual[0]) == Person
        assert actual[0].age == 35
        assert actual[0].name == '鈴木'
        assert type(actual[1]) == Person
        assert actual[1].age == 21
        assert actual[1].name == '山田'

    def test_deserializeRawDict(self):
        actual = ObjectMapper().deserialize('{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}', dict)
        assert len(actual) == 2
        assert actual['ID1'].age == 35
        assert actual['ID1'].name == '鈴木'
        assert actual['ID3'].age == 21
        assert actual['ID3'].name == '山田'

    def test_deserializeTypedDict(self):
        actual = ObjectMapper().deserialize(
            '{"ID1":{"age": 35, "name": "鈴木"}, "ID3": {"age": 21, "name": "山田"}}',
            Dict[str, Person]
        )
        assert len(actual) == 2
        assert type(actual['ID1']) == Person
        assert actual['ID1'].age == 35
        assert actual['ID1'].name == '鈴木'
        assert type(actual['ID3']) == Person
        assert actual['ID3'].age == 21
        assert actual['ID3'].name == '山田'

    def test_deserializeRawObject(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', object)
        assert actual.name == 'グループ'
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObject(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', Group)
        assert type(actual) == Group
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

試しにテスト実行...当然実装していn(ry

================================================================ short test summary info =================================================================
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawList - main.json_deserializer.NotImplementedError: Cannot deserialize json([{'...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedList - main.json_deserializer.NotImplementedError: Cannot deserialize json([...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({'I...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedDict - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeTypedObject - main.json_deserializer.NotImplementedError: Cannot deserialize json...
FAILED tests/test_object_mapper.py::TestObjectMapper::test_deserializeRawObject - main.json_deserializer.NotImplementedError: Cannot deserialize json({...
============================================================== 6 failed, 8 passed in 0.36s ===============================================================

実装。

from inspect import signature, _ParameterKind
from typing import Type, TypeVar, List, Dict, Set, get_origin, get_args

class ListDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return get_origin(mappingClass) == list or mappingClass == list

    def deserialize(self, json: object, mappingClass: type) -> object:
        genericParams = get_args(mappingClass)
        hasGenericParams = genericParams is not None and len(genericParams) > 0
        param = genericParams[0] if hasGenericParams else object
        return [self.deserializeChild(el, param) for el in json]

class DictDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return get_origin(mappingClass) == dict or mappingClass == dict

    def deserialize(self, json: object, mappingClass: type) -> object:
        genericParams = get_args(mappingClass)
        hasKeyParam = genericParams is not None and len(genericParams) > 0
        hasValueParam = genericParams is not None and len(genericParams) > 1
        return {
            self.deserializeChild(k, genericParams[0] if hasKeyParam else object)
            : self.deserializeChild(v, genericParams[1] if hasValueParam else object)
            for k, v in json.items()
        }

class ObjectDeserializer(ContainerDeserializer):
    def canDeserialize(self, json: object, mappingClass: type) -> bool:
        return type(json) == dict

    def deserialize(self, json: object, mappingClass: type) -> object:
        if mappingClass == object:
            return self.createRawObject(json)
        else:
            return self.createObject(json, mappingClass)
    
    def createRawObject(self, args: Dict[str, object]) -> object:
        className = ''.join(key.title() for key in args.keys()) + 'Obejct'
        newInstance = type(className, (object,), {})()
        for k, v in args.items():
            setattr(newInstance, k, self.deserializeChild(v, object))
        return newInstance

    def createObject(self, args: Dict[str, object], mappingClass: type) -> object:
        annotations = self.findAnnotations(mappingClass)
        requireArgs = self.findInitRequireArgs(mappingClass)

        result = object.__new__(mappingClass)
        initArgs = {}
        for k, v in args.items():
            val = self.deserializeChild(v, annotations.get(k, object))
            if k in requireArgs:
                initArgs[k] = val
            else:
                setattr(result, k, val)
        for req in requireArgs:
            if req not in initArgs:
                initArgs[req] = None
        result.__init__(**initArgs)
        return result
    
    @staticmethod
    def findAnnotations(mappingClass: type) -> Dict[str, type]:
        if hasattr(mappingClass, '__annotations__'):
            return mappingClass.__annotations__
        for k, v in signature(mappingClass.__init__).parameters.items():
            return {k:v.annotation for k,v in signature(mappingClass.__init__).parameters.items()}

    @staticmethod
    def findInitRequireArgs(mappingClass: type) -> Set[str]:
        return {
            k for k, v in signature(mappingClass.__init__).parameters.items()
            if v.name != 'self' and v.kind == _ParameterKind.POSITIONAL_OR_KEYWORD
        }


class ObjectMapper:
    deserializers: List[JsonDeserializer] = [
        DateDeserializer(),
        DatetimeDeserializer(),
        EnumDeserializer(),
        LiteralDeserializer(),
        ListDeserializer(), ## 追加
        DictDeserializer(), ## 追加
        ObjectDeserializer() ## 追加
    ]

テスト実行

collected 14 items                                                                                                                                       

tests/test_object_mapper.py ..............                                                                                                         [100%]

=================================================================== 14 passed in 0.18s ===================================================================

Object デシリアライザの挙動説明

Objectをマッピングするものになると、急に難易度が高くなりましたね。..なので、軽く説明します。

また、オブジェクトに内包されているデータをデシリアライズするには、
データのクラスを取得しなければいけませんが、それが格納されている可能性があるのは下記の二箇所です。

  • classに型定義がある場合 (下記コードの Group, GroupWithoutDataclass)
  • __init__ の引数で型を定義している場合 (下記コードの GroupInitDef)
@dataclass
class Person:
    name: str
    age: int

# パターン1
@dataclass
class Group:
    name: str
    leader: Person

# パターン2
class GroupWithoutDataclass:
    name: str
    leader: Person

# パターン3
class GroupInitDef:
    def __init__(self, name: str, leader: Person):
        self.name = name
        self.leader = leader

また、python では Person('AA', 23) と実行すると Person クラスの __init__ メソッドが実行されます。
このメソッドでは、可変長引数(*args, **kwargs等)で定義されている変数以外は指定しないとエラーになります。
そのため、def findInitRequireArgs(mappingClass: type) -> Set[str] で定義すべき変数名を取得しています。

そして、今回はJSONデータとマッピングデータが下記のような関係だった場合に、下記の実装にしています。

  • JSONデータ が マッピングに必要なデータを持っていない → None で埋める
  • JSONデータ が マッピングに不必要なデータを持っている → setattr で属性追加している

今回は、このようにしていますが、場合によってはエラーにしたほうがいいケースもあるかもしれません。

念の為テストを追加。(Group は先ほどテストしているので省略)

    def test_deserializeTypedObjectWithoutDataclass(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupWithoutDataclass)
        assert type(actual) == GroupWithoutDataclass
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObjectInitDef(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "leader": {"age": 35, "name": "鈴木"}}', GroupInitDef)
        assert type(actual) == GroupInitDef
        assert actual.name == 'グループ'
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'

    def test_deserializeTypedObjectEmptyJson(self):
        actual = ObjectMapper().deserialize('{}', Group)
        assert type(actual) == Group
        assert actual.name == None
        assert actual.leader == None

    def test_deserializeTypedObjectRedundantData(self):
        actual = ObjectMapper().deserialize('{"name": "グループ", "id": 2, "leader": {"age": 35, "name": "鈴木", "role": "課長"}}', Group)
        assert actual.name == 'グループ'
        assert actual.id == 2
        assert type(actual.leader) == Person
        assert actual.leader.age == 35
        assert actual.leader.name == '鈴木'
        assert actual.leader.role == '課長'
collected 18 items                                                                                                                                       

tests/test_object_mapper.py ..................                                                                                                     [100%]

=================================================================== 18 passed in 0.17s ===================================================================

もっと工夫をするなら...

実際はもうちょっと面倒な仕様があったりするので、プロジェクトに合わせて仕様を調整する必要があるかもしれません。

  • __init__メソッド に *args: T, **kwargs: T のような可変長引数
  • デフォルト値への対応
  • オブジェクト初期化時には init 実行しないようにして欲しいんだけど?
  • OrderedDict等の順序があるものにも対応できるのか?

ただ、今回はAPI等で受け取った JSON をクラスにマッピングすることが目的です。
JSONモデルに通常はそんなに複雑な要件は必要ないでしょう。

最後に

今回は、JSON からクラスのインスタンスを生成するオブジェクトマッパーを作成しました。
それほど大変な実装ではないですし、都度都度デシリアライズのロジックを追加していけばどんなものでも対応できるので、
メンテナンスはそれほど大変ではないと思います。

python は型定義がどんどん便利になっていますので、このような仕組みを押さえておくと今後何かいいことがあるかもしれません。

8
8
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?