2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

こちらのソリューションアクセラレータをウォークスルーします。

Unity Catalogに対応するために一部修正を加えています。

外部ロケーションの作成

こちらのソリューションアクセラレータでは、テーブルを外部ロケーションに保存しています。外部ロケーションがない場合には事前に設定してください。

_resources/00-setup

カタログ名を指定しています。

import os
import re 
import mlflow
db_prefix = "supply_chain_optimization"
catalogName = "takaakiyayoi_catalog"

外部ロケーションのパスを変更し、カタログを指定しています。また、Unity Catalog配下にストレージパスを指定したテーブルを作成する際にはMANAGED LOCATION句が必要となります。

# Get dbName and cloud_storage_path, reset and create database
current_user = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
if current_user.rfind('@') > 0:
  current_user_no_at = current_user[:current_user.rfind('@')]
else:
  current_user_no_at = current_user
current_user_no_at = re.sub(r'\W+', '_', current_user_no_at)

dbName = db_prefix+"_"+current_user_no_at
#cloud_storage_path = f"/Users/{current_user}/field_demos/{db_prefix}"
cloud_storage_path = f"s3://taka-external-location-bucket/external-tables/{db_prefix}"

reset_all = dbutils.widgets.get("reset_all_data") == "true"

spark.sql(f"""USE CATALOG {catalogName}""")

if reset_all:
  spark.sql(f"DROP DATABASE IF EXISTS {dbName} CASCADE")
  dbutils.fs.rm(cloud_storage_path, True)

spark.sql(f"""create database if not exists {dbName} MANAGED LOCATION '{cloud_storage_path}' """)
spark.sql(f"""USE {dbName}""")

_resources/01-data-generator

こちらは、00-setupから呼び出されますが、こちらでもカタログの指定が必要なので、ノートブックの冒頭でこちらを実行します。

# Unity Catalog対応
catalogName = "takaakiyayoi_catalog"
spark.sql(f"""USE CATALOG {catalogName}""")

イントロダクション

ノートブック01_Introduction_And_Setupを実行します。

シチュエーション:
モデリングする対象は電源ツールの製造業者のものです。この製造業者は3つのプラントを有し、5つの配送センターに30の商品SKUを配送しています。それぞれの配送センターには50から60のハードウェア店舗に割り当てられています。これらのパラメータの全ては、コードのパターンをスケールできるように取り扱われます(後ほどみていきます)。それぞれの店舗では、それぞれの商品に対する需要の時系列データがあります。

提供物:

  • それぞれのハードウェア店舗におけるそれぞれの商品に対する需要の時系列データ
  • ハードウェア店舗に対してそれぞれの配送センターを個別に割り当てるマッピングテーブル。あるハードウェア店舗が別の配送センターから商品を取得することはあり得ますが、ここでは簡素化しています。
  • それぞれの製造プラントからそれぞれの配送センターに商品を配送する際のコストを割り当てるテーブル
  • それぞれの配送センターにそれぞれのプラントから製造、配送できる商品の最大量を格納するテーブル

2ステップで進めます:

  • 需要予測: まず、一週間先の需要予測を行います。集計処理によって、それぞれの配送センターの需要を導出します:
    • それぞれの店舗における個々の商品に対する需要予測で、一週間先の予測を生成します
    • 配送センターに対しては、それぞれの商品に対する翌週の需要を導出します
  • 配送コストの最小化: 製造およびプラントから配送センターへの配送に関するコストと制約のテーブルから、コストが最適化された輸送を導き出します。

%run ./_resources/00-setup $reset_all_data=true

Screenshot 2024-06-10 at 13.54.06.png

上のコマンドが終了するといくつかのテーブルが作成されます。
Screenshot 2024-06-10 at 13.55.00.png

高精細な需要予測

ノートブック02_Fine_Grained_Demand_Forecastingを実行します。

前提条件: このノートブックを実行する前に 01_Introduction_And_Setup を実行してください。

このノートブックでは、それぞれの店舗と商品に対する翌週の需要を推定するために1週間先の予測を実行します。そして、それぞれの商品に対する配送センターレベルで集計を行います。

このノートブックにおけるハイライト:

  • 適切な時系列モデルを特定するために、Databricksのコラボレーティブかつインタラクティブなノートブック環境の活用
  • シングルノードのデータサイエンスコードを用いて複数ノードに分散させるためにPandas UDF(ユーザー定義関数)の活用
%run ./_resources/00-setup $reset_all_data=false
print(cloud_storage_path)
print(dbName)
s3://taka-external-location-bucket/external-tables/supply_chain_optimization
supply_chain_optimization_takaaki_yayoi
import os
import datetime as dt
import numpy as np
import pandas as pd

