こちらのソリューションアクセラレータをウォークスルーします。
Unity Catalogに対応するために一部修正を加えています。
外部ロケーションの作成
こちらのソリューションアクセラレータでは、テーブルを外部ロケーションに保存しています。外部ロケーションがない場合には事前に設定してください。
_resources/00-setup
カタログ名を指定しています。
import os
import re
import mlflow
db_prefix = "supply_chain_optimization"
catalogName = "takaakiyayoi_catalog"
外部ロケーションのパスを変更し、カタログを指定しています。また、Unity Catalog配下にストレージパスを指定したテーブルを作成する際にはMANAGED LOCATION
句が必要となります。
# Get dbName and cloud_storage_path, reset and create database
current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
if current_user.rfind('@') > 0:
current_user_no_at = current_user[:current_user.rfind('@')]
else:
current_user_no_at = current_user
current_user_no_at = re.sub(r'\W+', '_', current_user_no_at)
dbName = db_prefix+"_"+current_user_no_at
#cloud_storage_path = f"/Users/{current_user}/field_demos/{db_prefix}"
cloud_storage_path = f"s3://taka-external-location-bucket/external-tables/{db_prefix}"
reset_all = dbutils.widgets.get("reset_all_data") == "true"
spark.sql(f"""USE CATALOG {catalogName}""")
if reset_all:
spark.sql(f"DROP DATABASE IF EXISTS {dbName} CASCADE")
dbutils.fs.rm(cloud_storage_path, True)
spark.sql(f"""create database if not exists {dbName} MANAGED LOCATION '{cloud_storage_path}' """)
spark.sql(f"""USE {dbName}""")
_resources/01-data-generator
こちらは、00-setup
から呼び出されますが、こちらでもカタログの指定が必要なので、ノートブックの冒頭でこちらを実行します。
# Unity Catalog対応
catalogName = "takaakiyayoi_catalog"
spark.sql(f"""USE CATALOG {catalogName}""")
イントロダクション
ノートブック01_Introduction_And_Setup
を実行します。
シチュエーション:
モデリングする対象は電源ツールの製造業者のものです。この製造業者は3つのプラントを有し、5つの配送センターに30の商品SKUを配送しています。それぞれの配送センターには50から60のハードウェア店舗に割り当てられています。これらのパラメータの全ては、コードのパターンをスケールできるように取り扱われます(後ほどみていきます)。それぞれの店舗では、それぞれの商品に対する需要の時系列データがあります。
提供物:
- それぞれのハードウェア店舗におけるそれぞれの商品に対する需要の時系列データ
- ハードウェア店舗に対してそれぞれの配送センターを個別に割り当てるマッピングテーブル。あるハードウェア店舗が別の配送センターから商品を取得することはあり得ますが、ここでは簡素化しています。
- それぞれの製造プラントからそれぞれの配送センターに商品を配送する際のコストを割り当てるテーブル
- それぞれの配送センターにそれぞれのプラントから製造、配送できる商品の最大量を格納するテーブル
2ステップで進めます:
-
需要予測: まず、一週間先の需要予測を行います。集計処理によって、それぞれの配送センターの需要を導出します:
- それぞれの店舗における個々の商品に対する需要予測で、一週間先の予測を生成します
- 配送センターに対しては、それぞれの商品に対する翌週の需要を導出します
- 配送コストの最小化: 製造およびプラントから配送センターへの配送に関するコストと制約のテーブルから、コストが最適化された輸送を導き出します。
%run ./_resources/00-setup $reset_all_data=true
高精細な需要予測
ノートブック02_Fine_Grained_Demand_Forecasting
を実行します。
前提条件: このノートブックを実行する前に 01_Introduction_And_Setup を実行してください。
このノートブックでは、それぞれの店舗と商品に対する翌週の需要を推定するために1週間先の予測を実行します。そして、それぞれの商品に対する配送センターレベルで集計を行います。
このノートブックにおけるハイライト:
- 適切な時系列モデルを特定するために、Databricksのコラボレーティブかつインタラクティブなノートブック環境の活用
- シングルノードのデータサイエンスコードを用いて複数ノードに分散させるためにPandas UDF(ユーザー定義関数)の活用
%run ./_resources/00-setup $reset_all_data=false
print(cloud_storage_path)
print(dbName)
s3://taka-external-location-bucket/external-tables/supply_chain_optimization
supply_chain_optimization_takaaki_yayoi
import os
import datetime as dt
import numpy as np
import pandas as pd
from statsmodels.tsa.api import ExponentialSmoothing
import pyspark.sql.functions as f
from pyspark.sql.types import *
demand_df = spark.read.table(f"{dbName}.part_level_demand")
demand_df = demand_df.cache() # このサンプルノートブックのためのもの
display(demand_df)
Holt’s Wintersの季節性手法を通じたワンステップ先の予測
Holt-Wintersの手法は、三重指数平滑法をベースとしており、トレンドと季節性の両方を考慮することができます。
Exponential Smoothingの概要とアルゴリズム及び実装例について | Deus Ex Machina
def one_step_ahead_forecast(pdf: pd.DataFrame) -> pd.DataFrame:
# 予測時系列の準備
series_df = pd.Series(pdf['demand'].values, index=pdf['date'])
series_df = series_df.asfreq(freq='W-MON')
# ワンステップ先の予測
fit1 = ExponentialSmoothing(
series_df,
trend="add",
seasonal="add",
use_boxcox=False,
initialization_method="estimated",
).fit(method="ls")
fcast1 = fit1.forecast(1).rename("Additive trend and additive seasonal")
# 結果の取得
df = pd.DataFrame(data =
{
'product': pdf['product'].iloc[0],
'store': pdf['store'].iloc[0],
'date' : pd.to_datetime(series_df.index.values[-1]) + dt.timedelta(days=7),
'demand' : int(abs(fcast1.iloc[-1]))
},
index = [0]
)
return df
fc_schema = StructType(
[
StructField('product', StringType()),
StructField('store', StringType()),
StructField('date', DateType()),
StructField('demand', FloatType())
]
)
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
n_tasks = demand_df.select("product", "store").distinct().count()
forecast_df = (
demand_df
.repartition(n_tasks, "product", "store")
.groupBy("product", "store")
.applyInPandas(one_step_ahead_forecast, schema=fc_schema)
)
display(forecast_df)
一週間先の需要が予測できました(こちらを実行しているのは6/10です)。
配送センターレベルでの予測の集計
distribution_center_to_store_mapping_table = spark.read.table(f"{dbName}.distribution_center_to_store_mapping_table")
店舗と配送センターのマッピングを確認します。
display(distribution_center_to_store_mapping_table)
予測結果とマッピングテーブルを結合します。
distribution_center_demand = (
forecast_df.
join(distribution_center_to_store_mapping_table, [ "store" ] , "left").
groupBy("distribution_center", "product").
agg(f.sum("demand").alias("demand"))
)
display(distribution_center_demand)
Deltaとして保存
distribution_center_demand_df_delta_path = os.path.join(cloud_storage_path, 'distribution_center_demand_df_delta')
# データの書き込み
distribution_center_demand.write \
.mode("overwrite") \
.format("delta") \
.save(distribution_center_demand_df_delta_path)
spark.sql(f"DROP TABLE IF EXISTS {dbName}.distribution_center_demand")
spark.sql(f"CREATE TABLE {dbName}.distribution_center_demand USING DELTA LOCATION '{distribution_center_demand_df_delta_path}'")
display(spark.sql(f"SELECT * FROM {dbName}.distribution_center_demand"))
輸送の最適化
ノートブック03_Optimize_Transportation
を実行していきます。
前提条件: このノートブックを実行する前に 02_Fine_Grained_Demand_Forecasting を実行してください。
このノートブックでは、プラントから配送センターに商品を配送する際の輸送コストを最適化するためにLPを解きます。さらに、数十万の商品にどのようにスケールするのかを説明します。
このノートブックのハイライト:
- 最適経路を特定するためにDatabricksのコラボレーティブかつインタラクティブなノートブック環境の活用
- シングルノードのデータサイエンスコードを用いて複数ノードに分散させるためにPandas UDF(ユーザー定義関数)の活用
より正確には、それぞれの商品に対する以下の最適化問題を解くことになります。
数学的なゴール:
一連の配送センターに商品を配送する製造プラントを有しています。ここでのゴールは全体的な輸送コスト、すなわち以下の値を最小化することになります:
cost_of_plant_1_to_distribution_center_1 * quantity_shipped_of_plant_1_to_distribution_center_1
+ … +
cost_of_plant_1_to_distribution_center_n * quantity_shipped_of_plant_n_to_distribution_center_m
数学的な制約:
- 配送数はゼロあるいは正の値であること
- ある製造プラントから出荷される商品の合計数は、最大供給量を上回ってはならない
- それぞれの配送センターに配送される商品の合計は、少なくとも予測された需要を満たさなくてはならない
# LP問題を解くためにpulpライブラリを使います
# このノートブックの開発にはこちらのドキュメントを使いました
# https://coin-or.github.io/pulp/CaseStudies/a_transportation_problem.html
%pip install pulp
%run ./_resources/00-setup $reset_all_data=false
import os
import datetime as dt
import re
import numpy as np
import pandas as pd
import pulp
import pyspark.sql.functions as f
from pyspark.sql.types import *
LPの定義および解決
# それぞれの配送センターの需要、1行が1商品
distribution_center_demand = spark.read.table(f"{dbName}.distribution_center_demand")
distribution_center_demand = distribution_center_demand.groupBy("Product").pivot("distribution_center").agg(f.first("demand").alias("demand"))
for name in distribution_center_demand.schema.names:
distribution_center_demand = distribution_center_demand.withColumnRenamed(name, name.replace("Distribution_Center", "Demand_Distribution_Center"))
distribution_center_demand = distribution_center_demand.withColumnRenamed("Product", "Product".replace("Product", "product")).sort("product")
display(distribution_center_demand)
# プラントの供給、1行が1商品
plant_supply = spark.read.table(f"{dbName}.supply_table")
for name in plant_supply.schema.names:
plant_supply = plant_supply.withColumnRenamed(name, name.replace("plant", "Supply_Plant"))
plant_supply = plant_supply.sort("product")
display(plant_supply)
# 輸送コストテーブル、1行が商品、プラントごと
transport_cost_table = spark.read.table(f"{dbName}.transport_cost_table")
for name in transport_cost_table.schema.names:
transport_cost_table = transport_cost_table.withColumnRenamed(name, name.replace("Distribution_Center", "Cost_Distribution_Center"))
display(transport_cost_table)
# 処理を行うすべての情報を持つテーブルの作成。このテーブルの1行で、製品とプラントに対応し列方向で表現
# - プラント(行方向)から配送センター(列方向)への輸送コスト
# - それぞれのプラント(列方向)からのそれぞれの商品(行方向)を供給する際の制約
# - それぞれの配送センター(列方向)の需要を満たすそれぞれの商品(行方向)の需要の制約
lp_table_all_info = (transport_cost_table.
join(plant_supply, ["product"], how="inner").
join(distribution_center_demand, ["product"], how="inner")
)
display(lp_table_all_info)
# 最終結果テーブルの出力スキーマの定義
res_schema = StructType(
[
StructField('product', StringType()),
StructField('plant', StringType()),
StructField('distribution_center', StringType()),
StructField('qty_shipped', IntegerType())
]
)
# ある商品に対するLPを解く関数の定義
def transport_optimization(pdf: pd.DataFrame) -> pd.DataFrame:
# プラントのリスト、これはプラントに関係するその他のデータ構造の順番を定義します
plants_lst = sorted(pdf["plant"].unique().tolist())
# 配送センターのリスト、これは配送センターに関係するその他のデータ構造の順番を定義します
p = re.compile('^Cost_(Distribution_Center_\d)$')
distribution_centers_lst = sorted([ s[5:] for s in list(pdf.columns.values) if p.search(s) ])
# 取りうるすべての経路を定義
routes = [(p, d) for p in plants_lst for d in distribution_centers_lst]
# LP変数を格納するディクショナリーの作成。ディクショナリーへの参照キーはプラント名であり、配送センター名です。そしてデータはRoute_Tupleとなります。
# (例えば、["plant_1"]["distribution_center_1"]: Route_plant_1_distribution_center_1)。下限を0にし、上限をNone、integerとして定義します。
vars = pulp.LpVariable.dicts("Route", (plants_lst, distribution_centers_lst), 0, None, pulp.LpInteger)
# 他の検索テーブルのサブセットを作成
ss_prod = pdf[ "product" ][0]
# コスト、配送センターとプラントの順序が問題となります
transport_cost_table_pdf = pdf.filter(regex="^Cost_Distribution_Center_\d+$|^plant$")
transport_cost_table_pdf = (transport_cost_table_pdf.
rename(columns=lambda x: re.sub("^Cost_Distribution_Center","Distribution_Center",x)).
set_index("plant").
reindex(plants_lst, axis=0).
reindex(distribution_centers_lst, axis=1)
)
costs = pulp.makeDict([plants_lst, distribution_centers_lst], transport_cost_table_pdf.values.tolist(), 0)
# 供給、プラントの順序が問題となります
plant_supply_pdf = (pdf.
filter(regex="^Supply_Plant_\d+$").
drop_duplicates().
rename(columns=lambda x: re.sub("^Supply_Plant","plant",x)).
reindex(plants_lst, axis=1)
)
supply = plant_supply_pdf.to_dict("records")[0]
# 需要、配送センターの順序が問題となります
distribution_center_demand_pdf = (pdf.
filter(regex="^Demand_Distribution_Center_\d+$").
drop_duplicates().
rename(columns=lambda x: re.sub("^Demand_Distribution_Center","Distribution_Center",x)).
reindex(distribution_centers_lst, axis=1)
)
demand = distribution_center_demand_pdf.to_dict("records")[0]
# 問題データを含む'prob'変数の作成
prob = pulp.LpProblem("Product_Distribution_Problem", pulp.LpMinimize)
# まず、'prob'に対する目的関数を追加します
prob += (
pulp.lpSum([vars[p][d] * costs[p][d] for (p, d) in routes]),
"Sum_of_Transporting_Costs",
)
# 供給の制約を追加します
for p in plants_lst:
prob += (
pulp.lpSum([vars[p][d] for d in distribution_centers_lst]) <= supply[p],
f"Sum_of_Products_out_of_Plant_{p}",
)
# 需要の制約を追加します
for d in distribution_centers_lst:
prob += (
pulp.lpSum([vars[p][d] for p in plants_lst]) >= demand[d],
f"Sum_of_Products_into_Distibution_Center{d}",
)
# PuLPのソルバー選択を用いて問題が解かれます
prob.solve()
# 商品ごとの出力を書き出します
if (pulp.LpStatus[prob.status] == "Optimal"):
name_lst = [ ]
value_lst = [ ]
for v in prob.variables():
name_lst.append(v.name)
value_lst.append(v.varValue)
res = pd.DataFrame(data={'name': name_lst, 'qty_shipped': value_lst})
res[ "qty_shipped" ] = res[ "qty_shipped" ].astype("int")
res[ "plant" ] = res[ "name" ].str.extract(r'(plant_\d+)')
res[ "distribution_center" ] = res[ "name" ].str.extract(r'(Distribution_Center_\d+)')
res[ "product" ] = ss_prod
res = res.drop("name", axis = 1)
res = res[[ "product", "plant", "distribution_center", "qty_shipped"]]
else:
res = pd.DataFrame(data= { "product" : [ ss_prod ] , "plant" : [ None ], "distribution_center" : [ None ], "qty_shipped" : [ None ]})
return res
# 関数のテスト
product_selection = "nail_1"
pdf = lp_table_all_info.filter(f.col("product")==product_selection).toPandas()
transport_optimization(pdf)
Spark UDFを用いてLPを解きます。
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
n_tasks = lp_table_all_info.select("product").distinct().count()
optimal_transport_df = (
lp_table_all_info
.repartition(n_tasks, "product")
.groupBy("product")
.applyInPandas(transport_optimization, schema=res_schema)
)
Deltaへの保存
shipment_recommendations_df_delta_path = os.path.join(cloud_storage_path, 'shipment_recommendations_df_delta')
# データの書き出し
optimal_transport_df.write \
.mode("overwrite") \
.format("delta") \
.save(shipment_recommendations_df_delta_path)
spark.sql(f"DROP TABLE IF EXISTS {dbName}.shipment_recommendations")
spark.sql(f"CREATE TABLE {dbName}.shipment_recommendations USING DELTA LOCATION '{shipment_recommendations_df_delta_path}'")
display(spark.sql(f"SELECT * FROM {dbName}.shipment_recommendations"))
それぞれのプラントでそれぞれの商品をいくつ配送センターに輸送すべきかの推奨値が表示されます。
ちなみに、今回作成されたテーブル間のリネージ(依存関係)はこのようになっています。