LoginSignup
8
4

More than 3 years have passed since last update.

幅優先探索、深さ優先探索を用いてPERTのクリティカルパスを求める

Last updated at Posted at 2020-05-06

背景

Atcoderで茶色から進まなくなってきたので
アルゴリズムの勉強を本格的に始めました。
幅優先探索(BFS)、深さ優先探索(DFS)について少し理解できたので
復習として、基本情報技術者試験の問題でも出題されている
PERTでの最早結合点時刻、最遅結合点時刻、クリティカルパスを
求めてみることにしました。

目的

グラフの情報を入力することで、PERTにおける
・最早結合点時刻
・最遅結合点時刻
・クリティカルパス
を計算するプログラムを実装します。

幅優先探索(BFS),深さ優先探索(DFS)について

@drken様の記事が非常にわかりやすかったです。

DFS (深さ優先探索) 超入門! 〜 グラフ・アルゴリズムの世界への入口 〜【前編】
DFS (深さ優先探索) 超入門! 〜 グラフ・アルゴリズムの世界への入口 〜【後編】
BFS (幅優先探索) 超入門! 〜 キューを鮮やかに使いこなす 〜

PERTについて

作業の順序や依存関係をアローダイアグラムに表すことで、
プロジェクト完了までの最短所要日数と最重要管理作業を明らかにする手法です。
@sakamoto_t様の記事にまとめられていましたのでそちらをご覧ください。
WBSって何?

以下のアローダイアグラムを例として用います。
PERT_図解.png

実装コード

main.py
from collections import deque
import numpy as np
import sys

def main():
    node_num, path_num, start, goal, path = input_data()
    Graph, Graph_critical, Graph_reverse, need_day = made_graph(node_num, path_num, path)
    earliest_node_time = bfs_normal(node_num, start, Graph, need_day)
    latest_node_time = bfs_reverse(node_num, goal, earliest_node_time, Graph_reverse, need_day)
    candidate = check_critical_candidate(node_num, latest_node_time, earliest_node_time)
    temp = list(check_critical_path(start, goal, candidate, Graph_critical))
    critical_path = [str(path) for path in temp]

    #-------------出力部分-----------------
    print('------------------')
    print('Earliest_node_time')
    print(earliest_node_time)
    print('------------------')
    print('Latest_node_time')
    print(latest_node_time)
    print('------------------')
    print('Critical path')
    print(' -> '.join(critical_path))

    return 0

#---------------入力部分--------------------    
def input_data():
    node_num, path_num = map(int, input().split())
    start, goal = 1, node_num
    path = [input().split() for i in range(path_num)]
    return node_num, path_num, start, goal, path


#---------------グラフ作成-------------------
def made_graph(node_num, path_num, path):
    Graph = [deque([]) for i in range(node_num+1)] 
    Graph_critical = [deque([]) for i in range(node_num+1)]
    Graph_reverse = [deque([]) for i in range(node_num+1)]

    #Graph[0], Graph_reverse[0]は捨てている(グラフ構造の作成で頂点番号を1から取っているため)

    need_day = np.zeros((node_num, node_num))

    for i in range(path_num):
        Graph[int(path[i][0])].append(int(path[i][1]))
        Graph_critical[int(path[i][0])].append(int(path[i][1]))
        Graph_reverse[int(path[i][1])].append(int(path[i][0]))     #逆向き有向グラフの作成
        need_day[int(path[i][0]) - 1][int(path[i][1]) - 1] = int(path[i][2])
        need_day[int(path[i][1]) - 1][int(path[i][0]) - 1] = int(path[i][2])

    return Graph, Graph_critical, Graph_reverse, need_day


#------------BFS作業部分(最早結合点時刻を求める)------------------
def bfs_normal(node_num, start, Graph, need_day):
    stack = deque([start])
    earliest_node_time = [0 for i in range(node_num)]

    while stack:    #stackが空になるまで継続
        tmp = stack[0]
        if Graph[tmp]:
            w = Graph[tmp].popleft() 
            if earliest_node_time[w-1] < earliest_node_time[tmp-1] + need_day[tmp-1][w-1]:
                earliest_node_time[w-1] = int(earliest_node_time[tmp-1] + need_day[tmp-1][w-1])
                stack.append(w)
        else:
            stack.popleft()
    return earliest_node_time


#------------BFS作業部分(最遅結合点時刻を求める)------------------
def bfs_reverse(node_num, goal, earliest_node_time, Graph_reverse, need_day):
    stack = deque([goal])
    latest_node_time = [float('inf') for i in range(node_num)]
    latest_node_time[goal-1] = earliest_node_time[goal-1]
    while stack:    #stackが空になるまで継続
        tmp = stack[0]
        if Graph_reverse[tmp]:
            w = Graph_reverse[tmp].popleft() 
            if latest_node_time[w-1] > latest_node_time[tmp-1] - need_day[tmp-1][w-1]:
                latest_node_time[w-1] = int(latest_node_time[tmp-1] - need_day[tmp-1][w-1])
                stack.append(w)
        else:
            stack.popleft()
    return latest_node_time