from statsmodels.tsa.api import ExponentialSmoothing

import pyspark.sql.functions as f
from pyspark.sql.types import *
demand_df = spark.read.table(f"{dbName}.part_level_demand")
demand_df = demand_df.cache() # このサンプルノートブックのためのもの
display(demand_df)

日次での店舗ごと、商品ごとの需要データが確認できます。
Screenshot 2024-06-10 at 13.57.06.png

Holt’s Wintersの季節性手法を通じたワンステップ先の予測

Holt-Wintersの手法は、三重指数平滑法をベースとしており、トレンドと季節性の両方を考慮することができます。

Exponential Smoothingの概要とアルゴリズム及び実装例について | Deus Ex Machina

def one_step_ahead_forecast(pdf: pd.DataFrame) -> pd.DataFrame:

    # 予測時系列の準備
    series_df = pd.Series(pdf['demand'].values, index=pdf['date'])
    series_df = series_df.asfreq(freq='W-MON')

    # ワンステップ先の予測
    fit1 = ExponentialSmoothing(
        series_df,
        trend="add",
        seasonal="add",
        use_boxcox=False,
        initialization_method="estimated",
    ).fit(method="ls")
    fcast1 = fit1.forecast(1).rename("Additive trend and additive seasonal")

    # 結果の取得
    df = pd.DataFrame(data = 
                      {
                         'product': pdf['product'].iloc[0], 
                         'store': pdf['store'].iloc[0], 
                         'date' : pd.to_datetime(series_df.index.values[-1]) + dt.timedelta(days=7), 
                         'demand' : int(abs(fcast1.iloc[-1]))
                      }, 
                          index = [0]
                     )
    return df
fc_schema = StructType(
  [
    StructField('product', StringType()),
    StructField('store', StringType()),
    StructField('date', DateType()),
    StructField('demand', FloatType())
  ]
)
spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
n_tasks = demand_df.select("product", "store").distinct().count()

forecast_df = (
  demand_df
  .repartition(n_tasks, "product", "store")
  .groupBy("product", "store")
  .applyInPandas(one_step_ahead_forecast, schema=fc_schema)
)

display(forecast_df)

一週間先の需要が予測できました(こちらを実行しているのは6/10です)。
Screenshot 2024-06-10 at 13.58.44.png

配送センターレベルでの予測の集計

distribution_center_to_store_mapping_table = spark.read.table(f"{dbName}.distribution_center_to_store_mapping_table")

店舗と配送センターのマッピングを確認します。

display(distribution_center_to_store_mapping_table)

Screenshot 2024-06-10 at 13.59.54.png

予測結果とマッピングテーブルを結合します。

distribution_center_demand = (
  forecast_df.
  join(distribution_center_to_store_mapping_table, [ "store" ] , "left").
  groupBy("distribution_center", "product").
  agg(f.sum("demand").alias("demand"))
)     
display(distribution_center_demand)

Screenshot 2024-06-10 at 14.01.00.png

Deltaとして保存

distribution_center_demand_df_delta_path = os.path.join(cloud_storage_path, 'distribution_center_demand_df_delta')
# データの書き込み 
distribution_center_demand.write \
.mode("overwrite") \
.format("delta") \
.save(distribution_center_demand_df_delta_path)
spark.sql(f"DROP TABLE IF EXISTS {dbName}.distribution_center_demand")
spark.sql(f"CREATE TABLE {dbName}.distribution_center_demand USING DELTA LOCATION '{distribution_center_demand_df_delta_path}'")
display(spark.sql(f"SELECT * FROM {dbName}.distribution_center_demand"))

Screenshot 2024-06-10 at 14.02.01.png

輸送の最適化

ノートブック03_Optimize_Transportationを実行していきます。

前提条件: このノートブックを実行する前に 02_Fine_Grained_Demand_Forecasting を実行してください。

このノートブックでは、プラントから配送センターに商品を配送する際の輸送コストを最適化するためにLPを解きます。さらに、数十万の商品にどのようにスケールするのかを説明します。

線形計画法と Optimizer | Anapedia

このノートブックのハイライト:

  • 最適経路を特定するためにDatabricksのコラボレーティブかつインタラクティブなノートブック環境の活用
  • シングルノードのデータサイエンスコードを用いて複数ノードに分散させるためにPandas UDF(ユーザー定義関数)の活用

より正確には、それぞれの商品に対する以下の最適化問題を解くことになります。

数学的なゴール:

一連の配送センターに商品を配送する製造プラントを有しています。ここでのゴールは全体的な輸送コスト、すなわち以下の値を最小化することになります:

cost_of_plant_1_to_distribution_center_1 * quantity_shipped_of_plant_1_to_distribution_center_1
+ … +
cost_of_plant_1_to_distribution_center_n * quantity_shipped_of_plant_n_to_distribution_center_m

