0
3

More than 1 year has passed since last update.

pythonによる列生成法の実装と速度比較

Posted at

結論

  • 列生成法をpythonで実装し、CBCソルバーで解く場合は、pulpよりpython-mipが速い。
  • 列生成法をpulpで実装する場合は、CBCソルバーを使うよりscipy.optimize.milpに変換して解くほうが速そう。

背景

pulp, python-mip, scipy.optimize.milpなど、混合整数線形計画(MILP)ソルバーをpythonで使用するためのモデラーやラッパーが色々ある。

ただし、pythonは遅いので、実際に本番運用する際は、求解時間だけでなくモデリングや解のエンコード/デコードにかかる時間にも注意しなければならない。特に、問題を分解して繰り返し解くようなアプローチをとる場合は、要注意である。

今回は代表的な問題分割方法といえる列生成法で、これらのラッパーを使った場合の速度比較を行う。

列生成法の初心者向けのわかりやすい文献として、宮本裕一郎先生の「はじめての列生成法」があり、この文献で扱っているビンパッキング問題を実装してみた。

実装パターンは、以下の3つ。pulp2matは自作したpulp->scipyの変換ツール。
https://pypi.org/project/pulp2mat/

  1. pulp + CBC
  2. python-mip + CBC
  3. pulp + pulp2mat + scipy.optimize.milp

実行結果:

ビンのサイズ10, アイテム数100で計算時間の比較を行った。

Elapsed Time [s] 順位
pulp + CBC 19.1 3
python-mip + CBC 2.4 1
pulp + pulp2mat + scipy.optimize.milp 3.7 2

ある程度事前の予想通り、2. python-mip + CBC > 3. pulp + pulp2mat + scipy.optimize.milp > 1. pulp + CBCの順となった。2、3はpythonから直接ソルバーを実行するが、1はexeを外部プロセスとして実行するので、その分遅くなっていると思われる。3はおそらくpulp2matの変換の分か、あるいはソルバーの性能差によって2よりも遅くなっている可能性がある。

実装

それぞれインスタンスを生成しているように書いてあるが、実際は1つの乱数で生成した同じインスタンスで比較している。

pulp + CBC

from pulp import LpVariable, lpSum, lpDot, LpBinary, LpContinuous, LpProblem, LpStatus
import random
import pulp

random.seed = 42

