やること
Snowflakeで数理最適化の問題(組合せ最適化問題)を解いてみる(1/2)で解いたナップサック問題について,少し規模の大きい入力データに対する問題ををSnowflake を用いて解いてみる.数理最適化の基礎について学びたい方は,こちら明日から役立つ「オペレーションズ・リサーチ概論」を視聴いただければ幸いである.
サンプルコード
以下,Python on Snowflake で動作するサンプルコードである.
Snowflake のデータベースへのアクセスは行っていないが,応用すれば,Snowflake のデータベースからデータを入力し,結果を書き込むことも可能である.今回は,ナップサック数20つ,アイテム1,000つに対して,各ナップサックの容量およびアイテムの重さ,価値を乱数で与え,数単純な全探索,再帰関数を使った探索,動的計画法を使った探索の3つのアルゴリズムで解いた結果について紹介する.
全探索
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
from itertools import combinations
import random
def main(session: snowpark.Session):
# サンプルデータ(100倍のアイテムと10倍のナップザック)
num_items = 1000
num_knapsacks = 20
values = [random.randint(1, 1000) for _ in range(num_items)] # ランダムな価値
weights = [random.randint(1, 500) for _ in range(num_items)] # ランダムな重さ
capacities = [random.randint(1000, 5000) for _ in range(num_knapsacks)] # ランダムなナップザックの容量
# ナップザック問題を全探索で解く
max_value, best_combinations = knapsack_bruteforce_multiple(values, weights, capacities)
# 結果をフォーマットしてDataFrameに格納
output_data = {"Knapsack": [], "Variable": [], "Value": []}
for k in range(len(capacities)):
for i in range(len(values)):
output_data["Knapsack"].append(f"Knapsack {k+1}")
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in best_combinations[k] else 0)
output_data["Knapsack"].append("Total")
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
df_result = pd.DataFrame(output_data)
dataframe = session.create_dataframe(df_result)
return dataframe
# ナップザック問題の全探索による解法
def knapsack_bruteforce_multiple(values, weights, capacities):
n = len(values)
num_knapsacks = len(capacities)
max_value = 0
best_combinations = [[] for _ in range(num_knapsacks)]
# すべての組み合わせを試す
for r in range(1, n + 1):
for combination in combinations(range(n), r):
total_weights = [0] * num_knapsacks
total_value = 0
assignment = [-1] * len(combination)
for idx, item in enumerate(combination):
for k in range(num_knapsacks):
if total_weights[k] + weights[item] <= capacities[k]:
total_weights[k] += weights[item]
total_value += values[item]
assignment[idx] = k
break
if all(a != -1 for a in assignment) and total_value > max_value:
max_value = total_value
best_combinations = [[] for _ in range(num_knapsacks)]
for idx, item in enumerate(combination):
best_combinations[assignment[idx]].append(item)
return max_value, best_combinations
実行結果
再帰関数
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
import random
def main(session: snowpark.Session):
# サンプルデータ(100倍のアイテムと10倍のナップザック)
num_items = 1000
num_knapsacks = 20
values = [random.randint(1, 1000) for _ in range(num_items)] # ランダムな価値
weights = [random.randint(1, 500) for _ in range(num_items)] # ランダムな重さ
capacities = [random.randint(1000, 5000) for _ in range(num_knapsacks)] # ランダムなナップザックの容量
# ナップザック問題を再帰的に解く
selected_items = get_selected_items_recursive(values, weights, capacities)
max_value = sum(sum(values[i] for i in selected_items[k]) for k in range(len(capacities)))
# 結果をフォーマットしてDataFrameに格納
output_data = {"Knapsack": [], "Variable": [], "Value": []}
for k in range(len(capacities)):
for i in range(len(values)):
output_data["Knapsack"].append(f"Knapsack {k+1}")
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in selected_items[k] else 0)
output_data["Knapsack"].append("Total")
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
df_result = pd.DataFrame(output_data)
dataframe = session.create_dataframe(df_result)
return dataframe
# ナップザック問題の再帰的解法
def knapsack_recursive_multiple(values, weights, capacities, n, k):
# ベースケース: アイテムがない、または容量が0の場合
if n == 0 or capacities[k] == 0:
return 0
# アイテムが容量を超える場合は選べない
if weights[n - 1] > capacities[k]:
return knapsack_recursive_multiple(values, weights, capacities, n - 1, k)
# アイテムを選ぶ場合と選ばない場合の最大値を計算
include_item = values[n - 1] + knapsack_recursive_multiple(values, weights, capacities[:k] + [capacities[k] - weights[n - 1]] + capacities[k + 1:], n - 1, k)
exclude_item = knapsack_recursive_multiple(values, weights, capacities, n - 1, k)
return max(include_item, exclude_item)
# 各ナップザックに割り当てられたアイテムを復元する
def get_selected_items_recursive(values, weights, capacities):
n = len(values)
num_knapsacks = len(capacities)
selected_items = [[] for _ in range(num_knapsacks)]
for k in range(num_knapsacks):
capacity = capacities[k]
for i in range(n, 0, -1):
if knapsack_recursive_multiple(values, weights, [capacity], i, 0) != knapsack_recursive_multiple(values, weights, [capacity], i - 1, 0):
selected_items[k].append(i - 1)
capacity -= weights[i - 1]
return selected_items
実行結果
動的計画法
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import col
import pandas as pd
import random
def main(session: snowpark.Session):
# サンプルデータ(100倍のアイテムと10倍のナップザック)
num_items = 1000
num_knapsacks = 20
values = [random.randint(1, 1000) for _ in range(num_items)] # ランダムな価値
weights = [random.randint(1, 500) for _ in range(num_items)] # ランダムな重さ
capacities = [random.randint(1000, 5000) for _ in range(num_knapsacks)] # ランダムなナップザックの容量
# ナップザック問題を動的計画法で解く
max_value, selected_items = knapsack_dp_multiple(values, weights, capacities)
# 結果をフォーマットしてDataFrameに格納
output_data = {"Knapsack": [], "Variable": [], "Value": []}
for k in range(len(capacities)):
for i in range(len(values)):
output_data["Knapsack"].append(f"Knapsack {k+1}")
output_data["Variable"].append(f"x{i+1}")
output_data["Value"].append(1 if i in selected_items[k] else 0)
output_data["Knapsack"].append("Total")
output_data["Variable"].append("opt")
output_data["Value"].append(max_value)
df_result = pd.DataFrame(output_data)
dataframe = session.create_dataframe(df_result)
return dataframe
# ナップザック問題の動的計画法による解法
def knapsack_dp_multiple(values, weights, capacities):
n = len(values)
num_knapsacks = len(capacities)
# DPテーブルを初期化
dp = [[[0] * (capacities[k] + 1) for k in range(num_knapsacks)] for _ in range(n + 1)]
# DPテーブルを埋める
for i in range(1, n + 1):
for k in range(num_knapsacks):
for w in range(capacities[k] + 1):
if weights[i - 1] <= w:
dp[i][k][w] = max(dp[i - 1][k][w], dp[i - 1][k][w - weights[i - 1]] + values[i - 1])
else:
dp[i][k][w] = dp[i - 1][k][w]
# 最適解とその構成を復元
selected_items = [[] for _ in range(num_knapsacks)]
max_value = sum(dp[n][k][capacities[k]] for k in range(num_knapsacks))
for k in range(num_knapsacks):
w = capacities[k]
for i in range(n, 0, -1):
if dp[i][k][w] != dp[i - 1][k][w]:
selected_items[k].append(i - 1)
w -= weights[i - 1]
return max_value, selected_items
実行結果
まとめ
少し規模の大きい入力データでのナップサック問題を解く場合,探索アルゴリズムを工夫しなければ厳密な最適解が得られないことを確認した.