やること
Snowflakeで数理最適化の問題(整数計画問題)を解いてみるで解いた最適化問題の中でも難しい組合せ最適化問題をSnowflake を用いて解いてみる.少し難しい理由は,こちら明日から役立つ「オペレーションズ・リサーチ概論」を視聴いただければ幸いである.
今回は,ナップサック問題と呼ばれる問題を解いてみる.ナップサック問題とは,n 種類の品物(各々,価値vi,重量wi)が与えられたとき,重量の合計がW を超えない範囲で品物のいくつかをナップサックに入れて,その入れた品物の価値の合計を最大化するには入れる品物の組み合わせをどのように選べばよいか」という組合せ最適化問題である.この問題は,NP困難と呼ばれる問題のクラスに属する.
サンプルコード
以下,Python on Snowflake で動作するサンプルコードである.
Snowflake のデータベースへのアクセスは行っていないが,応用すれば,Snowflake のデータベースからデータを入力し,結果を書き込むことも可能である.今回は,単純な全探索,再帰関数を使った探索,動的計画法を使った探索の3つのアルゴリズムについて紹介する.
単純な全探索
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
from itertools import combinations
def main(session: snowpark.Session):
# サンプルデータ
values = [60, 100, 120] # 価値
weights = [10, 20, 30] # 重さ
capacity = 50 # ナップザックの容量
# ナップザック問題を全探索で解く
max_value, selected_items = knapsack_bruteforce(values, weights, capacity)
# 結果をフォーマットしてDataFrameに格納
output_data = {"Variable": [], "Value": []}
for i in range(len(values)):
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in selected_items else 0)
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
result_df = pd.DataFrame(output_data)
print(result_df)
dataframe = session.create_dataframe(result_df)
return dataframe
# ナップザック問題の全探索による解法
def knapsack_bruteforce(values, weights, capacity):
n = len(values)
max_value = 0
best_combination = []
# すべての組み合わせを試す
for r in range(1, n + 1):
for combination in combinations(range(n), r):
total_weight = sum(weights[i] for i in combination)
total_value = sum(values[i] for i in combination)
if total_weight <= capacity and total_value > max_value:
max_value = total_value
best_combination = combination
return max_value, best_combination
再帰関数を使った探索
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
def main(session: snowpark.Session):
# サンプルデータ
values = [60, 100, 120] # 価値
weights = [10, 20, 30] # 重さ
capacity = 50 # ナップザックの容量
# ナップザック問題を解く
max_value = knapsack_recursive(values, weights, capacity, len(values))
selected_items = get_selected_items(values, weights, capacity)
# 結果をフォーマットしてDataFrameに格納
output_data = {"Variable": [], "Value": []}
for i in range(len(values)):
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in selected_items else 0)
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
df_result = pd.DataFrame(output_data)
dataframe = session.create_dataframe(df_result)
return dataframe
# ナップザック問題の再帰的解法
def knapsack_recursive(values, weights, capacity, n):
# ベースケース: アイテムがないか容量が0の場合
if n == 0 or capacity == 0:
return 0
# アイテムが容量を超える場合は選べない
if weights[n - 1] > capacity:
return knapsack_recursive(values, weights, capacity, n - 1)
# アイテムを選ぶ場合と選ばない場合の最大値を計算
include_item = values[n - 1] + knapsack_recursive(values, weights, capacity - weights[n - 1], n - 1)
exclude_item = knapsack_recursive(values, weights, capacity, n - 1)
return max(include_item, exclude_item)
# 最適解のアイテムを復元
def get_selected_items(values, weights, capacity):
n = len(values)
selected_items = []
# 再帰的に最適解を構築
while n > 0 and capacity > 0:
if knapsack_recursive(values, weights, capacity, n) != knapsack_recursive(values, weights, capacity, n - 1):
selected_items.append(n - 1)
capacity -= weights[n - 1]
n -= 1
selected_items.reverse()
return selected_items
動的計画法を使った探索
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
def main(session: snowpark.Session):
# サンプルデータ
values = [60, 100, 120] # 価値
weights = [10, 20, 30] # 重さ
capacity = 50 # ナップザックの容量
# ナップザック問題を解く
max_value, selected_items = knapsack(values, weights, capacity)
# 結果をフォーマットしてDataFrameに格納
output_data = {"Variable": [], "Value": []}
for i in range(len(values)):
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in selected_items else 0)
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
df_result = pd.DataFrame(output_data)
dataframe = session.create_dataframe(df_result)
return dataframe
# ナップザック問題の動的計画法による解法
def knapsack(values, weights, capacity):
n = len(values)
dp = [[0] * (capacity + 1) for _ in range(n + 1)]
# DPテーブルを埋める
for i in range(1, n + 1):
for w in range(capacity + 1):
if weights[i - 1] <= w:
dp[i][w] = max(dp[i - 1][w], dp[i - 1][w - weights[i - 1]] + values[i - 1])
else:
dp[i][w] = dp[i - 1][w]
# 最適値
max_value = dp[n][capacity]
# 最適解(どのアイテムを選んだか)を復元
w = capacity
selected_items = []
for i in range(n, 0, -1):
if dp[i][w] != dp[i - 1][w]:
selected_items.append(i - 1)
w -= weights[i - 1]
selected_items.reverse()
return max_value, selected_items