この記事は、AEON Advent Calendar 2023の8日目です🎉
はじめまして!
イオンリテールという、AEONグループの総合小売業の企業で働く渡邊と申します。
総合小売業とは衣・食・住・H&BCに各テナントを合わせた広い領域を扱う小売業となります。そこで現在私はネットスーパーの新規事業検証に関する部署におります。
前回の記事はこちらになります。
本記事ではこの続きとして、新たに発生した問題に対応したおはなしとなります。
本記事では特定のECサイトへの大量注文を実装していますが、あくまで検証における過程として実施したものであり一般ECサイトに対する実装を推奨するものではありません。
そのため、詳細な実装方法は記載いたしませんが、現場での実装ストーリーを楽しんでいただくとともに、Python
のRust
製ライブラリであるPolars
について認知を広められればと考えています。
ECサイトにメチャクチャ大量に注文しろ
突然ですがご質問です
例えば、上記ECサイト(あくまで例えばですよ、このサイトのバックエンド側のシステム導入を検討しているとか、そのためにこのサイトのRTEを利用して検証を行っているとか、そういうことかどうかはお答えしかねます!)
このサイトに対して、実際のお客さまの注文として想定される商品の構成・数量で、毎日100件程度の注文をしろと言われたらどうしますか?
次の課題
現在、ネットスーパーの新規システム導入の是非を判断するための検証を進めています。前回記事にあったような、バーコード生成の取り組み等々で検証の仕組み自体は動くようになりました。ですが、店舗で実際の検証オペレーションを回すために、お客さまの仮想注文を用意し、システム上にそれを入力することが次の課題として見えてきました。
要件は上の太字の通り。
- 実際のお客さまの注文を想定した商品構成・数量の注文を用意する
- 毎日100件程度、RTE環境のECサイトに注文する
そして、注文された内容に対して実業務を行い、業務上の問題は発生しないか、作業効率がどの程度向上するのかを試算し、最終的には P/L に落とし投資回収期間を算出することで導入可否を判定します。
結局のところ何が大変なのか
具体的な問題点を記述します
1. 実際のお客さまの注文を想定した商品構成・数量の注文を用意する
一番簡単な方法は過去のネットスーパーにおける実際の注文と同じものを注文することなのですが、今回その方法は実施できませんでした。
実態とかけ離れた適当な注文データを入力して検証を行ったところで、検証の妥当性は確保されません。したがって、蓋然性のある注文を実施する必要があるのですが、既存ネットスーパー業務とバッティングすることで店舗の業務・業績に影響が出てはまずいので、検証店舗はネットスーパーがない店舗から選出しました。
最大の問題は同じ弊社のネットスーパーといえど、店舗出荷という特性上、A店舗とB店舗では品揃えが違うのです。したがって、多店舗のネットスーパー注文を参照したところで商品が異なり売場にはその商品が存在しないという事態が頻発し、ピッキング作業が成立しません。
- オンラインの店舗とはいえ出荷元によって品揃えが大きく異なります
仮に現在実施している店舗でネットスーパーを実施した場合に、想定される注文データを生成することが必要でした。
2. 毎日100件程度、RTE環境のECサイトに注文する
上記過程で仮に注文データができたとしても、次はECサイト上で購買の手続きを実施しなければ、実際のピッキング業務はできません。ネットスーパーの注文は1人のお客さまあたり平均30種類程度。それを100人分毎日入力しなければなりません。期間は約1か月程度と、気合と根性で乗り切れる分量ではありません。
そこに加えて、注文をAPI経由で入力することも禁止されていました。
これら2つの課題を解決するために、それぞれプロトタイプを作成しました
そして今回その中核をなしたのが下の図にあるPython
ライブラリとなります
- 今回大車輪の活躍をする白熊
仮想注文の生成
次の手順で生成することとしました
- 検証実施期間(11月)の全店におけるネットスーパー注文データをデータベースより取得
- 抽出した顧客別の注文データを特徴量毎にクラスタリング
- 各クラスターごとに商品カテゴリの平均数量を算出
- 注文ごとの商品カテゴリの出現数量分布をポアソン分布で近似し、クラスターの出現頻度と合わせて500パターンの注文枠をJSONデータとして生成
- 検証店舗の1か月の販売実績から品揃えデータを作成し、作成したJSONデータに当てはめて具体的な商品の注文パターンにまでおとす
以下の文章にて、ポイントとなる部分のみ説明いたします。
実行環境
今回は一旦私の手元で実行できればよいと思ったことと、Polars
を使いたいと考えたのでPythonで実行しました(Rust
でやれ?御冗談を・・・)
Python --version // 3.11.0
仮想環境 venv
実行環境はターミナルとVScodeのjupyter拡張機能を利用requirements.txtipykernel==6.17.1 ipython==8.7.0 japanize-matplotlib==1.1.3 jupyter==1.0.0 matplotlib==3.6.2 numpy==1.23.5 polars==0.17.14 python-dotenv==1.0.0 scikit-learn==1.1.3 xlsx2csv==0.8.1 XlsxWriter==3.0.3 xlwings==0.28.5
requirements.txtはpip install したもののみを記載
データ取得
これは単純に弊社ネットスーパーのSQL serverにVScodeのmssql拡張機能を利用して接続し、以下のSQLを実行することで取得しました
SELECT
店舗名,
店舗コード,
注文ID,
配送指定日,
グループ名,
グループコード,
部門名,
部門コード,
カテゴリ名,
カテゴリコード,
サブカテゴリ名,
サブカテゴリコード,
正規JANコード,
商品名,
商品単価_税抜,
販売数量,
受注額_税抜,
商品金額合計_税抜
FROM
[ar].[注文明細]
WHERE
配送指定日 BETWEEN '2022-11-01' AND '2022-11-30'
;
補足:商品の分類
弊社は以下のような感じです
分類 | 具体例 |
---|---|
グループ | 農産、水産、メンズ、ホームファッション、・・・ |
部門 | 野菜、生鮮魚、ジャケット、・・・ |
カテゴリ | サラダ野菜、葉物野菜、土もの、・・・ |
サブカテゴリ | レタス、ほうれん草、フルーツトマト、玉ねぎ、・・・ |
部門では粗すぎ、サブカテゴリは細かすぎて確率分布が微妙・・・だったので基本的にはカテゴリで分類を進めたというのを念頭に以下お読みいただけるとわかりやすいかと思います
抽出した顧客別の注文データを特徴量毎にクラスタリング
とまあ、データ抽出したのは良いのですが全店1か月の全データということで30GBありました。
当然Excelでは開けませんし、Pandas
で読み込んでもメモリに乗りません。従来であればDask
をかませるところなのですが、今回はPolars
を使うことで少し大きなデータを扱うこととしました。
Lazyframeとscan_csv
polarsはとにかくこれが便利です。ネットスーパー興味ない人もこれだけは覚えて帰ってください
import polars as pl
ldf = pl.scan_csv('./data.csv') # このタイミングでは概要的なものが読み込まれている
print(ldf.columns) # 実行できる
# print(ldf.shape[0]) # 実行するとエラーが出る
df = (
ldf
.filter(pl.col('test_col') == 'test_val')
.select('test_col_2', 'test_col_3')
.collect() # このタイミングでCSVの必要な部分が読み込まれる
)
ざっくりと書いてみましたがpl.scan_csv()
を実行すると、そのタイミングではCSVの全体が読み込まれるわけではなく、pl.Lazyframe
というデータ型で大枠的なものが読み込まれます。そして、様々な処理を実行した後に.collect()
を実行することで、必要な部分が読み込まれてpl.Dataframe()
の形式に変換されて、具体的な値を参照することができるようになります。
このLazyframeの概念があるおかげで、Daskのようなライブラリに頼らずとも10GBを超えるようなデータも問題なく扱うことができるようになります
これで店規模別に分割しながら以下のコードで商品データをクラスタリングして6パターン位に分けました
import matplotlib.pyplot as plt
import polars as pl
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
class StoreCluster:
"""
データのクラスタリング
"""
info_cols = ['店舗コード', '注文ID', ]
def __init__(self, df: pl.DataFrame, random_state=42) -> None:
self.df = df
self.df_info = df[self.info_cols]
self.df_values = df.drop(columns=self.info_cols)
self.random_state = random_state
def calc_ncluster(self):
"""適切なクラスター数を推定
Returns:
plt.show: エルボー法の結果
"""
pipeline = Pipeline([
('scaler', StandardScaler()),
('kmeans', KMeans(random_state=self.random_state))
])
inertia = []
for k in range(1, 10):
pipeline.set_params(kmeans__n_clusters=k)
pipeline.fit(self.df_values)
inertia.append(pipeline.named_steps['kmeans'].inertia_)
plt.plot(range(1, 10), inertia, marker='o')
plt.title('Elbow Method For Optimal k')
plt.xlabel('Number of clusters')
plt.ylabel('Inertia')
plt.xticks(range(1, 10))
plt.grid(True)
return plt.show()
def labeling(self, n_cluster: int)->pl.DataFrame:
"""実際のクラスタリングとラベリングしたデータの生成
Args:
n_cluster (int): クラスター数
Returns:
pl.DataFrame: ラベリングしたデータ
"""
pipeline = Pipeline([
('scaler', StandardScaler()),
('kmeans', KMeans(random_state=self.random_state))
])
pipeline.set_params(kmeans__n_clusters=n_cluster)
pipeline.fit(self.df_values)
labels = pipeline.predict(self.df_values)
df_labels = pl.DataFrame({'label': labels})
df_values_with_columns = pl.concat([self.df_values, df_labels], how="horizontal")
return df_values_with_columns
ポアソン分布でのカテゴリ出現頻度の近似
- あるお客さまの注文の中に含まれるトマトカテゴリの商品個数の確率分布は、全注文のトマトカテゴリの平均個数がλのとき、平均λのポアソン分布で近似される
と思ったのでそうしました。
ポアソン分布はとても低い確率の事象を繰り返した際に、それが発生する回数の確立分布を表したものです。
我々小売業は商品の販売数量予測をPi-value
という顧客1,000人当たりの販売数量で評価するという習慣があります。カテゴリ単位でみたときのPi-valueは基本的に100以下であり、ポアソン分布の条件が十分適用できる範囲と判断しました。
生成するコード(長い)
import re
import ast
import numpy as np
import polars as pl
from xlsxwriter import Workbook
from src.modules.generate_sku_json import generate_sku_json
class Order:
"""疑似注文を生成するクラス
"""
def __init__(
self,
mean_lf: pl.LazyFrame|None=None,
# std_lf: pl.LazyFrame|None=None,
type: str|None=None
) -> None:
"""
Args:
type (str): 'グループ' | '部門' | 'カテゴリ' | 'サブカテゴリ'
"""
self.type = type
self.error_key = []
if self.type is not None:
self.mean_df = self._select_type(mean_lf, self.type)
# self.std_df = self._select_type(std_lf, self.type)
else:
self.mean_lf = mean_lf
# # self.std_lf = std_lf
def _select_cluster(self, data):
probs = data['counts'] / data['counts'].sum()
return np.random.choice(data['label'], p=probs)
def _select_quantity(self, mean, std):
quantity = np.random.normal(loc=mean, scale=std)
return max(0, np.round(quantity)) # round to the nearest integer and ensure non-negative
def _select_poisson_quantity(self, mean):
return np.random.poisson(lam=mean*1.3) # 1.3倍で丁度良かった 丸めたりなんなりで落ちているのか
def _select_type(self, lf: pl.LazyFrame, type: str) -> pl.DataFrame:
"""
Args:
type (str): 'グループ' | '部門' | 'カテゴリ' | 'サブカテゴリ'
"""
cols = lf.columns
info_cols = ['label', 'counts', '販売数量', '受注額_税抜', 'ケース数量']
df_info = lf.select(info_cols).collect()
lf_values = lf.drop(columns=info_cols)
selecter_cols = [col for col in cols if re.match(f'^{type}', col)]
df_selected = lf_values.select(selecter_cols).collect()
for col in selecter_cols:
df_selected = (df_selected
.with_columns(
pl.col(col).alias(col.replace(f'{type}コード_', ''))
)
.drop(columns=col)
)
df_result = pl.concat([df_info, df_selected], how='horizontal')
return df_result
def set_type(self, type: str):
self.type = type
if self.mean_lf is not None:
self.mean_df = self._select_type(self.mean_lf, self.type)
# self.std_df = self._select_type(self.std_lf, self.type)
def set_data(
self,
mean_lf: pl.LazyFrame,
# std_lf: pl.LazyFrame,
type: str|None=None
):
self.mean_lf = mean_lf
# self.std_lf = std_lf
if type is not None:
self.type = type
if self.type is not None:
self.mean_df = self._select_type(mean_lf, self.type)
# self.std_df = self._select_type(std_lf, self.type)
else:
self.mean_lf = mean_lf
# # self.std_lf = std_lf
def get_sample(self, num_sample: int=100, is_print: bool=False):
"""SKUをあてるためのサンプルオーダーの枠組みを取得
Args:
num_sample (int, optional): 生成オーダー数. Defaults to 100.
is_print (bool, optional): コンソールに出力するか. Defaults to False.
"""
exclude_columns = ['label', 'counts', '販売数量', '受注額_税抜']
mean_values = self.mean_df.drop(columns=exclude_columns)
# std_values = self.std_df.drop(columns=exclude_columns)
sample_orders = []
for _ in range(num_sample):
cluster_label = self._select_cluster(self.mean_df)
order = []
for type in mean_values.columns: # exclude non-type columns
mean_quantity = mean_values[type][int(cluster_label)]
# std_quantity = std_values[type][int(cluster_label)]
# Handle NaN values (if any)
if not (np.isnan(mean_quantity)
# or np.isnan(std_quantity)
):
# quantity = self._select_quantity(mean_quantity, std_quantity) # 正規分布
quantity = self._select_poisson_quantity(mean_quantity) # ポアソン分布
if quantity > 0: # add to order only if quantity > 0
order.append((type, quantity))
sample_orders.append((cluster_label, str(order)))
self.df_sample_orders = pl.DataFrame(
sample_orders,
schema={
'Cluster': pl.Utf8,
'Order (type, Quantity)': pl.Utf8
}
)
if is_print:
print(self.df_sample_orders)
def set_sku_json(self, lf_products: pl.LazyFrame):
"""品揃えデータをJSON形式に
Args:
lf_products (pl.LazyFrame): 品揃えデータのLazyFrame
"""
self.sku_json = generate_sku_json(lf_products, self.type)
def set_sample_orderframe(self, lf_orderframe: pl.LazyFrame):
"""CSVから読み込んだデータに対してSKUをあてたい場合
Args:
lf_orderframe (pl.LazyFrame): .get_order()で取得したself.df_sample_ordersを出力したCSVのLazyFrame
"""
self.df_sample_orders = lf_orderframe.collect()
def fit_sku(self)->pl.DataFrame:
order_length = self.df_sample_orders.shape[0]
cluster_list = []
order_list = []
for i in range(order_length):
frame_row = self.df_sample_orders[i]
order_frame_list = ast.literal_eval(frame_row['Order (type, Quantity)'].to_numpy()[0])
fit_order = []
for order_frame in order_frame_list:
order_type, order_quantity = order_frame
try:
selected_products = self.sku_json[order_type]
product_selected = selected_products[np.random.randint(len(selected_products))]
fit_order.append((str(product_selected), order_quantity))
except KeyError:
self.error_key.append(order_type)
fit_order.append((order_type, order_quantity))
cluster_list.append(frame_row['Cluster'][0])
order_list.append(str(fit_order))
self.fitted_json = {
'Cluster': cluster_list,
'Order (type, Quantity)': order_list
}
self.df_fitted_order = pl.DataFrame(
self.fitted_json,
schema={
'Cluster': pl.Int64,
'Order (type, Quantity)': pl.Utf8
},
)
self.error_key = [key for key in list(set(self.error_key)) if key !='ケース数量']
self.error_key.sort()
def display_error_key(self):
print(self.error_key)
def remove_error(self):
rows = self.df_fitted_order.shape[0]
all_rows_index = list(range(rows))
drop_rows_index = []
for i in range(rows):
eval_data = ast.literal_eval(self.df_fitted_order[i]['Order (type, Quantity)'].to_list()[0])
for data in eval_data:
try:
ast.literal_eval(data[0])['商品コード']
except TypeError:
drop_rows_index.append(i)
break
except ValueError:
continue
left_rows_index = [i for i in all_rows_index if i not in drop_rows_index]
self.df_remove_error = self.df_fitted_order[left_rows_index]
return self.df_remove_error
def export(self, path: str, remove: bool=False, drop_case: bool=True):
schema = {
'JANコード': pl.Utf8,
'SKUID': pl.Utf8,
'品名': pl.Utf8,
'数量': pl.Int64,
'平均売価': pl.Utf8
}
info_schema = {
'番号': pl.Int64,
'合計点数': pl.Int64,
'SKU数': pl.Int64,
'合計金額': pl.Int64,
}
if remove:
export_data = self.df_remove_error
else:
export_data = self.df_fitted_order
row_count = export_data.shape[0]
order = export_data['Order (type, Quantity)']
list_df_order = []
info_df = pl.DataFrame(schema=info_schema)
for i in range(row_count):
data_list = ast.literal_eval(order[i])
df_order = pl.DataFrame(schema=schema)
for data in data_list:
try:
code, skuid, name, price = (
ast.literal_eval(data[0])['商品コード'],
ast.literal_eval(data[0])['SKUID'],
ast.literal_eval(data[0])['商品名'],
ast.literal_eval(data[0])['平均売価'],
)
count = data[1]
product_json = {
'JANコード': code,
'SKUID': skuid,
'品名': name,
'数量': count,
'平均売価': price
}
except TypeError:
continue
# count = data[1]
# product_json = {
# 'JANコード': code,
# '品名': name,
# '数量': count,
# '平均売価': price
# }
except SyntaxError:
continue
# count = data[1]
# product_json = {
# 'JANコード': code,
# '品名': name,
# '数量': count,
# '平均売価': price
# }
except ValueError:
count = data[1]
product_json = {
'JANコード': None,
'SKUID': None,
'品名': 'ケース数量',
'数量': count,
'平均売価': price
}
concat_data = pl.DataFrame(product_json, schema=schema)
df_order = pl.concat([df_order, concat_data], how='vertical')
if drop_case:
df_order = df_order.drop_nulls(subset=['JANコード'])
list_df_order.append(df_order)
## append info_df
df_order = (
df_order
.with_columns(
pl.col('数量').cast(pl.Int64).alias('数量'),
pl.col('平均売価').str.replace(',', '').cast(pl.Int64).alias('平均売価')
)
.with_columns(
(pl.col('数量') * pl.col('平均売価')).alias('販売額')
)
)
info_json = {
'番号': i,
'合計点数': df_order['数量'].sum(),
'SKU数': df_order.shape[0],
'合計金額': df_order['販売額'].sum(),
}
concat_info = pl.DataFrame(info_json, schema=info_schema)
info_df = pl.concat([info_df, concat_info], how='vertical')
with Workbook(path) as wb:
wb.add_worksheet('info')
info_df.write_excel(
workbook=wb,
worksheet='info'
)
for i, df_order in enumerate(list_df_order):
wb.add_worksheet(str(i))
df_order.write_excel(
workbook=wb,
worksheet=str(i),
)
このようなクラスを定義した後、jupyter上にimportして実行することで、注文データの自動生成に成功しました。
イメージとしては、すべてのカテゴリに対して乱数を生成して出現個数の判定を行う処理を行い、その個数をJSONとして保存、それを500回繰り返す感じです。
Polarsのおかげでとても早く終わります。Pandasにはもう戻れませんね・・・
生成した結果
- 実際
- 生成結果
クラスターラベル1, 2, 4は非常に発生確率が低いお客さまのパターンだったので今回は生成されなかったようですが、それ以外のパターンは概ね購入点数が近いのでまあ良しとしましょう。
自動注文
さて、仮想注文を生成したは良いもののこれをECのページに入力しなければなりません。
繰り返しになりますが、APIで投入することは禁止されていましたので、selenium
による自動操作にて実装しました。
selenium
の実装自体は得意だったのですが、RTE環境特有の不安定さが付きまとい、実はこちらが地獄の始まりだったのですが、Qiitaのエディタがゲロ重くなってきたので、私がクビになっていなければこちらの記事は後編に記述します。
一旦、そちらのGitHubを張っておきます。RTEのURLとかbasic認証のパスとか、大切な情報は.env
に突っ込んでプッシュしてないので、そのまま実行しても動きません。
余談
完全に余談ですが、私はJupyter上に関数を記述せず別の.py
ファイルに記述し、それをimport
して利用しています
長いまたは何回も使う処理は別ファイルにして、jupyter上では簡単な文のみ書いています
これって一般的なんでしょうか?ご意見ください!