はじめに
外部からCSVを取り込み、自分のところのシステムで使うファイルに変換をする処理を作ることになりました。
ボリューム的にもそこまででもないので、ちゃちゃっと手続き的なプログラムを書いてしまっても良かったのですが、多少余裕があったため、真面目にレイヤー分けをしてみました。
そのとき、どんな風に作ってみたのかを、サンプルの要件と合わせて書いていきたいと思います。
今回は勉強も含め、Python3.7とLambdaで作ってみます。
サンプルコード
最終的な分離のイメージと合わせてご参照下さい。
ありそうな要件
あるチェーン店が会社で使用しているレジから取引データを吸い上げ、各店舗のレジ毎に発生した取引を他システムからの売上のCSVを取り込み、自システムで使えるデータに変換をします。
そのとき、今回読み込むCSVは、そのままスペース区切りに変換をするわけではなく、いくつかはシステムが持つ値に変換をする必要があります。
レジから吸い上げてきたCSVデータのイメージ
1ファイルに、ある1日の各店舗・各レジ分のデータが集まります。
# 3つ目のレコード種別で読み方が変わる
# 01: 1:店舗コード 2:レジ番号 3:種別コード(01:取引ヘッダ) 4:取引番号 5:YYYYMMDDHHMMSS
# 02: 1:店舗コード 2:レジ番号 3:種別コード(02:取引明細) 4:取引番号 5:商品名 6:単価 7:個数
"1","1","01","0000001", "20200816100000"
"1","1","02","0000001","商品A","1000", "2"
"1","1","02","0000001","商品B","500", "4"
"1","1","02","0000001","商品C","100", "10"
"1","1","01","0000002", "20200816113010"
"1","1","02","0000002","商品D","10000", "1"
"1","1","02","0000002","商品E","2000", "1"
"1","2","01","0000001", "20200816102049"
"1","2","02","0000001","商品A","1000", "3"
"1","2","02","0000001","商品D","10000", "2"
"1","2","02","0000001","商品F","500", "5"
"1","2","02","0000001","商品G","4400", "2"
"2","1","01","0000001", "20200816152009"
"2","1","02","0000001","商品F","500", "1"
"2","1","02","0000001","商品G","4400", "1"
取り込んだ後のデータのイメージ
# 1:店舗コード 2:売上年月(YYYYMM) 3:売上金額
001 202008 500000
002 202008 300000
003 202008 900000
# 1:店舗コード 2:売上年月日(YYYYMMDD) 3:売上金額
001 20200816 51300
002 20200816 4900
# 1:店舗コード 2:取引番号 3:レジ番号 4:売上時刻(YYYYMMDDHHMMSS) 5:売上金額
001 0000001 001 20200816100000 5000
001 0000002 001 20200816113010 12000
001 0000001 901 20200816102049 34300
002 0000001 001 20200816152009 4900
取り込んだ後のデータのモデルのイメージ
- 吸い上げたデータで何を知りたいか
- 各店舗の月間売上金額
- 各店舗の日別の売上金額
- 各店舗の日別の取引明細
CSVから変換するLambdaのざっくりイメージ
こんなLambdaを作ります。
この構成のイヤなところ
ただの変換をするだけだから、handlerメソッドに、ロジックベタ書きでいいやーと普段の自分ならやりがちですが、
そうした場合に、以下の出来事によりロジックの変更を余儀なくされます。
- CSVデータの構造が変わった場合
- スペース区切りのファイルの構造が変わった場合
- そもそもCSVじゃなくなる、スペース区切りじゃなくなる
- 自システムで使う為の値に変換するためのテーブルの持ち方が変わる
- 金額の計算ルールが変わる
- 集計するルールが変わる(深夜までやっている or 24h店舗の場合、営業日の切り替わりのタイミングなど)
入出力のデータ構造と、計算などのルール変更による、2種類の変更要求が考えられそうです。
特に今回のような他システムからデータを貰う時には、相手方からのデータ項目定義が出てくるのが遅かったり、サンプルデータがなかなか貰えなかったりします。
そこで開発が進まなかったり、口頭で聞いていた仕様で作っていたら、「実はアレは古い情報だったので、今は違います」なんて言われることもあって、変更が多くなっていきます。
分離をしていく流れ
- LambdaのHandlerとロジックの分離
- データソースとロジックの分離
- CSV→スペース区切りへの変換ルールの分離
- CSVのマッピングルールの分離
Lambdaの入出力とロジックの分離
個人的にPythonに慣れてないこともありましたが、まずはテストを書くようにしました。
この時に、Lambdaのハンドラーとロジックを分離します。
Lambdaのハンドラーは**リクエスト(event, context)を貰い、最終的にレスポンスを返す(HTTPステータス)**ものです。
それがLambdaのハンドラーの責務であるので、ハンドラー内に変換ロジックが出てこないようにします。
必要なパラメータを抜き出して、ロジッククラスに移譲して結果を受け取ったら、(必要があれば)レスポンスの中に含めます。
def import_handler(event, context):
try:
# eventから必要情報を抜き出す
body_str = event['body']
body = json.loads(body_str)
key = body['key']
bucket = os.environ['BUCKET_NAME']
# CSV取り込み→スペース区切り保存
trans_app = CsvToSpaceSeparateApplication(bucket, key)
trans_app.csv_to_space_separate()
dict_value = {'message': 'uploadしました', }
json_str = json.dumps(dict_value)
return {
'statusCode': 200,
'body': json_str
}
分離してよかったこと
- 一旦Lambdaの入出力を忘れて、ロジッククラスのみをテストすることができる
実際私はPythonに慣れていなかったので、CSV読み込みはどうするか、スペース区切りに変換はどうするか、とテスト書いて試す下地ができたので、まずはここの分離をしておいて良かったです。
この時点での構成
データソースと変換ロジックの分離
読み込んだCSVをそのままスペース区切りに変換をするわけではなく、いくつかの値はシステムが持つ値に変換をしていきたいことがあると思います。
外部システムの店舗コードやレジ番号は、設置した通番になっているけど、
システム内部では、それぞれの桁の数字に意味があったりするという、採番体系の違いなどです。
- システム内の店舗コード:3桁
- システム内のレジ番号:3桁かつ、頭1桁目で常設か催事用(セール時のみの増設レジ)が分かれている、など
その為、変換用のテーブルを持っておくようにしたいのですが、データストアが決まってないので、テスト用に仮のテーブルを使うときもあると思います。
また、特定のデータソースの実装詳細(コネクションを張るなど)を、変換ロジックに書いておくと、いざデータソースが変わった時に、そこに手を入れることになります。
それを回避するべく、また実装の詳細の決定を遅延させるため、ひとまずはCSVから取得した値を渡したら、システムが持つ値を返すことを期待する抽象クラスを用意します。
from abc import ABCMeta, abstractmethod
class CodeRepositoryBase(object, metaclass=ABCMeta):
"""
コードをデータストアから取得する抽象クラス
"""
@abstractmethod
def get_shop_code(self, external_system_shop_code: str) -> str:
"""
店舗コードを取得する
:param external_system_shop_code: 外部システムで採番された店舗コード
:return: 店舗コード
"""
raise NotImplementedError()
@abstractmethod
def get_cash_register_code(self, external_system_shop_code: str, external_system_cash_register_code: str) -> str:
"""
レジ番号を取得する
:param external_system_shop_code: 外部システムで採番された店舗コード
:param external_system_cash_register_code: 外部システムで採番されたレジ番号
:return: レジ番号
"""
raise NotImplementedError()
from source.domain.repository.code_repository_base import CodeRepositoryBase
class InMemoryCodeRepository(CodeRepositoryBase):
"""
インメモリリポジトリ実装
"""
def __init__(self):
# key:外部システムの店舗コード value:店舗コード
self.__shop_code_table = {
'1': '001',
'2': '002',
'3': '003'
}
# key:(外部システム店舗コード, 外部システムレジ番号) value:レジ番号
# レジ番号の頭1桁目が「0」:常設レジ、「9」:催事レジ
self.__cash_register_code_table = {
('1', '1'): '001',
('1', '2'): '901',
('2', '1'): '001',
}
def get_shop_code(self, external_system_shop_code: str) -> str:
"""
店舗コードを取得する
:param external_system_shop_code: 外部システムで採番された店舗コード
:return: 店舗コード
"""
result = self.__shop_code_table.get(external_system_shop_code)
if result is None:
raise ValueError(f'指定したキーに該当する店舗コードは存在しません。キー:{external_system_shop_code}')
return result
def get_cash_register_code(self, external_system_shop_code: str, external_system_cash_register_code:str) -> str:
"""
レジ番号を取得する
:param external_system_shop_code: 外部システムで採番された店舗コード
:param external_system_cash_register_code: 外部システムで採番されたレジ番号
:return: レジ番号
"""
result = self.__cash_register_code_table.get((external_system_shop_code, external_system_cash_register_code))
if result is None:
raise ValueError(f'指定したキーに該当するレジ番号は存在しません。キー:{external_system_cash_register_code}')
return result
from pytest import raises
from tests.In_memory_code_repository import InMemoryCodeRepository
class TestInMemoryCodeRepository:
def test_店舗コード001が返る(self):
result = InMemoryCodeRepository().get_shop_code('1')
assert result == '001'
分離イメージ
分離して良かったこと
- データソースの詳細の遅延ができる
- 実際にデータソースを決めて実装をするときも、ロジック側(app.py)にどんな値を返すのか明確になっている
逆に面倒だと思ったこと
- データソースが決まってから、改めて実装の詳細を考える必要があったので、普段より時間は掛かる
- 最初から使うものが決まっているのであれば、ローカル上でテスト環境を用意して、すぐに実装しても良いかもしれない
この時点での構成
CSVマッピングルールの分離
CSVの持つ値を自システムが持つ値に変換するというところまでは、分離ができました。
では以下のようなCSVデータを元に、スペース区切りの値に変換をしていきます。
# 3つ目のレコード種別で読み方が変わる
# 01: 1:店舗コード 2:レジ番号 3:種別コード(01:取引ヘッダ) 4:取引番号 5:YYYYMMDDHHMMSS
# 02: 1:店舗コード 2:レジ番号 3:種別コード(02:取引明細) 4:取引番号 5:商品名 6:単価 7:個数
"1","1","01","0000001","20200816100000"
"1","1","02","0000001","商品A","1000","2"
"1","1","02","0000001","商品B","500","4"
"1","1","02","0000001","商品C","100","10"
"1","1","01","0000002","20200816113010"
"1","1","02","0000002","商品D","10000","1"
"1","1","02","0000002","商品E","2000","1"
"1","2","01","0000001","20200816102049"
"1","2","02","0000001","商品A","1000","3"
"1","2","02","0000001","商品D","10000","2"
"1","2","02","0000001","商品F","500","5"
"1","2","02","0000001","商品G","4400","2"
"2","1","01","0000001","20200816152009"
"2","1","02","0000001","商品F","500","1"
"2","1","02","0000001","商品G","4400","1"
テーブル定義項目書を元に、マッピングした書きます。
すごく雑に書いてこんな感じです。
# 渡されたCSVを元に、項目定義に合わせてマッピングしたリストを返す
# 呼び出し元のほうで、リストをスペース区切りに直す
@dataclass
class CsvToShopSales:
code_respository: CodeRepositoryBase
def csv_to_sales_by_shop(self, csv_list) -> List[List[str]]:
names_list = list(range(10))
df = pd.read_csv(csv_list, names=names_list, dtype='object').fillna('_')
SHOP_COLUMN = 0
# 店舗コードでグルーピングする
shop_group_list = df.groupby(SHOP_COLUMN)
results = []
for group_rows in shop_group_list:
shop_code = self.code_respository.get_shop_code(group_rows[0])
year_month = [record[4] for record in group_rows[1].values.tolist() if record[2] == '01'][0][:6]
amount_list = [int(record[5]) * int(record[6]) for record in group_rows[1].values.tolist() if record[2] == '02']
sales_amount = sum(amount_list)
results.append([shop_code, year_month, str(sales_amount)])
return results
このコードの嫌なところ
- CSVの項目が変わる、もしくは何行目の何列という位置が変わってしまった場合、直す必要が出てくる
- もちろん、変換後の項目にも変更が入った場合は、直す必要が出てくる
変換前のデータと変換後のデータ、どちら側のデータ構造に変更が起きた場合でも、同じコードを修正する状態はよろしく無いと思います。
1つのクラスに対して、変更する理由が2つ以上作りたくありません。
- CSVを受け取り、CSVの中で欲しいデータだけを抽出したモデルにする(入出力)
- CSVのモデルから、売上のドメインモデルに変換する(ルール)
- スペース区切りデータに変換して保存する(入出力)
これらをapp.pyから分離してみます。
app.pyの中身を分離したイメージ図が、下記となります。
CSVを受け取り、CSVモデルに変換する
from abc import ABCMeta, abstractmethod
from typing import List
from source.domain.models.csv_models.csv_cash_transaction_header import CsvCashTransactionHeader
class CsvCashTransactionRepositoryBase(object, metaclass=ABCMeta):
"""
CSVのレジ取引データを受け取るためのRepository抽象クラス
"""
@abstractmethod
def load(self) -> List[CsvCashTransactionHeader]:
"""
レジ取引データモデルを取得する
:return: レジ取引データモデル
"""
raise NotImplementedError()
@abstractmethod
def save(self, data: CsvCashTransactionHeader) -> None:
"""
レジ取引データモデルを保存する
:param data: レジ取引データモデル
"""
raise NotImplementedError('まだSaveできません')
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List
from source.domain.models.csv_models.csv_cash_transaction_detail import CsvCashTransactionDetail
@dataclass(frozen=True, order=True)
class CsvCashTransactionHeader:
"""
レジ取引データCSVのモデル
"""
# 店舗コード
shop_code: str = field(compare=True)
# レジ番号
cash_register_code: str = field(compare=True)
# 取引番号
transaction_code: str = field(compare=True)
# 取引時刻
transaction_datetime: str = field(compare=True)
# 取引明細
transaction_details: List[CsvCashTransactionDetail]
from dataclasses import dataclass
@dataclass(frozen=True)
class CsvCashTransactionDetail:
"""
レジ取引データ明細CSVのモデル
"""
# 商品名
item_name: str
# 単価
unit_price: int
# 数量
quantity: int
モデルとしてはこんな感じになります。
売上・取引のドメインモデルを作る
取り込んだ後のデータのモデルのイメージで書いたようなドメインモデルを作っていきます。
from dataclasses import dataclass, field
from functools import reduce
from operator import add
from typing import List
from source.domain.models.salses.daily_sales import DailySales
@dataclass(frozen=True)
class ShopMonthlySales:
shop_code: str
year_month: str
daily_sales_list: List[DailySales] = field(default_factory=list, compare=False)
def amount(self) -> int:
return reduce(add, map(lambda data: data.amount(), self.daily_sales_list))
from dataclasses import dataclass, field
from datetime import datetime
from functools import reduce
from operator import add
from typing import List
from source.domain.models.salses.daily_sales_detail import DailySalesDetail
@dataclass(frozen=True)
class DailySales:
sales_date: datetime.date
details: List[DailySalesDetail] = field(default_factory=list, compare=False)
def amount(self) -> int:
return reduce(add, map(lambda data: data.amount, self.details))
import datetime
from dataclasses import dataclass
@dataclass(frozen=True)
class DailySalesDetail:
transaction_code: str
transaction_datetime: datetime.datetime
cash_number: str
amount: int
変換ルールの実装
CSVモデルを受け取り、売上のドメインモデルに変換するルールクラスを作ります。
@dataclass(frozen=True)
class TransferRules(object):
"""
変換ルールクラス
"""
repository: CodeRepositoryBase
def to_shop_sales(self, sources: List[CsvCashTransactionHeader]) -> List[ShopMonthlySales]:
results: List[ShopMonthlySales] = []
sources.sort(key=lambda x: x.shop_code)
# 店舗ごとにグルーピングをし、モデルに変換する
for key, g in groupby(sources, key=lambda x: x.shop_code):
shop_code = self.repository.get_shop_code(key)
details: List[DailySalesDetail] = []
dt = ''
day = ''
year_month = ''
for member in g:
dt = datetime.strptime(member.transaction_datetime, '%Y%m%d%H%M%S')
day = date(dt.year, dt.month, dt.day)
year_month = member.transaction_datetime[:6]
cash_register_code = self.repository.get_cash_register_code(member.shop_code, member.cash_register_code)
amount = sum([s.unit_price * s.quantity for s in member.transaction_details])
detail = DailySalesDetail(member.transaction_code,
dt,
cash_register_code,
amount)
details.append(detail)
daily = DailySales(day, details)
shop_sales = ShopMonthlySales(shop_code, year_month, [daily])
results.append(shop_sales)
return results
ドメインモデル→スペース区切りに保存する
ドメインモデルをスペース区切りにして、データストアに保存するクラスを作ります。
今回はS3に保存するためのクラスを作っていきます。
class S3ShopSalesRepository(ShopSalesRepositoryBase):
"""
インメモリでの店舗売上レポジトリの実装
"""
__bucket_name: str
def __init__(self, bucket_name):
self.__bucket_name = bucket_name
def save(self, sources: List[ShopMonthlySales]) -> None:
self.shop_monthly_sales = []
self.daily_sales = []
self.daily_details = []
for source in sources:
self.shop_monthly_sales.append(
[source.shop_code, source.year_month, str(source.amount())]
)
for daily in source.daily_sales_list:
self.daily_sales.append([
source.shop_code,
daily.sales_date.strftime('%Y%m%d'),
str(daily.amount()),
])
for detail in daily.details:
self.daily_details.append(
[source.shop_code,
detail.transaction_code,
detail.cash_number,
detail.transaction_datetime.strftime('%Y%m%d%H%M%S'),
str(detail.amount)]
)
self.shop_monthly_sales = self.__comma2dlist_to_space2dlist(self.shop_monthly_sales)
self.daily_sales = self.__comma2dlist_to_space2dlist(self.daily_sales)
self.daily_details = self.__comma2dlist_to_space2dlist(self.daily_details)
try:
self.__s3_upload(self.shop_monthly_sales, self.__bucket_name, '店舗売上.txt')
self.__s3_upload(self.daily_details, self.__bucket_name, '店舗日別.txt')
self.__s3_upload(self.daily_details, self.__bucket_name, '店舗日別詳細.txt')
except Exception as error:
raise error
データストアに保存することと、スペース区切りに変換する処理の2つは、このクラスの中にまとめています。
理由としては、今回はスペース区切りに変換をするけど、別のデータストアに変換するときは、別の形式に変換したりする可能性があるかもしれないので、ShopSalesRepositoryBase
の実装クラスの方に任せるようにしています。
handlerとapplication service
def import_handler(event, context):
try:
# eventから必要情報を抜き出す
body_str = event['body']
body = json.loads(body_str)
key = body['key']
bucket_name = os.environ['BUCKET_NAME']
code_repository = InMemoryCodeRepository()
csv_repository = S3CsvCashTransactionRepository(key, bucket_name)
# bucketは既に決まっている想定
shop_sales_repository = S3ShopSalesRepository('xxxxx-bucket')
# CSV取り込み→スペース区切り保存
trans_app = CsvToSpaceSeparateApplication(code_repository, csv_repository, shop_sales_repository)
trans_app.csv_to_space_separate()
# Response組み立て
dict_value = {'message': 'uploadしました', }
json_str = json.dumps(dict_value)
return {
'statusCode': 200,
'body': json_str
}
except ValueError as error:
logger.exception(f'{error}')
dict_value = {'message': f'{error}', }
json_str = json.dumps(dict_value)
return {
'statusCode': 500,
'body': json_str
}
except Exception as error:
logger.exception(f'{error}')
dict_value = {'message': f'処理エラーが発生しました。しばらくしてから再実行して下さい', }
json_str = json.dumps(dict_value)
return {
'statusCode': 500,
'body': json_str
}
@dataclass
class CsvToSpaceSeparateApplication(object):
"""
CSV→スペース区切りに変換する処理
"""
code_repository: CodeRepositoryBase
csv_repository: CsvCashTransactionRepositoryBase
shop_sales_repository: ShopSalesRepositoryBase
def csv_to_space_separate(self) -> None:
"""
CSV→スペース区切り変換
"""
# CSVモデルに変換
csv_models = self.csv_repository.load()
# ドメインモデルに変換
shop_monthly_sales = TransferRules(self.code_repository).to_shop_sales(csv_models)
# スペース区切りに変換して保存をする
self.shop_sales_repository.save(shop_monthly_sales)
最終的な分離のイメージ
handlerから呼ばれるCsvToSpaceSeparateApplicationの中でのイメージは、こんな感じです。
それぞれの「出力→変換→出力」の手続きを、Application層内のメソッドで表現しています。
個々の処理は、クラスに纏めることによって処理の意図も表現しています。
この時点での構成
やった結果
同じ様な構成を現場で実践してみましたが、
- 入出力と変換を分け終えてそろそろリリースするとなってから、他システムからのCSVの構成が変わると案の定連絡が来ました
- CSV側のリポジトリの抽象クラスを継承したものを新規作成し、それをhandler.pyからDIしてあげるだけで終わり
- 実際、ドメインモデルのルールは、合計金額の計算くらいでしたが、途中から計算ルールの変更ができないか、という話も出てきました
- 結果的に来なかったけど、計算判断のロジックだけ分けていたので、追加は難しくないとその時判断できました
まとめ
小さい処理ではありましたが、今回入出力と計算・判断のルールの分離をやってみて、今後大きなシステムを作る上でも役に立ちそうだと感じました。
費用対効果みたいな話も出てきそうですが、時間が少しでもあれば、普段から意識できるよう努めていきたいです。
(もちろん、作り捨てるつもりのコードならその限りではないですが、、、大抵はそうならないことが多いので。。。)
参考にしたもの
-
7つの設計原則とオブジェクト指向プログラミング - ソフトウェア設計を考える
- 関心の4象限のところを特に参考にさせて頂いてます。