#--------------EL = TLとなる点を抜き出す-------------------
def check_critical_candidate(node_num, latest_node_time, earliest_node_time):
    critical_candidate = [False for i in range(node_num)]
    for i in range(node_num):
        if latest_node_time[i] == earliest_node_time[i]:
            critical_candidate[i] = True
    return critical_candidate


#--------------DFS作業部分(クリティカルパスの候補点を辿ってゴールまで探索する)-----------
def check_critical_path(start, goal, candidate, Graph_critical):
    stack = [start]
    seen = deque([start])

    while stack:    #stackが空になるまで継続
        tmp = stack[-1]
        if Graph_critical[tmp]:
            w = Graph_critical[tmp].popleft() 
            if w == goal:
                seen.append(w)
                return seen
            elif candidate[w - 1]:
                seen.append(w)
                stack.append(w)
            else:
                continue
        else:
            stack.pop()
            seen.pop()
    return seen


if __name__ == '__main__':
    sys.exit(main())


入力

作業開始のnodeは1、作業終了のnodeはノードの最大値(=node_num)とします。
$N$:状態の数(node_num)
$P$:作業数(path_num)
${s_i, g_i, K_i}$: 作業前状態, 作業後状態, 必要日数
として、

$N\ P$
$s_0\ g_0\ K_0$
$s_1\ g_1\ K_1$
$s_2\ g_2\ K_2$
・・・
$s_{M-1}\ g_{M-1}\ K_{M-1}$

上の画像の例では

$7\ 8$
$1\ 2\ 17$
$2\ 4\ 7$
$4\ 6\ 5$
$6\ 7\ 11$
$1\ 3\ 11$
$3\ 4\ 5$
$3\ 5\ 10$
$5\ 6\ 9$

になります。

出力

------------------
Earliest_node_time
[0, 17, 11, 24, 21, 30, 41]
------------------
Latest_node_time
[0, 18, 11, 25, 21, 30, 41]
------------------
Critical path
1 -> 3 -> 5 -> 6 -> 7

以下、説明します。

課題の分解

クリティカルパスを求めるために、
1. 最早結合点時刻(以下、EL)を求める
2. 最遅結合点時刻(以下、TL)を求める
3. EL = TLとなる、クリティカルパスのノードになりうる候補を抜き出す
4. 3の候補ノードを通る条件の元で深さ優先探索をすることでクリティカルパスの一例を求める

の4段階での実装を考えました。それぞれについて説明していきます。

グラフを作成する

最遅結合点時刻を求める時、各ノードの最早結合点時刻の情報から、各作業にかかる日数を
引いて比較することになるので、グラフの矢印を逆向きに進むことが必要でした。
そこで、グラフ作成時に逆の進行方向をもつグラフも同時に作成し、
作業日数については行列の形にして、逆向きの情報も同時に格納することにしました。

   #---------------グラフ作成-------------------
def made_graph(node_num, path_num, path):
    Graph = [deque([]) for i in range(node_num+1)] 
    Graph_critical = [deque([]) for i in range(node_num+1)]
    Graph_reverse = [deque([]) for i in range(node_num+1)]

    #Graph[0], Graph_reverse[0]は捨てている(グラフ構造の作成で頂点番号を1から取っているため)

    need_day = np.zeros((node_num, node_num))

    for i in range(path_num):
        Graph[int(path[i][0])].append(int(path[i][1]))
        Graph_critical[int(path[i][0])].append(int(path[i][1]))
        Graph_reverse[int(path[i][1])].append(int(path[i][0]))     #逆向き有向グラフの作成
        need_day[int(path[i][0]) - 1][int(path[i][1]) - 1] = int(path[i][2])
        need_day[int(path[i][1]) - 1][int(path[i][0]) - 1] = int(path[i][2])

    return Graph, Graph_critical, Graph_reverse, need_day

"""
------------------
Path
[deque([2, 3]), deque([4]), deque([4, 5]), deque([6]), deque([6]), deque([7]), deque([])]
------------------
Reverse_Path
[deque([]), deque([1]), deque([1]), deque([2, 3]), deque([3]), deque([4, 5]), deque([6])]
------------------
Need_time
[[ 0. 17. 11.  0.  0.  0.  0.]
 [17.  0.  0.  7.  0.  0.  0.]
 [11.  0.  0.  5. 10.  0.  0.]
 [ 0.  7.  5.  0.  0.  5.  0.]
 [ 0.  0. 10.  0.  0.  9.  0.]
 [ 0.  0.  0.  5.  9.  0. 11.]
 [ 0.  0.  0.  0.  0. 11.  0.]]

"""

最早結合点時刻(以下、ET)を求める