class BinPackingColGen:
    def __init__(self, N, B, S, timeLimit=100):
        self.problem = LpProblem()
        self.N = N
        self.B = B
        self.S = S
        self.s = [random.randint(1, S) for i in range(N)]
        self.solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=timeLimit)
        self.solver.msg = False
        self.STATUS_SOLVED = 1
        self.STATUS_SOLVING = 0
        self.STATUS_TIMEOUT = 2
        self.solve_status = self.STATUS_SOLVING
        # 最弱の詰め合わせパターン
        self.A = np.eye(self.N)
    def SolvePrimalProblem(self):
        """最低限の詰め込みパターンを列挙して、主問題を定義する
        """
        self.p_problem = LpProblem(sense=pulp.LpMinimize)
        # 変数
        self.x = {
            j: LpVariable("x_"+str(j), 0, 1, LpBinary) for j in range(self.A.shape[1])
        }
        # 制約条件
        for i in range(self.N):
            self.p_problem += lpSum(self.A[i, j] * self.x[j] for j in range(self.A.shape[1])) >= 1
        # 目的関数
        self.p_problem += lpSum(self.x[j] for j in range(self.A.shape[1]))
        self.p_problem.solve(self.solver)
        
    def InitDualProblem(self):
        """線形緩和した双対問題を定義する
        """
        self.d_problem = LpProblem(sense=pulp.LpMaximize)
        # 変数
        self.y = {
            i: LpVariable("y_"+str(i), 0, 1, LpContinuous) for i in range(self.N)
        }
        # 制約条件
        for j in range(self.A.shape[1]):
            self.d_problem += (
                lpSum(
                    self.A[i, j]*self.y[i] for i in range(self.N)
                ) <= 1
            )
        self.d_problem += lpSum(
            self.y[i] for i in range(self.N)
        )
        
    def SolveKnapsackProblem(self):
        """双対問題の解を用いて、ナップサック問題を定義する
        """
        self.k_problem = LpProblem(sense = pulp.LpMaximize)
        self.y_const = [self.y[i].value() for i in range(self.N)]
        #print(self.y_const)
        self.alpha = {i: LpVariable("alpha_" + str(i), 0, 1, LpBinary) for i in range(self.N)}
        self.k_problem += lpSum(self.alpha[i]*self.s[i] for i in range(self.N)) <= self.B
        self.k_problem += lpSum(self.alpha[i]*self.y_const[i] for i in range(self.N))
        result = self.k_problem.solve(self.solver)
        return result
        
    def SolveDualLP(self):
        self.d_problem.solve(self.solver)
    def ColumnGeneration(self):
        """列生成法で解く
        """
        # 2. 双対問題の初期化
        self.InitDualProblem()
        itr = 0
        while self.solve_status == self.STATUS_SOLVING:
            # 3. 双対問題を解く
            self.SolveDualLP()
            print(self.d_problem.objective.value())
            # 4. A双対問題の解を用いてナップサック問題を解く
            result = self.SolveKnapsackProblem()
            if LpStatus[result] != "Optimal":
                self.solve_status = self.STATUS_TIMEOUT
                print("TimeOutError!")
            elif self.k_problem.objective.value() <= 1 + 1e-7:
                # 最適値が1以下の場合、終了
                self.solve_status = self.STATUS_SOLVED
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective.value()) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective.value())
                print(out_str)
            else:
                # 5. ナップサック問題の解から、双対問題に制約条件を追加
                new_items = np.array([self.alpha[i].value() for i in range(self.N)])
                self.A = np.hstack((self.A, new_items.reshape(len(new_items), 1)))
                self.d_problem += lpSum(
                    self.A[i, -1] * self.y[i] for i in range(self.N)
                ) <= 1
                #self.InitDualProblem()
                # 追加したアイテム詰め合わせを表示
                #print(new_items)
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective.value()) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective.value()) + "\n"
                out_str += "Items: [" 
                for i in range(self.N):
                    if self.alpha[i].value() > 0:
                        out_str += str(self.s[i]) + ","
                out_str += "]"
                print(out_str)
                itr += 1
                
        # 6. ナップサック問題の解から、主問題に詰め込みパターンを追加
        if self.solve_status == self.STATUS_SOLVED:
            print("Column generation completed! Solving primal problem.")
            self.SolvePrimalProblem()
            print("Completed! Objective value is ", str(self.p_problem.objective.value()))

python-mip + CBC

from mip import Model, maximize, minimize, xsum
import mip
import random

random.seed = 42