数学的な制約:

  • 配送数はゼロあるいは正の値であること
  • ある製造プラントから出荷される商品の合計数は、最大供給量を上回ってはならない
  • それぞれの配送センターに配送される商品の合計は、少なくとも予測された需要を満たさなくてはならない
# LP問題を解くためにpulpライブラリを使います
# このノートブックの開発にはこちらのドキュメントを使いました
# https://coin-or.github.io/pulp/CaseStudies/a_transportation_problem.html
%pip install pulp
%run ./_resources/00-setup $reset_all_data=false
import os
import datetime as dt
import re
import numpy as np
import pandas as pd

import pulp

import pyspark.sql.functions as f
from pyspark.sql.types import *

LPの定義および解決

# それぞれの配送センターの需要、1行が1商品
distribution_center_demand = spark.read.table(f"{dbName}.distribution_center_demand")
distribution_center_demand = distribution_center_demand.groupBy("Product").pivot("distribution_center").agg(f.first("demand").alias("demand"))
for name in distribution_center_demand.schema.names:
  distribution_center_demand = distribution_center_demand.withColumnRenamed(name, name.replace("Distribution_Center", "Demand_Distribution_Center"))
distribution_center_demand = distribution_center_demand.withColumnRenamed("Product", "Product".replace("Product", "product")).sort("product")
display(distribution_center_demand)

Screenshot 2024-06-10 at 14.04.31.png

# プラントの供給、1行が1商品
plant_supply = spark.read.table(f"{dbName}.supply_table")
for name in plant_supply.schema.names:
  plant_supply = plant_supply.withColumnRenamed(name, name.replace("plant", "Supply_Plant"))
plant_supply = plant_supply.sort("product")
display(plant_supply)

Screenshot 2024-06-10 at 14.04.57.png

# 輸送コストテーブル、1行が商品、プラントごと
transport_cost_table = spark.read.table(f"{dbName}.transport_cost_table")
for name in transport_cost_table.schema.names:
  transport_cost_table = transport_cost_table.withColumnRenamed(name, name.replace("Distribution_Center", "Cost_Distribution_Center"))
display(transport_cost_table)

Screenshot 2024-06-10 at 14.05.23.png

# 処理を行うすべての情報を持つテーブルの作成。このテーブルの1行で、製品とプラントに対応し列方向で表現
# - プラント(行方向)から配送センター(列方向)への輸送コスト
# - それぞれのプラント(列方向)からのそれぞれの商品(行方向)を供給する際の制約
# - それぞれの配送センター(列方向)の需要を満たすそれぞれの商品(行方向)の需要の制約
lp_table_all_info = (transport_cost_table.
                     join(plant_supply, ["product"], how="inner").
                     join(distribution_center_demand, ["product"], how="inner")
                    )
display(lp_table_all_info)

Screenshot 2024-06-10 at 14.05.47.png

