概要
これは Money Forward Engineering Advent Calendar 2021 3 日目の記事です 🎄
運用しているとある Rails アプリケーションのある機能でリクエストがタイムアウトしてしまう問題が生じたのが発端です。ある n 個の整数の集合 $\{a_1, a_2, ..., a_n\}$ から、和が与えられた sum に等しくなるように部分集合を選ぶ問題を解く処理があります。この問題を 部分和問題 と呼びます。例えば集合が $\{3, 34, 4, 12, 5, 2\}$ で sum = 20 の場合、和が 20 となる部分集合は $\{3, 12, 5\}$です。
今回 n の数がある程度大きい場合に、Rails アプリケーション内の処理に含まれる部分和問題を数分では解くことができず、結果 HTTP リクエストのタイムアウトが発生してしまいました。そして、このタイムアウトは動的計画法というアルゴリズムを用いることで解決できました。その経緯についてお話します。
なお、動的計画法自体の解説については本記事では省略します。「そもそも動的計画法って何?」という方もいらっしゃると思います。Qiita 上に素晴らしい解説記事がありますので、まずは以下をご参考ください。私も非常にお世話になりました 🙏
-
動的計画法超入門! Educational DP Contest の A ~ E 問題の解説と類題集
- 動的計画法について。
-
典型的な DP (動的計画法) のパターンを整理 Part 1 ~ ナップサック DP 編 ~
- 動的計画法の応用例について。
-
意外と解説がない!動的計画法で得た最適解を「復元」する一般的な方法
- 動的計画法では解が存在することは判定できるが、その解の具体的な値を知るには別途復元する方法が必要になる。
内容
部分和問題を解くのに時間がかかる
もともとは以下のコードで部分和問題を解いていました。部分集合の大きさを 1 から n まで増やしながら Array#combination を使って部分和が sum になる部分集合を探索するナイーブ (愚直) な方法です。
def resolve_subset_sum_old_version(values:, sum:)
# 集合の合計が sum に満たない場合は解は見つからない。
return nil if values.sum < sum
# 集合の合計がちょうど sum の場合は集合自体が解になる。
return values if values.sum == sum
(1..values.size).each do |i|
values.combination(i).each do |sub_values|
return sub_values if sub_values.sum == sum
end
end
nil
end
この方法は n = 10 の場合は瞬時に解くことができます。
require 'benchmark'
# n = 10
values = [3590, 1260, 2560, 510, 1780, 2710, 120, 610, 2410, 2620]
sum = 10000
resolve_subset_sum_old_version(values: values, sum: sum)
#=> [3590, 1260, 120, 2410, 2620]
Benchmark.realtime { resolve_subset_sum_old_version(values: values, sum: sum) }
#=> 0.00012099999003112316 # 秒
しかし、n = 25 では暗雲が立ち込めてきます 🌧
require 'benchmark'
# n = 25
values = [3590, 1260, 2560, 510, 1780, 2710, 120, 610, 2410, 2620, 1250, 1910, 50, 4130, 2760, 190, 720, 1560, 2590, 2400, 2090, 3590, 650, 4320, 4420]
# 部分集合の大きさが小さい、つまり探索回数が少なかった場合。
sum = 20000
resolve_subset_sum_old_version(values: values, sum: sum)
#=> [3590, 2560, 2710, 4130, 2590, 4420]
Benchmark.realtime { resolve_subset_sum_old_version(values: values, sum: sum) }
#=> 0.009242000058293343 # 秒
# 部分集合の大きさが大きい、つまり探索回数が多かった場合。
sum = 50000
resolve_subset_sum_old_version(values: values, sum: sum)
#=> [3590, 1260, 2560, 510, 1780, 2710, 120, 2410, 2620, 1250, 1910, 50, 4130, 2760, 720, 1560, 2590, 2400, 2090, 3590, 650, 4320, 4420]
Benchmark.realtime { resolve_subset_sum_old_version(values: values, sum: sum) }
#=> 3.2122450000606477 # 秒
なお、現在は Mac mini (M1, 2020) で動かしていますが、MacBook Pro (15-inch, 2019) では sum = 50000 の場合は処理に 12 秒ほどかかりました。サーバのスペックによっては数分以上かかってしまいそうな予感です。
(1..values.size).each do |i|
values.combination(i).each do |sub_values|
return sub_values if sub_values.sum == sum
end
end
この探索方法の計算量は $O(2^n\cdot n)$ です。Array#combination の結果、つまり探索対象の部分集合の数が指数関数的に増えていくため、n が大きくなった場合に地獄が発生するのは簡単に想像できますね 😇
動的計画法を用いる
Wikipedia で 部分和問題 について調べると
部分和問題は、ナップサック問題に含まれるため、動的計画法等の手法で解くことができる。(詳しくは、ナップサック問題の項を参照。)
との記載があります。早速動的計画法を用いてみましょう 💪 実装時の要件は次の通りです。
- 引数として n 個の整数の集合を values、合計値 sum を sum として与える。
- n + 1 行 sum + 1 列の動的計画法のテーブル
dp_table
を用意し、初期値 0 を設定する。- それぞれ +1 しているのは初期値を参照する行や列を用意するため。
- 解を復元するための復元テーブル
restoration_table
をdp_table
と同じ大きさで用意し、初期値 0 を設定する。 -
dp_table[i][j]
は values の i 番目までの値の中で j 以下になるように値を選んだ場合の部分和の最大値を表している。 -
restoration_table[i][j]
はdp_table[i][j]
がdp[i - 1][restoration_table[i][j]]
によって更新されたことを表している。 - 動的計画法では以下の漸化式に基づいて処理する。
- (スペースの都合のため
dp_table
をdp
、values
をvals
と表記します。)
- (スペースの都合のため
\rm{dp}[0][j] = 0 (j = 0, 1, \dots, sum)
\rm{dp}[i][j] = \left\{
\begin{array}{ll}
\rm max({dp}[i-1][j - vals[i-1]] + vals[i-1], \rm{dp}[i - 1][j]) & (j \ge vals[i - 1]) \\
\rm{dp}[i - i][j] & (j < vals[i - 1])
\end{array}
\right.
# 動的計画法を用いて部分和問題を解くためのクラス。
class ResolveSubsetSumProblem
attr_reader :values, :sum
def self.call(...)
new(...).call
end
def initialize(values:, sum:)
@values = values
@sum = sum
end
def call
return nil if values.sum < sum
selected_values = find_values_without_dp
return selected_values if selected_values
dp_table, restoration_table = find_values_with_dp
# 動的計画法のテーブルの最下行と最右列の要素が sum ではない場合、解は存在しないと判定する。
return nil unless dp_table[n][sum] == sum
restore_selected_values(restoration_table)
end
private
def find_values_without_dp
return values if values.sum == sum
value = values.find { |value| value == sum }
return [value] if value
nil
end
# 動的計画法を使って部分和を解く。
# 同時に解 (部分集合) を復元するための準備をしておく。
def find_values_with_dp
# 動的計画法のテーブル。
# dp_table[i][j] は values の i 番目までの値の中で j 以下になる、
# かつ和が最も大きくなるように値を選んだ場合の部分和を表している。
dp_table = Array.new(n + 1) { Array.new(sum + 1, 0) }
# 解を復元するためのテーブル。
# value = restoration_table[i][j] が存在する場合、
# value は dp_table[i][j] が dp[i - 1][value] によって更新されたことを表している。
restoration_table = Array.new(n + 1) { Array.new(sum + 1, nil) }
(1..n).each do |i|
(1..sum).each do |j|
k = j - values[i - 1]
# values[i - 1] を選ぶ場合。
if j >= values[i - 1] && dp_table[i - 1][j] < dp_table[i - 1][k] + values[i - 1]
dp_table[i][j] = dp_table[i - 1][k] + values[i - 1]
restoration_table[i][j] = k
# values[i - 1] を選ばない場合。
else
dp_table[i][j] = dp_table[i - 1][j]
end
end
end
[dp_table, restoration_table]
end
# 復元テーブルから選んだ値を復元する。
def restore_selected_values(restoration_table)
selected_values = []
tmp_sum = sum
n.downto(1) do |i|
next unless restoration_table[i][tmp_sum]
# values[i - 1] を選んでいた場合。
selected_values << values[i - 1]
tmp_sum = restoration_table[i][tmp_sum]
end
selected_values.reverse
end
def n
values.size
end
end
def resolve_subset_sum_new_version(values:, sum:)
ResolveSubsetSumProblem.call(values: values, sum: sum)
end
require 'benchmark'
sum = 50000
resolve_subset_sum_new_version(values: values, sum: sum)
#=> [3590, 1260, 2560, 510, 1780, 2710, 120, 2410, 2620, 1250, 1910, 50, 4130, 2760, 720, 1560, 2590, 2400, 2090, 3590, 650, 4320, 4420]
Benchmark.realtime { resolve_subset_sum_new_version(values: values, sum: sum) }
#=> 0.3366469999309629 # 秒
非常に早くなりました 💪 さきほどのナイーブな方法と比較してみます。
values = [3590, 1260, 2560, 510, 1780, 2710, 120, 610, 2410, 2620, 1250, 1910, 50, 4130, 2760, 190, 720, 1560, 2590, 2400, 2090, 3590, 650, 4320, 4420]
require 'benchmark'
Benchmark.bm(40) do |r|
m = 5
r.report 'explore all subsets (sum: 10,000)' do
m.times { resolve_subset_sum_old_version(values: values, sum: 10000) }
end
r.report 'use dynamic programming (sum: 10,000)' do
m.times { resolve_subset_sum_new_version(values: values, sum: 10000) }
end
r.report 'explore all subsets (sum: 30,000)' do
m.times { resolve_subset_sum_old_version(values: values, sum: 30000) }
end
r.report 'use dynamic programming (sum: 30,000)' do
m.times { resolve_subset_sum_new_version(values: values, sum: 30000) }
end
r.report 'explore all subsets (sum: 50,000)' do
m.times { resolve_subset_sum_old_version(values: values, sum: 50000) }
end
r.report 'use dynamic programming (sum: 50,000)' do
m.times { resolve_subset_sum_new_version(values: values, sum: 50000) }
end
end
# user system total real
# explore all subsets (sum: 10,000) 0.001162 0.000032 0.001194 ( 0.001386)
# use dynamic programming (sum: 10,000) 0.301114 0.003848 0.304962 ( 0.305438)
# explore all subsets (sum: 30,000) 0.883106 0.007375 0.890481 ( 0.910092)
# use dynamic programming (sum: 30,000) 0.877341 0.011383 0.888724 ( 0.890696)
# explore all subsets (sum: 50,000) 15.447484 0.156102 15.603586 ( 15.618439)
# use dynamic programming (sum: 50,000) 1.526902 0.019747 1.546649 ( 1.548114)
sum | ナイーブな方法 (愚直な探索) | 動的計画法を用いる方法 |
---|---|---|
10,000 | 0.001386 秒 | 0.305438 秒 |
30,000 | 0.910092 秒 | 0.890696 秒 |
50,000 | 15.618439 秒 | 1.548114 秒 |
探索回数が少ない場合 (結果の部分集合が小さい場合) は動的計画法を用いる方法の方が時間がかっていますが、探索回数が多くなるにつれてナイーブな方法では時間が指数関数的に増えているのが分かります。一方、動的計画法を用いる方法ではあくまで比例的に増えているだけです。これはナイーブな方法の計算量が $O(2^n\cdot n)$ だったのに対し、動的計画法を用いた方法の計算量は $O(n\cdot{sum})$ であるためです。
動的計画法のデメリット
ただし、動的計画法では sum が大きな場合に内部で使用する 2 次元配列のサイズが巨大になるためメモリを消費するというデメリットもあります。
require 'objspace'
# [Ruby] メモリ使用量を計測する
# https://qiita.com/QUANON/items/b559548a99395ad0fd0b
def measure_memsize_before_and_after
# メモリ使用量をなるべく正確に計測するために、事前に GC を実行しておく。
GC.start
before = ObjectSpace.memsize_of_all
yield
after = ObjectSpace.memsize_of_all
[before, after]
end
values = [3590, 1260, 2560, 510, 1780, 2710, 120, 610, 2410, 2620, 1250, 1910, 50, 4130, 2760, 190, 720, 1560, 2590, 2400, 2090, 3590, 650, 4320, 4420]
sum = 50000
# ナイーブな方法 (愚直な探索)
before_memsize, after_memsize = measure_memsize_before_and_after { resolve_subset_sum_old_version(values: values, sum: sum) }
((after_memsize - before_memsize).to_f / (1024 ** 2)).round(2)
#=> 2.89 # MB
# 動的計画法を用いる方法
before_memsize, after_memsize = measure_memsize_before_and_after { resolve_subset_sum_new_version(values: values, sum: sum) }
((after_memsize - before_memsize).to_f / (1024 ** 2)).round(2) # MB
#=> 19.84 # MB
sum が大きな数の場合は処理時間も増大するので、動的計画法のテーブルのサイズを圧縮するような工夫が必要そうです (未調査) 😇
large_values = values.map { _1 * 100 }
large_sum = sum * 100
require 'benchmark'
Benchmark.realtime { resolve_subset_sum_new_version(values: large_values, sum: large_sum) }
#=> 24.495768000138924 # 秒
before_memsize, after_memsize = measure_memsize_before_and_after { resolve_subset_sum_new_version(values: large_values, sum: large_sum) }
((after_memsize - before_memsize).to_f / (1024 ** 2)).round(2)
#=> 1983.65 # MB
まとめ
- 部分和問題はナップサック問題の一部で、動的計画法を用いることで効率的に解くことができる 💪
- 動的計画法で部分和問題を解く際に、動的計画法用のテーブルのみでは解があることしかわからない。しかし、復元テーブルを用いることで具体的な解のひとつを求めることができる 💖
- 動的計画法はテーブルのサイズが大きい場合にメモリを消費するので注意が必要 🙄
最後に
引き続き Money Forward Engineering Advent Calendar 2021 をお楽しみください 🙌
参考
記事
- 動的計画法超入門! Educational DP Contest の A ~ E 問題の解説と類題集
- 典型的な DP (動的計画法) のパターンを整理 Part 1 ~ ナップサック DP 編 ~
- 意外と解説がない!動的計画法で得た最適解を「復元」する一般的な方法
書籍
-
なっとく! アルゴリズム
- ゆるふわテイストなイラストと文章でアルゴリズムをわかりやすく説明してくれる良書です。
- 動的計画法についても説明が詳しく載っています。
- 書籍中のコードの言語は Python です 🐍