class BinPackingColGen:
    def __init__(self, N, B, S, timeLimit=100):
        self.problem = Model()
        self.N = N
        self.B = B
        self.S = S
        self.s = [random.randint(1, S) for i in range(N)]
        self.max_seconds = timeLimit
        self.STATUS_SOLVED = 1
        self.STATUS_SOLVING = 0
        self.STATUS_TIMEOUT = 2
        self.solve_status = self.STATUS_SOLVING
        # 最弱の詰め合わせパターン
        self.A = np.eye(self.N)
    def SolvePrimalProblem(self):
        """最低限の詰め込みパターンを列挙して、主問題を定義する
        """
        self.p_problem = Model()
        # 変数
        self.x = {
            j: self.p_problem.add_var(name="x_"+str(j), lb=0, ub=1, var_type=mip.BINARY) for j in range(self.A.shape[1])
        }
        # 制約条件
        for i in range(self.N):
            self.p_problem += xsum(self.A[i, j] * self.x[j] for j in range(self.A.shape[1])) >= 1
        # 目的関数
        self.p_problem += xsum(self.x[j] for j in range(self.A.shape[1]))
        self.p_problem.verbose = 0
        self.p_problem.optimize(max_seconds = self.max_seconds)
        
    def InitDualProblem(self):
        """線形緩和した双対問題を定義する
        """
        self.d_problem = Model()
        # 変数
        self.y = {
            i: self.d_problem.add_var("y_"+str(i), lb=0, ub=1, var_type=mip.CONTINUOUS) for i in range(self.N)
        }
        # 制約条件
        for j in range(self.A.shape[1]):
            self.d_problem += (
                xsum(
                    self.A[i, j]*self.y[i] for i in range(self.N)
                ) <= 1
            )
        self.d_problem.objective = mip.maximize(xsum(
            self.y[i] for i in range(self.N)
        ))
        
    def SolveKnapsackProblem(self):
        """双対問題の解を用いて、ナップサック問題を定義する
        """
        self.k_problem = Model()
        self.y_const = [self.y[i].x for i in range(self.N)]
        #print(self.y_const)
        self.alpha = {i: self.k_problem.add_var(name="alpha_" + str(i), lb=0, ub=1, var_type=mip.BINARY) for i in range(self.N)}
        self.k_problem += xsum(self.alpha[i]*self.s[i] for i in range(self.N)) <= self.B
        self.k_problem.objective = mip.maximize(xsum(self.alpha[i]*self.y_const[i] for i in range(self.N)))
        self.k_problem.verbose=0
        result = self.k_problem.optimize(max_seconds=self.max_seconds)
        return result
        
    def SolveDualLP(self):
        self.d_problem.verbose=0
        self.d_problem.optimize(max_seconds=self.max_seconds)
    def ColumnGeneration(self):
        """列生成法で解く
        """
        # 2. 双対問題の初期化
        self.InitDualProblem()
        itr = 0
        while self.solve_status == self.STATUS_SOLVING:
            # 3. 双対問題を解く
            self.SolveDualLP()
            print(self.d_problem.objective_value)
            # 4. A双対問題の解を用いてナップサック問題を解く
            result = self.SolveKnapsackProblem()
            if result != mip.OptimizationStatus.OPTIMAL:
                self.solve_status = self.STATUS_TIMEOUT
                print("TimeOutError!")
            elif self.k_problem.objective_value <= 1 + 1e-7:
                # 最適値が1以下の場合、終了
                self.solve_status = self.STATUS_SOLVED
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective_value) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective_value)
                print(out_str)
            else:
                # 5. ナップサック問題の解から、双対問題に制約条件を追加
                new_items = np.array([self.alpha[i].x for i in range(self.N)])
                self.A = np.hstack((self.A, new_items.reshape(len(new_items), 1)))
                self.d_problem += xsum(
                    self.A[i, -1] * self.y[i] for i in range(self.N)
                ) <= 1
                #self.InitDualProblem()
                # 追加したアイテム詰め合わせを表示
                #print(new_items)
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective_value) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective_value) + "\n"
                out_str += "Items: [" 
                for i in range(self.N):
                    if self.alpha[i].x > 0:
                        out_str += str(self.s[i]) + ","
                out_str += "]"
                print(out_str)
                itr += 1
                
        # 6. ナップサック問題の解から、主問題に詰め込みパターンを追加
        if self.solve_status == self.STATUS_SOLVED:
            print("Column generation completed! Solving primal problem.")
            self.SolvePrimalProblem()
            print("Completed! Objective value is ", str(self.p_problem.objective_value))

pulp + pulp2mat + scipy.optimize.milp

from mip import Model, maximize, minimize, xsum
import mip
import random

random.seed = 42