この問題は、各ノードでの辺の重みの合計が最大になる経路を選択する問題に置き換えられます。
逆に重みを最小化する経路問題としては ダイクストラ法が有名です。
今回はその考え方を用いて実装しました。

   #------------BFS作業部分(最早結合点時刻を求める)------------------
def bfs_normal(start):
    stack = deque([start])

    while stack:    #stackが空になるまで継続
        tmp = stack[0]
        if Graph[tmp]:
            w = Graph[tmp].popleft() 
            if earliest_node_time[w-1] < earliest_node_time[tmp-1] + need_day[tmp-1][w-1]:
                earliest_node_time[w-1] = int(earliest_node_time[tmp-1] + need_day[tmp-1][w-1])
                stack.append(w)
        else:
            stack.popleft()
    return earliest_node_time

幅優先探索をしていき、作業日数を足してEarliest_node_timeを更新していきます。
複数経路がある場合には、すでにある数値よりも大きい場合には更新します。

最遅結合点時刻(以下、LT)を求める

次に、LTを求めます。この時は、Latest_node_timeとして、初期値が無限大の
配列を用意しておき、より小さい方に更新します。

 #------------BFS作業部分(最遅結合点時刻を求める)------------------
def bfs_reverse(goal):
    stack = deque([goal])
    latest_node_time[goal-1] = earliest_node_time[goal-1]
    while stack:    #stackが空になるまで継続
        tmp = stack[0]
        if Graph_reverse[tmp]:
            w = Graph_reverse[tmp].popleft() 
            if latest_node_time[w-1] > latest_node_time[tmp-1] - need_day[tmp-1][w-1]:
                latest_node_time[w-1] = int(latest_node_time[tmp-1] - need_day[tmp-1][w-1])
                stack.append(w)
        else:
            stack.popleft()
    return latest_node_time

クリティカルパスを求める

クリティカルパスはEL = TLとなるノードをつないで行ったものです。
逆にいうと、EL = TL を満たすノードを辿ってゴールまでたどり着かなければいけないので、
EL = TLとなる候補ノードを通ることを条件として
深さ優先探索を行い、startからgoalの到達問題をときました。

各要素を比較することによってEL = TLの候補ノードを抜き出します。


#--------------EL = TLとなる点を抜き出す-------------------
def check_critical_candidate(node_num, latest_node_time, earliest_node_time):
    critical_candidate = [False for i in range(node_num)]
    for i in range(node_num):
        if latest_node_time[i] == earliest_node_time[i]:
            critical_candidate[i] = True
    return critical_candidate

最後にDFSによってゴールにたどり着けるかどうかを判定します。
各ノードにたどり着いた時に候補の中にあるかどうかを判定します。
ゴールにたどり着いた時点で終わるので、クリティカルパスが複数ある場合は
そのうちの一つを表示します。


 #--------------DFS作業部分(クリティカルパスの候補点を辿ってゴールまで探索する)-----------
def check_critical_path(start, goal, candidate, Graph_critical):
    stack = [start]
    seen = deque([start])

    while stack:    #stackが空になるまで継続
        tmp = stack[-1]
        if Graph_critical[tmp]:
            w = Graph_critical[tmp].popleft() 
            if w == goal:
                seen.append(w)
                return seen
            elif candidate[w - 1]:
                seen.append(w)
                stack.append(w)
            else:
                continue
        else:
            stack.pop()
            seen.pop()
    return seen

メイン関数部分

def main():
    node_num, path_num, start, goal, path = input_data()
    Graph, Graph_critical, Graph_reverse, need_day = made_graph(node_num, path_num, path)
    earliest_node_time = bfs_normal(node_num, start, Graph, need_day)
    latest_node_time = bfs_reverse(node_num, goal, earliest_node_time, Graph_reverse, need_day)
    candidate = check_critical_candidate(node_num, latest_node_time, earliest_node_time)
    temp = list(check_critical_path(start, goal, candidate, Graph_critical))
    critical_path = [str(path) for path in temp]

    #-------------出力部分-----------------
    print('------------------')
    print('Earliest_node_time')
    print(earliest_node_time)
    print('------------------')
    print('Latest_node_time')
    print(latest_node_time)
    print('------------------')
    print('Critical path')
    print(' -> '.join(critical_path))

    return 0

もう少し綺麗に書きたいんですが
知識が足りませんでした。いつか改良します。

まとめ

・最早結合点時間を出すためには重み付き有向グラフの
 重みの合計値を最大化させる問題として解けば良い。
 (ダイクストラ法の考え方を参考にする)
・クリティカルパスを割り出すためにはDFSとBFSを組み合わせる必要がある(?)
 →より良い方法があれば教えていただきたいです。

感想

初めてQiitaの記事を書いたので、記事を書く練習もできたのでよかったです。
積極的に書いて思考の整理をしたいと思います。

その他参考文献

Pythonを用いたBFSの実装には以下の@takayg1様の記事を参考にさせていただきました。
Pythonでアルゴリズム(深さ優先探索、dfs)

8
4
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4