LoginSignup
8

More than 1 year has passed since last update.

pythonでCSVレコードをモデルに変換

Last updated at Posted at 2021-12-03

この記事は、 python アドベントカレンダーの4日目の記事です。

概要

pythonでは、CSVのライブラリが標準で用意されています。csv --- CSV ファイルの読み書き

しかし、CSVレコードはstr配列で返ってくるため、
データを使う側では、使用したい列が何列目か?でアクセスすることになります。
これは分かりづらい上、CSVレコードに列追加や列削除などで列数が変更されると、途端にバグとなってしまいます。

また、全てはstrで返ってくるため、そこからint, date, datetimeに変換する必要があると
処理が列ごとに手続き的になってしまい、本質でない処理が多くなってしまいます。

そこで、クラスにCSVの変換定義をすると、レコードからオブジェクトに変換してくれるクラスを作ってみました。

どんな感じで作りたいか?

自分の好きなライブラリに SQLAlchemy があります。これは、クラスで定義を記載すると、
その通りに処理がよしなになってくれます。このような感じを目指してみます。

ということで、こんな感じを目指してみます。

こんな感じを目指す
from model import CSVModel, Column
from column_types import String, Integer, DateTime, Date

class Person(CSVModel):
    name = Column(String)
    age = Column(Integer)
    birth_day = Column(Date)
    created_at = Column(DateTime('%Y/%m/%d %H:%M:%S'))

with open('./person.csv', 'r') as f:
  people = list(Person.create_from(f.read()))
person.csv
"name","age","birth_day","created_at"
"山田太郎","26","1995-02-14","2021/12/13 12:34:56"
"佐藤花子","20","2001-01-12","2021/12/03 11:11:11"

列名をそのままプログラムで利用したくないこともあります。(日本語列名とか)
その場合は、このようにするだけです。

列名がプログラムのプロパティと異なる場合
class Person(CSVModel):
    name = Column(String, '名前')
    age = Column(Integer, '年齢')
    birth_day = Column(Date, '生年月日')
    created_at = Column(DateTime('%Y/%m/%d %H:%M:%S'), '作成日')
列名が日本語
"名前","年齢","生年月日","作成日"
"山田太郎","26","1995-02-14","2021/12/13 12:34:56"
"佐藤花子","20","2001-01-12","2021/12/03 11:11:11"

日付/日時データの形式が変わることもよくあります。
例えば、生年月日は現在 %Y-%m-%d 形式ですが、これを %Y/%m/%d形式にするには、
Date -> Date('%Y/%m/%d') で渡すと日付の形式が変わります。

生年月日の形式を変更したモデル
class Person(CSVModel):
    name = Column(String, '名前')
    age = Column(Integer, '年齢')
    birth_day = Column(Date('%Y/%m/%d'), '生年月日')
    created_at = Column(DateTime('%Y/%m/%d %H:%M:%S'), '作成日')

どうやって実現するか?

CSVの読み込み自体は標準で用意されているものを使います。
このため、CSVレコード:str配列→クラスインスタンスの変換を考えることになります。

そのためには、読み込むときに変換設定を内部に作っておく必要があります。
これを解決するために、メタクラスという仕組みを使います。

class CSVModelMeta(type):
    def __new__(cls, cls_name, cls_base, cls_dict) -> Type[Any]:
        """
        モデル化したいクラスにこのメタクラスを指定すると、クラス読み込み時に、この処理が実行される.
        """
        columns = cls.collect_columns(cls_dict)
        for col, setting in columns.items():
            cls_dict[col] = setting.name
        cls_dict['__csv_columns__'] = columns
        return super().__new__(cls, cls_name, cls_base, cls_dict)

    @classmethod
    def collect_columns(cls, cls_dict: Dict[str, Any]) -> Dict[str, ColumnSetting]:
        """
        クラス変数のうち、列定義されているクラス変数を [モデル列名] -> [列設定(ColumnSetting)] の辞書形式で取得する.

        - クラス変数が '__' で始まってない (特殊用途の可能性あるため)
        - プロパティの中身が Column インスタンス
        """
        columns = {}
        column_props = [prop for prop in cls_dict if not prop.startswith('__') and isinstance(cls_dict[prop], Column)]
        for prop in column_props:
            column = cls_dict[prop]
            name = column.name if hasattr(column, 'name') and column.name else prop
            columns[prop] = ColumnSetting(column.type, name, prop)
        return columns