class BinPackingColGen:
    def __init__(self, N, B, S, timeLimit=100):
        self.problem = LpProblem()
        self.N = N
        self.B = B
        self.S = S
        self.s = [random.randint(1, S) for i in range(N)]
        self.STATUS_SOLVED = 1
        self.STATUS_SOLVING = 0
        self.STATUS_TIMEOUT = 2
        self.solve_status = self.STATUS_SOLVING
        # 最弱の詰め合わせパターン
        self.A = np.eye(self.N)
        self.timeLimit = timeLimit
    def SolvePrimalProblem(self):
        """最低限の詰め込みパターンを列挙して、主問題を定義する
        """
        self.p_problem = LpProblem(sense=pulp.LpMinimize)
        # 変数
        self.x = {
            j: LpVariable("x_"+str(j), 0, 1, LpBinary) for j in range(self.A.shape[1])
        }
        # 制約条件
        for i in range(self.N):
            self.p_problem += lpSum(self.A[i, j] * self.x[j] for j in range(self.A.shape[1])) >= 1
        # 目的関数
        self.p_problem += lpSum(self.x[j] for j in range(self.A.shape[1]))
        c, integrality, constraints, bounds = pulp2mat.convert_all(self.p_problem)
        result = milp(c, integrality=integrality, constraints=constraints, bounds=bounds)
        pulp2mat.decode_solution(result, self.p_problem)
        
    def InitDualProblem(self):
        """線形緩和した双対問題を定義する
        """
        self.d_problem = LpProblem(sense=pulp.LpMaximize)
        # 変数
        self.y = {
            i: LpVariable("y_"+str(i), 0, 1, LpContinuous) for i in range(self.N)
        }
        # 制約条件
        for j in range(self.A.shape[1]):
            self.d_problem += (
                lpSum(
                    self.A[i, j]*self.y[i] for i in range(self.N)
                ) <= 1
            )
        self.d_problem += lpSum(
            self.y[i] for i in range(self.N)
        )
        
    def SolveKnapsackProblem(self):
        """双対問題の解を用いて、ナップサック問題を定義する
        """
        self.k_problem = LpProblem(sense = pulp.LpMaximize)
        self.y_const = [self.y[i].value() for i in range(self.N)]
        #print(self.y_const)
        self.alpha = {i: LpVariable("alpha_" + str(i), 0, 1, LpBinary) for i in range(self.N)}
        self.k_problem += lpSum(self.alpha[i]*self.s[i] for i in range(self.N)) <= self.B
        self.k_problem += lpSum(self.alpha[i]*self.y_const[i] for i in range(self.N))
        c, integrality, constraints, bounds = pulp2mat.convert_all(self.k_problem)
        result = milp(c, integrality=integrality, constraints=constraints, bounds=bounds, options={"time_limit": self.timeLimit})
        pulp2mat.decode_solution(result, self.k_problem)
        #result = self.k_problem.solve(self.solver)
        return pulp.LpStatusOptimal
        
    def SolveDualLP(self):
        c, integrality, constraints, bounds = pulp2mat.convert_all(self.d_problem)
        result = milp(c, integrality=integrality, constraints=constraints, bounds=bounds, options={"time_limit": self.timeLimit})
        pulp2mat.decode_solution(result, self.d_problem)
        #self.d_problem.solve(self.solver)
    def ColumnGeneration(self):
        """列生成法で解く
        """
        # 2. 双対問題の初期化
        self.InitDualProblem()
        itr = 0
        while self.solve_status == self.STATUS_SOLVING:
            # 3. 双対問題を解く
            self.SolveDualLP()
            print(self.d_problem.objective.value())
            # 4. A双対問題の解を用いてナップサック問題を解く
            result = self.SolveKnapsackProblem()
            if LpStatus[result] != "Optimal":
                self.solve_status = self.STATUS_TIMEOUT
                print("TimeOutError!")
            elif self.k_problem.objective.value() <= 1 + 1e-7:
                # 最適値が1以下の場合、終了
                self.solve_status = self.STATUS_SOLVED
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective.value()) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective.value())
                print(out_str)
            else:
                # 5. ナップサック問題の解から、双対問題に制約条件を追加
                new_items = np.array([self.alpha[i].value() for i in range(self.N)])
                self.A = np.hstack((self.A, new_items.reshape(len(new_items), 1)))
                self.d_problem += lpSum(
                    self.A[i, -1] * self.y[i] for i in range(self.N)
                ) <= 1
                #self.InitDualProblem()
                # 追加したアイテム詰め合わせを表示
                #print(new_items)
                out_str = "Epoch: " + str(itr) + "\n"
                out_str = "DP objective: " + str(self.d_problem.objective.value()) + "\n"
                out_str += "KP objective: " + str(self.k_problem.objective.value()) + "\n"
                out_str += "Items: [" 
                for i in range(self.N):
                    if self.alpha[i].value() > 0:
                        out_str += str(self.s[i]) + ","
                out_str += "]"
                print(out_str)
                itr += 1
                
        # 6. ナップサック問題の解から、主問題に詰め込みパターンを追加
        if self.solve_status == self.STATUS_SOLVED:
            print("Column generation completed! Solving primal problem.")
            self.SolvePrimalProblem()
            print("Completed! Objective value is ", str(self.p_problem.objective.value()))
0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3