# 最終結果テーブルの出力スキーマの定義
res_schema = StructType(
  [
    StructField('product', StringType()),
    StructField('plant', StringType()),
    StructField('distribution_center', StringType()),
    StructField('qty_shipped', IntegerType())
  ]
)
# ある商品に対するLPを解く関数の定義
def transport_optimization(pdf: pd.DataFrame) -> pd.DataFrame:

  # プラントのリスト、これはプラントに関係するその他のデータ構造の順番を定義します
  plants_lst = sorted(pdf["plant"].unique().tolist())

  # 配送センターのリスト、これは配送センターに関係するその他のデータ構造の順番を定義します
  p = re.compile('^Cost_(Distribution_Center_\d)$')
  distribution_centers_lst = sorted([ s[5:] for s in list(pdf.columns.values) if p.search(s) ])

  # 取りうるすべての経路を定義
  routes = [(p, d) for p in plants_lst for d in distribution_centers_lst]

  # LP変数を格納するディクショナリーの作成。ディクショナリーへの参照キーはプラント名であり、配送センター名です。そしてデータはRoute_Tupleとなります。
  # (例えば、["plant_1"]["distribution_center_1"]: Route_plant_1_distribution_center_1)。下限を0にし、上限をNone、integerとして定義します。
  vars = pulp.LpVariable.dicts("Route", (plants_lst, distribution_centers_lst), 0, None, pulp.LpInteger)

  # 他の検索テーブルのサブセットを作成
  ss_prod = pdf[ "product" ][0]

  # コスト、配送センターとプラントの順序が問題となります
  transport_cost_table_pdf = pdf.filter(regex="^Cost_Distribution_Center_\d+$|^plant$")
  transport_cost_table_pdf = (transport_cost_table_pdf.
                              rename(columns=lambda x: re.sub("^Cost_Distribution_Center","Distribution_Center",x)).
                              set_index("plant").
                              reindex(plants_lst, axis=0).
                              reindex(distribution_centers_lst, axis=1)
                             )
  costs = pulp.makeDict([plants_lst, distribution_centers_lst], transport_cost_table_pdf.values.tolist(), 0)

  # 供給、プラントの順序が問題となります
  plant_supply_pdf = (pdf.
                      filter(regex="^Supply_Plant_\d+$").
                      drop_duplicates().
                      rename(columns=lambda x: re.sub("^Supply_Plant","plant",x)).
                      reindex(plants_lst, axis=1)
                     )

  supply = plant_supply_pdf.to_dict("records")[0]

  # 需要、配送センターの順序が問題となります
  distribution_center_demand_pdf =  (pdf.
                      filter(regex="^Demand_Distribution_Center_\d+$").
                      drop_duplicates().
                      rename(columns=lambda x: re.sub("^Demand_Distribution_Center","Distribution_Center",x)).
                      reindex(distribution_centers_lst, axis=1)
                     )

  demand = distribution_center_demand_pdf.to_dict("records")[0]

  # 問題データを含む'prob'変数の作成
  prob = pulp.LpProblem("Product_Distribution_Problem", pulp.LpMinimize)

  # まず、'prob'に対する目的関数を追加します
  prob += (
      pulp.lpSum([vars[p][d] * costs[p][d] for (p, d) in routes]),
      "Sum_of_Transporting_Costs",
  )

  # 供給の制約を追加します
  for p in plants_lst:
      prob += (
          pulp.lpSum([vars[p][d] for d in distribution_centers_lst]) <= supply[p],
          f"Sum_of_Products_out_of_Plant_{p}",
      )

  # 需要の制約を追加します
  for d in distribution_centers_lst:
      prob += (
          pulp.lpSum([vars[p][d] for p in plants_lst]) >= demand[d],
          f"Sum_of_Products_into_Distibution_Center{d}",
      )

  # PuLPのソルバー選択を用いて問題が解かれます
  prob.solve()

  # 商品ごとの出力を書き出します
  if (pulp.LpStatus[prob.status] == "Optimal"):
    name_lst = [ ]
    value_lst = [ ]
    for v in prob.variables():
      name_lst.append(v.name) 
      value_lst.append(v.varValue)
      res = pd.DataFrame(data={'name': name_lst, 'qty_shipped': value_lst})
      res[ "qty_shipped" ] = res[ "qty_shipped" ].astype("int")
      res[ "plant" ] =  res[ "name" ].str.extract(r'(plant_\d+)')
      res[ "distribution_center" ] =  res[ "name" ].str.extract(r'(Distribution_Center_\d+)')
      res[ "product" ] = ss_prod
      res = res.drop("name", axis = 1)
      res = res[[ "product", "plant", "distribution_center", "qty_shipped"]]
  else:
      res = pd.DataFrame(data= {  "product" : [ ss_prod ] , "plant" : [ None ], "distribution_center" : [ None ], "qty_shipped" : [ None ]})
  return res
# 関数のテスト
product_selection = "nail_1"
pdf = lp_table_all_info.filter(f.col("product")==product_selection).toPandas()
transport_optimization(pdf)

Screenshot 2024-06-10 at 14.06.32.png

Spark UDFを用いてLPを解きます。

spark.conf.set("spark.databricks.optimizer.adaptive.enabled", "false")
n_tasks = lp_table_all_info.select("product").distinct().count()

optimal_transport_df = (
  lp_table_all_info
  .repartition(n_tasks, "product")
  .groupBy("product")
  .applyInPandas(transport_optimization, schema=res_schema)
)

Deltaへの保存

shipment_recommendations_df_delta_path = os.path.join(cloud_storage_path, 'shipment_recommendations_df_delta')
# データの書き出し 
optimal_transport_df.write \
.mode("overwrite") \
.format("delta") \
.save(shipment_recommendations_df_delta_path)
spark.sql(f"DROP TABLE IF EXISTS {dbName}.shipment_recommendations")
spark.sql(f"CREATE TABLE {dbName}.shipment_recommendations USING DELTA LOCATION '{shipment_recommendations_df_delta_path}'")
display(spark.sql(f"SELECT * FROM {dbName}.shipment_recommendations"))

それぞれのプラントでそれぞれの商品をいくつ配送センターに輸送すべきかの推奨値が表示されます。
Screenshot 2024-06-10 at 14.07.58.png

ちなみに、今回作成されたテーブル間のリネージ(依存関係)はこのようになっています。
Screenshot 2024-06-10 at 14.09.46.png

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?