この仕組みが動くと、該当クラス内に __csv_columns__ というデータが作られます。

>>> Person.__csv_columns__
{'name': ColumnSetting(type=<column_types._StringColumnType object at 0x1014eaeb0>, name='名前', prop_name='name'), 'age': ColumnSetting(type=<column_types._IntegerColumnType object at 0x1014ea640>, name='年齢', prop_name='age'), 'birth_day': ColumnSetting(type=<column_types._DateColumnType object at 0x1014ea550>, name='生年月日', prop_name='birth_day'), 'created_at': ColumnSetting(type=<column_types._DateTimeColumnType object at 0x1014eadc0>, name='作成日', prop_name='created_at')}

このデータをもとに、CSVレコード(str配列)からクラスインスタンスを作成して返却します。

class CSVModel(metaclass=CSVModelMeta):

    @classmethod
    def create_from(cls, csv_file: Union[bytes, str]) -> Iterator['CSVModel']:
        content = io.StringIO(csv_file) if isinstance(csv_file, str) else io.BytesIO(csv_file)
        csv_reader = csv.reader(content)

        header: List[str] = next(csv_reader)
        column_indices: Dict[str, int] = { col_name:index for index, col_name in enumerate(header) }
        for row in csv_reader:
            instance = cls()
            for attr, column in cls.__csv_columns__.items():
                index = column_indices.get(column.name)
                if index is None:
                    continue
                _type = cls.__csv_columns__[attr].type
                instance.__setattr__(attr, _type.deserialize(row[index]))
            yield instance

作ったプログラム

model.py
import io
import csv

from abc import ABC, abstractmethod
from typing import TypeVar, Generic, Any, Type, Union, Iterator, List, Dict, cast
from dataclasses import dataclass

T = TypeVar('T')


class ColumnType(Generic[T], ABC):
    @abstractmethod
    def serialize(self, mapped_value: T):
        pass

    @abstractmethod
    def deserialize(self, csv_value: str) -> T:
        pass

@dataclass
class ColumnSetting(Generic[T]):
    # 変換するデータ型
    type: ColumnType
    # CSV 列名
    name: str
    # モデルのプロパティ名
    prop_name: str


@dataclass
class Column:
    type: ColumnType[Any]
    name: str

    def __init__(self, *args, **kwargs) -> None:
        for arg in args:
            if isinstance(arg, str):
                self.name = arg
            elif isinstance(arg, ColumnType):
                self.type = arg
        # self.type は必須のためチェックしとく
        assert self.type


class CSVModelMeta(type):
    def __new__(cls, cls_name, cls_base, cls_dict) -> Type[Any]:
        """
        モデル化したいクラスにこのメタクラスを指定すると、クラス読み込み時にこの処理が実行される.
        """
        columns = cls.collect_columns(cls_dict)
        for col, setting in columns.items():
            cls_dict[col] = setting.name
        cls_dict['__csv_columns__'] = columns
        return super().__new__(cls, cls_name, cls_base, cls_dict)

    @classmethod
    def collect_columns(cls, cls_dict: Dict[str, Any]) -> Dict[str, ColumnSetting]:
        """
        クラス変数のうち、列定義されているクラス変数を [クラス変数名] -> [マッピング設定(ColumnSetting)] の辞書形式で取得する.

        - クラス変数が '__' で始まってない (特殊用途の可能性あるため)
        - プロパティの中身が Column インスタンス
        """
        columns = {}
        column_props = [prop for prop in cls_dict if not prop.startswith('__') and isinstance(cls_dict[prop], Column)]
        for prop in column_props:
            column = cls_dict[prop]
            name = column.name if hasattr(column, 'name') and column.name else prop
            columns[prop] = ColumnSetting(column.type, name, prop)
        return columns


class CSVModel(metaclass=CSVModelMeta):

    @classmethod
    def create_from(cls, csv_file: Union[bytes, str]) -> Iterator['CSVModel']:
        content = io.StringIO(csv_file) if isinstance(csv_file, str) else io.BytesIO(csv_file)
        csv_reader = csv.reader(content)

        header: List[str] = next(csv_reader)
        column_indices: Dict[str, int] = { col_name:index for index, col_name in enumerate(header) }
        for row in csv_reader:
            instance = cls()
            for attr, column in cls.__csv_columns__.items():
                index = column_indices.get(column.name)
                if index is None:
                    continue
                _type = cls.__csv_columns__[attr].type
                instance.__setattr__(attr, _type.deserialize(row[index]))
            yield instance
column_type.py
from datetime import date, datetime
from model import ColumnType


class _StringColumnType(ColumnType[str]):
    def serialize(self, mapped_value: str):
        return mapped_value

    def deserialize(self, csv_value: str) -> str:
        return csv_value


class _IntegerColumnType(ColumnType[int]):
    def serialize(self, mapped_value: int):
        return str(mapped_value) if mapped_value is not None else ''
    
    def deserialize(self, csv_value: str) -> int:
        return int(csv_value, 10) if csv_value and csv_value.isdigit() else None


class _DateColumnType(ColumnType[date]):
    def __init__(self, format: str) -> None:
        self.format = format

    def __call__(self, format: str) -> '_DateColumnType':
        return _DateColumnType(format)

    def serialize(self, mapped_value: date):
        return date.strftime(mapped_value, self.format) if mapped_value else ''
    
    def deserialize(self, csv_value: str) -> date:
        if not csv_value:
            return None
        dt = datetime.strptime(csv_value, self.format)
        return date(dt.year, dt.month, dt.day)


class _DateTimeColumnType(ColumnType[datetime]):
    def __init__(self, format: str) -> None:
        self.format = format

    def __call__(self, format: str) -> '_DateTimeColumnType':
        return _DateTimeColumnType(format)

    def serialize(self, mapped_value: datetime):
        return datetime.strftime(mapped_value, self.format) if mapped_value else ''
    
    def deserialize(self, csv_value: str) -> datetime:
        if not csv_value:
            return None
        return datetime.strptime(csv_value, self.format)

String = _StringColumnType()
Integer = _IntegerColumnType()
Date = _DateColumnType('%Y-%m-%d')
DateTime = _DateTimeColumnType('%Y-%m-%dT%H:%M:%S')

まとめ

私が関わったある案件は、CSVファイルでデータをインポートする案件だったので、
ファイル数が多く、必要に応じてデータ形式が変更することがよくありました。
このような仕組みは、ある程度コストを減らすことには貢献したのではないかと思います。

余談

CSVファイルへの出力はないのか? ...と思った人がいるかもしれません。
結論から言えば、作ったことは作ったのですが、インタフェースが微妙で今回は省いています。

使う側としてはこんな感じになると思います。

with open('./person.csv', 'r') as f:
  people = list(Person.create_from(f.read()))

# こんな感じで保存すると思う.
# save_as の二番目は出力列の順番定義(どの順番で列定義を出力するか)
CSVCreator.with_targets(people).save_as('./person2.csv', [Person.name, Person.age, Person.birth_day, Person.created_at])

しかし、下記の問題点があると思ってます。

  • 列数が多い場合に、全ての列名を記載しないといけない。
  • CSV列が削除・追加が発生したときに、追随して直さないとバグる。

...とはいっても、モデルに列の出力順番をこれだけのために、定義拡張するのもどうなのよ?
という堂々巡りをして、結果話題に出さないことにしました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8